Просмотр исходного кода

执行器任务结果持久化:执行器回调失败时将任务结果写磁盘,待重启或网络恢复时重试回调任务结果,防止任务执行结果丢失;

xuxueli 7 лет назад
Родитель
Сommit
7dc7c1f2d0

+ 20 - 21
doc/XXL-JOB官方文档.md Просмотреть файл

@@ -1230,33 +1230,32 @@ Tips: 历史版本(V1.3.x)目前已经Release至稳定版本, 进入维护阶段
1230 1230
 - 14、脚本任务Log文件流关闭优化;
1231 1231
 - 15、任务报表成功、失败和进行中统计问题修复;
1232 1232
 - 16、自研Log组件参数占位符改为"{}",并修复打印有参日志时参数不匹配导致报错的问题;
1233
+- 17、执行器任务结果持久化:执行器回调失败时将任务结果写磁盘,待重启或网络恢复时重试回调任务结果,防止任务执行结果丢失;
1233 1234
 
1234 1235
 
1235 1236
 ### TODO LIST
1236 1237
 - 1、任务权限管理:执行器为粒度分配权限,核心操作校验权限;
1237 1238
 - 2、任务分片路由:分片采用一致性Hash算法计算出尽量稳定的分片顺序,即使注册机器存在波动也不会引起分批分片顺序大的波动;目前采用IP自然排序,可以满足需求,待定;
1238 1239
 - 3、任务单机多线程:提升任务单机并行处理能力;
1239
-- 4、回调失败丢包问题:执行器回调失败写文件,重启或周期性回调重试;调度中心周期性请求并同步未回调的执行结果;
1240
-- 5、任务依赖,流程图,子任务+会签任务,各节点日志;
1241
-- 6、调度任务优先级;
1242
-- 7、移除quartz依赖,重写调度模块:新增或恢复任务时将下次执行记录插入delayqueue,调度中心集群竞争分布式锁,成功节点批量加载到期delayqueue数据,批量执行。
1243
-- 8、springboot 和 docker镜像,并且推送docker镜像到中央仓库,更进一步实现产品开箱即用;
1244
-- 9、多数据库支持;
1245
-- 10、执行器Log清理功能:调度中心Log删除时同步删除执行器中的Log文件;
1246
-- 11、Bean模式任务,JobHandler自动从执行器中查询展示为下拉框,选择后自动填充任务名称等属性;
1247
-- 12、API事件触发类型任务(更类似MQ消息)支持"动态传参、延时消费";该类型任务不走Quartz,单独建立MQ消息表,调度中心竞争触发;
1248
-- 13、任务依赖增强,新增任务类型 "流程任务",流程节点可挂载普通类型任务,承担任务依赖功能。现有子任务模型取消;需要考虑任务依赖死循环问题;
1249
-- 14、分片任务某一分片失败,支持分片转移;
1250
-- 15、调度中心触发任务后,先推送触发队列,异步触发,然后立即返回。降低quartz线程占用时长。
1251
-- 16、任务告警逻辑调整:任务调度,以及任务回调失败时,均推送监控队列。后期考虑通过任务Log字段控制告警状态;
1252
-- 17、新增任务默认运行状态,任务更新时运行状态保持不变;
1253
-- 18、提供多版本执行器:不依赖容器版本、不内嵌Jetty版本(通过配置executoraddress替换jetty通讯)等;
1254
-- 19、注册中心支持扩展,除默认基于DB之外,支持扩展接入第三方注册中心如zk、eureka等;
1255
-- 20、依赖Core内部国际化处理;
1256
-- 21、流程任务,支持参数传递;
1257
-- 22、SimpleTrigger 支持;
1258
-- 23、springboot热部署支持;
1259
-- 24、支持通过API服务操作任务信息;
1240
+- 4、任务依赖,流程图,子任务+会签任务,各节点日志;
1241
+- 5、调度任务优先级;
1242
+- 6、移除quartz依赖,重写调度模块:新增或恢复任务时将下次执行记录插入delayqueue,调度中心集群竞争分布式锁,成功节点批量加载到期delayqueue数据,批量执行。
1243
+- 7、springboot 和 docker镜像,并且推送docker镜像到中央仓库,更进一步实现产品开箱即用;
1244
+- 8、多数据库支持;
1245
+- 9、执行器Log清理功能:调度中心Log删除时同步删除执行器中的Log文件;
1246
+- 10、Bean模式任务,JobHandler自动从执行器中查询展示为下拉框,选择后自动填充任务名称等属性;
1247
+- 11、API事件触发类型任务(更类似MQ消息)支持"动态传参、延时消费";该类型任务不走Quartz,单独建立MQ消息表,调度中心竞争触发;待定,该功能与 XXL-MQ 冲突,该场景建议用后者;
1248
+- 12、任务依赖增强,新增任务类型 "流程任务",流程节点可挂载普通类型任务,承担任务依赖功能。现有子任务模型取消;需要考虑任务依赖死循环问题;
1249
+- 13、分片任务某一分片失败,支持分片转移;
1250
+- 14、调度中心触发任务后,先推送触发队列,异步触发,然后立即返回。降低quartz线程占用时长。
1251
+- 15、任务告警逻辑调整:任务调度,以及任务回调失败时,均推送监控队列。后期考虑通过任务Log字段控制告警状态;
1252
+- 16、新增任务默认运行状态,任务更新时运行状态保持不变;
1253
+- 17、提供多版本执行器:不依赖容器版本、不内嵌Jetty版本(通过配置executoraddress替换jetty通讯)等;
1254
+- 18、注册中心支持扩展,除默认基于DB之外,支持扩展接入第三方注册中心如zk、eureka等;
1255
+- 19、依赖Core内部国际化处理;
1256
+- 20、流程任务,支持参数传递;
1257
+- 21、SimpleTrigger 支持;
1258
+- 22、支持通过API服务操作任务信息;
1260 1259
 
1261 1260
 
1262 1261
 ## 七、其他

+ 90 - 2
xxl-job-core/src/main/java/com/xxl/job/core/thread/TriggerCallbackThread.java Просмотреть файл

@@ -3,16 +3,21 @@ package com.xxl.job.core.thread;
3 3
 import com.xxl.job.core.biz.AdminBiz;
4 4
 import com.xxl.job.core.biz.model.HandleCallbackParam;
5 5
 import com.xxl.job.core.biz.model.ReturnT;
6
+import com.xxl.job.core.enums.RegistryConfig;
6 7
 import com.xxl.job.core.executor.XxlJobExecutor;
7 8
 import com.xxl.job.core.log.XxlJobFileAppender;
8 9
 import com.xxl.job.core.log.XxlJobLogger;
10
+import com.xxl.job.core.util.FileUtil;
11
+import com.xxl.job.core.util.JacksonUtil;
9 12
 import org.slf4j.Logger;
10 13
 import org.slf4j.LoggerFactory;
11 14
 
15
+import java.io.File;
12 16
 import java.util.ArrayList;
13 17
 import java.util.Date;
14 18
 import java.util.List;
15 19
 import java.util.concurrent.LinkedBlockingQueue;
20
+import java.util.concurrent.TimeUnit;
16 21
 
17 22
 /**
18 23
  * Created by xuxueli on 16/7/22.
@@ -38,6 +43,7 @@ public class TriggerCallbackThread {
38 43
      * callback thread
39 44
      */
40 45
     private Thread triggerCallbackThread;
46
+    private Thread triggerRetryCallbackThread;
41 47
     private volatile boolean toStop = false;
42 48
     public void start() {
43 49
 
@@ -47,6 +53,7 @@ public class TriggerCallbackThread {
47 53
             return;
48 54
         }
49 55
 
56
+        // callback
50 57
         triggerCallbackThread = new Thread(new Runnable() {
51 58
 
52 59
             @Override
@@ -89,16 +96,48 @@ public class TriggerCallbackThread {
89 96
         });
90 97
         triggerCallbackThread.setDaemon(true);
91 98
         triggerCallbackThread.start();
99
+
100
+
101
+        // retry
102
+        triggerRetryCallbackThread = new Thread(new Runnable() {
103
+            @Override
104
+            public void run() {
105
+                while(!toStop){
106
+                    try {
107
+                        retryFailCallbackFile();
108
+                    } catch (Exception e) {
109
+                        logger.error(e.getMessage(), e);
110
+                    }
111
+                    try {
112
+                        TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
113
+                    } catch (InterruptedException e) {
114
+                        logger.error(e.getMessage(), e);
115
+                    }
116
+                }
117
+                logger.info(">>>>>>>>>>> xxl-job, executor retry callback thread destory.");
118
+            }
119
+        });
120
+        triggerRetryCallbackThread.setDaemon(true);
121
+        triggerRetryCallbackThread.start();
122
+
92 123
     }
93 124
     public void toStop(){
94 125
         toStop = true;
95
-        // interrupt and wait
126
+        // stop callback, interrupt and wait
96 127
         triggerCallbackThread.interrupt();
97 128
         try {
98 129
             triggerCallbackThread.join();
99 130
         } catch (InterruptedException e) {
100 131
             logger.error(e.getMessage(), e);
101 132
         }
133
+
134
+        // stop retry, interrupt and wait
135
+        triggerRetryCallbackThread.interrupt();
136
+        try {
137
+            triggerRetryCallbackThread.join();
138
+        } catch (InterruptedException e) {
139
+            logger.error(e.getMessage(), e);
140
+        }
102 141
     }
103 142
 
104 143
     /**
@@ -106,21 +145,25 @@ public class TriggerCallbackThread {
106 145
      * @param callbackParamList
107 146
      */
108 147
     private void doCallback(List<HandleCallbackParam> callbackParamList){
148
+        boolean callbackRet = false;
109 149
         // callback, will retry if error
110 150
         for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
111 151
             try {
112 152
                 ReturnT<String> callbackResult = adminBiz.callback(callbackParamList);
113 153
                 if (callbackResult!=null && ReturnT.SUCCESS_CODE == callbackResult.getCode()) {
114 154
                     callbackLog(callbackParamList, "<br>----------- xxl-job callback success");
155
+                    callbackRet = true;
115 156
                     break;
116 157
                 } else {
117 158
                     callbackLog(callbackParamList, "<br>----------- xxl-job callback fail, callbackResult:" + callbackResult);
118 159
                 }
119 160
             } catch (Exception e) {
120 161
                 callbackLog(callbackParamList, "<br>----------- xxl-job callback error, errorMsg:" + e.getMessage());
121
-                //getInstance().callBackQueue.addAll(callbackParamList);
122 162
             }
123 163
         }
164
+        if (!callbackRet) {
165
+            appendFailCallbackFile(callbackParamList);
166
+        }
124 167
     }
125 168
 
126 169
     /**
@@ -134,4 +177,49 @@ public class TriggerCallbackThread {
134 177
         }
135 178
     }
136 179
 
180
+
181
+    // ---------------------- fial-callback file TODO ----------------------
182
+
183
+    private static String failCallbackFileName = XxlJobFileAppender.getLogPath().concat(File.separator).concat("xxl-job-callback").concat(".log");
184
+
185
+    private void appendFailCallbackFile(List<HandleCallbackParam> callbackParamList){
186
+        // append file
187
+        String content = JacksonUtil.writeValueAsString(callbackParamList);
188
+        FileUtil.appendFileLine(failCallbackFileName, content);
189
+    }
190
+
191
+    private void retryFailCallbackFile(){
192
+
193
+        // load and clear file
194
+        List<String> fileLines = FileUtil.loadFileLines(failCallbackFileName);
195
+        FileUtil.deleteFile(failCallbackFileName);
196
+
197
+        // parse
198
+        List<HandleCallbackParam> failCallbackParamList = new ArrayList<>();
199
+        if (fileLines!=null && fileLines.size()>0) {
200
+            for (String line: fileLines) {
201
+                List<HandleCallbackParam> failCallbackParamListTmp = JacksonUtil.readValue(line, List.class, HandleCallbackParam.class);
202
+                if (failCallbackParamListTmp!=null && failCallbackParamListTmp.size()>0) {
203
+                    failCallbackParamList.addAll(failCallbackParamListTmp);
204
+                }
205
+            }
206
+        }
207
+
208
+        // retry callback, 100 lines per page
209
+        if (failCallbackParamList!=null && failCallbackParamList.size()>0) {
210
+            int pagesize = 100;
211
+            List<HandleCallbackParam> pageData = new ArrayList<>();
212
+            for (int i = 0; i < failCallbackParamList.size(); i++) {
213
+                pageData.add(failCallbackParamList.get(i));
214
+                if (i>0 && i%pagesize == 0) {
215
+                    doCallback(pageData);
216
+                    pageData.clear();
217
+                }
218
+            }
219
+            if (pageData.size() > 0) {
220
+                doCallback(pageData);
221
+            }
222
+        }
223
+    }
224
+
137 225
 }

+ 98 - 1
xxl-job-core/src/main/java/com/xxl/job/core/util/FileUtil.java Просмотреть файл

@@ -1,6 +1,11 @@
1 1
 package com.xxl.job.core.util;
2 2
 
3
-import java.io.File;
3
+import org.slf4j.Logger;
4
+import org.slf4j.LoggerFactory;
5
+
6
+import java.io.*;
7
+import java.util.ArrayList;
8
+import java.util.List;
4 9
 
5 10
 /**
6 11
  * file tool
@@ -8,7 +13,14 @@ import java.io.File;
8 13
  * @author xuxueli 2017-12-29 17:56:48
9 14
  */
10 15
 public class FileUtil {
16
+    private static Logger logger = LoggerFactory.getLogger(FileUtil.class);
11 17
 
18
+    /**
19
+     * delete recursively
20
+     *
21
+     * @param root
22
+     * @return
23
+     */
12 24
     public static boolean deleteRecursively(File root) {
13 25
         if (root != null && root.exists()) {
14 26
             if (root.isDirectory()) {
@@ -24,4 +36,89 @@ public class FileUtil {
24 36
         return false;
25 37
     }
26 38
 
39
+    public static void deleteFile(String fileName) {
40
+        // file
41
+        File file = new File(fileName);
42
+        if (file.exists()) {
43
+            file.delete();
44
+        }
45
+    }
46
+
47
+    public static void appendFileLine(String fileName, String content) {
48
+
49
+        // file
50
+        File file = new File(fileName);
51
+        if (!file.exists()) {
52
+            try {
53
+                file.createNewFile();
54
+            } catch (IOException e) {
55
+                logger.error(e.getMessage(), e);
56
+                return;
57
+            }
58
+        }
59
+
60
+        // content
61
+        if (content == null) {
62
+            content = "";
63
+        }
64
+        content += "\r\n";
65
+
66
+        // append file content
67
+        FileOutputStream fos = null;
68
+        try {
69
+            fos = new FileOutputStream(file, true);
70
+            fos.write(content.getBytes("utf-8"));
71
+            fos.flush();
72
+        } catch (Exception e) {
73
+            logger.error(e.getMessage(), e);
74
+        } finally {
75
+            if (fos != null) {
76
+                try {
77
+                    fos.close();
78
+                } catch (IOException e) {
79
+                    logger.error(e.getMessage(), e);
80
+                }
81
+            }
82
+        }
83
+
84
+    }
85
+
86
+    public static List<String> loadFileLines(String fileName){
87
+
88
+        List<String> result = new ArrayList<>();
89
+
90
+        // valid log file
91
+        File file = new File(fileName);
92
+        if (!file.exists()) {
93
+            return result;
94
+        }
95
+
96
+        // read file
97
+        StringBuffer logContentBuffer = new StringBuffer();
98
+        int toLineNum = 0;
99
+        LineNumberReader reader = null;
100
+        try {
101
+            //reader = new LineNumberReader(new FileReader(logFile));
102
+            reader = new LineNumberReader(new InputStreamReader(new FileInputStream(file), "utf-8"));
103
+            String line = null;
104
+            while ((line = reader.readLine())!=null) {
105
+                if (line!=null && line.trim().length()>0) {
106
+                    result.add(line);
107
+                }
108
+            }
109
+        } catch (IOException e) {
110
+            logger.error(e.getMessage(), e);
111
+        } finally {
112
+            if (reader != null) {
113
+                try {
114
+                    reader.close();
115
+                } catch (IOException e) {
116
+                    logger.error(e.getMessage(), e);
117
+                }
118
+            }
119
+        }
120
+
121
+        return result;
122
+    }
123
+
27 124
 }

+ 27 - 3
xxl-job-core/src/main/java/com/xxl/job/core/util/JacksonUtil.java Просмотреть файл

@@ -2,7 +2,7 @@ package com.xxl.job.core.util;
2 2
 
3 3
 import com.fasterxml.jackson.core.JsonGenerationException;
4 4
 import com.fasterxml.jackson.core.JsonParseException;
5
-import com.fasterxml.jackson.core.type.TypeReference;
5
+import com.fasterxml.jackson.databind.JavaType;
6 6
 import com.fasterxml.jackson.databind.JsonMappingException;
7 7
 import com.fasterxml.jackson.databind.ObjectMapper;
8 8
 import org.slf4j.Logger;
@@ -68,7 +68,31 @@ public class JacksonUtil {
68 68
 		}
69 69
     	return null;
70 70
     }
71
-    public static <T> T readValueRefer(String jsonStr, Class<T> clazz) {
71
+
72
+	/**
73
+	 * string --> List<Bean>...
74
+	 *
75
+	 * @param jsonStr
76
+	 * @param parametrized
77
+	 * @param parameterClasses
78
+	 * @param <T>
79
+	 * @return
80
+	 */
81
+	public static <T> T readValue(String jsonStr, Class<?> parametrized, Class<?>... parameterClasses) {
82
+		try {
83
+			JavaType javaType = getInstance().getTypeFactory().constructParametricType(parametrized, parameterClasses);
84
+			return getInstance().readValue(jsonStr, javaType);
85
+		} catch (JsonParseException e) {
86
+			logger.error(e.getMessage(), e);
87
+		} catch (JsonMappingException e) {
88
+			logger.error(e.getMessage(), e);
89
+		} catch (IOException e) {
90
+			logger.error(e.getMessage(), e);
91
+		}
92
+		return null;
93
+	}
94
+
95
+    /*public static <T> T readValueRefer(String jsonStr, Class<T> clazz) {
72 96
     	try {
73 97
 			return getInstance().readValue(jsonStr, new TypeReference<T>() { });
74 98
 		} catch (JsonParseException e) {
@@ -79,7 +103,7 @@ public class JacksonUtil {
79 103
 			logger.error(e.getMessage(), e);
80 104
 		}
81 105
     	return null;
82
-    }
106
+    }*/
83 107
 
84 108
     public static void main(String[] args) {
85 109
 		try {