Преглед изворни кода

分片任务失败重试优化,仅重试当前失败的分片;

xuxueli пре 6 година
родитељ
комит
54fce811eb

+ 2 - 1
doc/XXL-JOB官方文档.md Прегледај датотеку

@@ -1282,7 +1282,8 @@ Tips: 历史版本(V1.3.x)目前已经Release至稳定版本, 进入维护阶段
1282 1282
 - 32、底层RPC序列化协议调整为hessian2;
1283 1283
 - 33、修复表字段 “t.order”与数据库关键字冲突查询失败的问题,
1284 1284
 - 34、调度中心提供API服务,支持通过API服务对任务进行查询、新增、更新、启停等操作;
1285
-- 35、【迭代中】分片任务失败重试优化,仅重试当前失败的分片;
1285
+- 35、分片任务失败重试优化,仅重试当前失败的分片;
1286
+- 36、【迭代中】任务参数数据框调整,手动触发时支持动态输入参数;
1286 1287
 
1287 1288
 
1288 1289
 ### TODO LIST

+ 1 - 0
doc/db/tables_xxl_job.sql Прегледај датотеку

@@ -179,6 +179,7 @@ CREATE TABLE `XXL_JOB_QRTZ_TRIGGER_LOG` (
179 179
   `executor_address` varchar(255) DEFAULT NULL COMMENT '执行器地址,本次执行的地址',
180 180
   `executor_handler` varchar(255) DEFAULT NULL COMMENT '执行器任务handler',
181 181
   `executor_param` varchar(512) DEFAULT NULL COMMENT '执行器任务参数',
182
+  `executor_sharding_param` varchar(20) DEFAULT NULL COMMENT '执行器任务分片参数,格式如 1/2',
182 183
   `executor_fail_retry_count` int(11) NOT NULL DEFAULT '0' COMMENT '失败重试次数',
183 184
   `trigger_time` datetime DEFAULT NULL COMMENT '调度-时间',
184 185
   `trigger_code` int(11) NOT NULL COMMENT '调度-结果',

+ 1 - 1
xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobInfoController.java Прегледај датотеку

@@ -92,7 +92,7 @@ public class JobInfoController {
92 92
 	@ResponseBody
93 93
 	//@PermessionLimit(limit = false)
94 94
 	public ReturnT<String> triggerJob(int id) {
95
-		JobTriggerPoolHelper.trigger(id, -1, TriggerTypeEnum.MANUAL);
95
+		JobTriggerPoolHelper.trigger(id, TriggerTypeEnum.MANUAL, -1, null);
96 96
 		return ReturnT.SUCCESS;
97 97
 	}
98 98
 	

+ 1 - 1
xxl-job-admin/src/main/java/com/xxl/job/admin/core/jobbean/RemoteHttpJobBean.java Прегледај датотеку

@@ -29,7 +29,7 @@ public class RemoteHttpJobBean extends QuartzJobBean {
29 29
 
30 30
 		// trigger
31 31
 		//XxlJobTrigger.trigger(jobId);
32
-		JobTriggerPoolHelper.trigger(jobId, -1, TriggerTypeEnum.CRON);
32
+		JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null);
33 33
 	}
34 34
 
35 35
 }

+ 9 - 0
xxl-job-admin/src/main/java/com/xxl/job/admin/core/model/XxlJobLog.java Прегледај датотеку

@@ -18,6 +18,7 @@ public class XxlJobLog {
18 18
 	private String executorAddress;
19 19
 	private String executorHandler;
20 20
 	private String executorParam;
21
+	private String executorShardingParam;
21 22
 	private int executorFailRetryCount;
22 23
 	
23 24
 	// trigger info
@@ -78,6 +79,14 @@ public class XxlJobLog {
78 79
 		this.executorParam = executorParam;
79 80
 	}
80 81
 
82
+	public String getExecutorShardingParam() {
83
+		return executorShardingParam;
84
+	}
85
+
86
+	public void setExecutorShardingParam(String executorShardingParam) {
87
+		this.executorShardingParam = executorShardingParam;
88
+	}
89
+
81 90
 	public int getExecutorFailRetryCount() {
82 91
 		return executorFailRetryCount;
83 92
 	}

+ 1 - 4
xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobFailMonitorHelper.java Прегледај датотеку

@@ -73,10 +73,7 @@ public class JobFailMonitorHelper {
73 73
 									XxlJobInfo info = XxlJobDynamicScheduler.xxlJobInfoDao.loadById(log.getJobId());
74 74
 
75 75
 									if (log.getExecutorFailRetryCount() > 0) {
76
-
77
-										// TODO,分片任务失败重试优化,仅重试失败分片
78
-
79
-										JobTriggerPoolHelper.trigger(log.getJobId(), (log.getExecutorFailRetryCount()-1), TriggerTypeEnum.RETRY);
76
+										JobTriggerPoolHelper.trigger(log.getJobId(), TriggerTypeEnum.RETRY, (log.getExecutorFailRetryCount()-1), log.getExecutorShardingParam());
80 77
 										String retryMsg = "<br><br><span style=\"color:#F39C12;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_type_retry") +"<<<<<<<<<<< </span><br>";
81 78
 										log.setTriggerMsg(log.getTriggerMsg() + retryMsg);
82 79
 										XxlJobDynamicScheduler.xxlJobLogDao.updateTriggerInfo(log);

+ 4 - 4
xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobTriggerPoolHelper.java Прегледај датотеку

@@ -29,11 +29,11 @@ public class JobTriggerPoolHelper {
29 29
             new ThreadPoolExecutor.CallerRunsPolicy());
30 30
 
31 31
 
32
-    public void addTrigger(final int jobId, final int failRetryCount, final TriggerTypeEnum triggerType) {
32
+    public void addTrigger(final int jobId, final TriggerTypeEnum triggerType, final int failRetryCount, final String executorShardingParam) {
33 33
         triggerPool.execute(new Runnable() {
34 34
             @Override
35 35
             public void run() {
36
-                XxlJobTrigger.trigger(jobId, failRetryCount, triggerType);
36
+                XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam);
37 37
             }
38 38
         });
39 39
     }
@@ -55,8 +55,8 @@ public class JobTriggerPoolHelper {
55 55
      * 			<0: use param from job info config
56 56
      *
57 57
      */
58
-    public static void trigger(int jobId, int failRetryCount, TriggerTypeEnum triggerType) {
59
-        helper.addTrigger(jobId, failRetryCount, triggerType);
58
+    public static void trigger(int jobId, TriggerTypeEnum triggerType, int failRetryCount, String executorShardingParam) {
59
+        helper.addTrigger(jobId, triggerType, failRetryCount, executorShardingParam);
60 60
     }
61 61
 
62 62
     public static void toStop() {

+ 25 - 13
xxl-job-admin/src/main/java/com/xxl/job/admin/core/trigger/XxlJobTrigger.java Прегледај датотеку

@@ -13,12 +13,11 @@ import com.xxl.job.core.biz.model.TriggerParam;
13 13
 import com.xxl.job.core.enums.ExecutorBlockStrategyEnum;
14 14
 import com.xxl.job.core.util.IpUtil;
15 15
 import org.apache.commons.collections4.CollectionUtils;
16
+import org.apache.commons.lang3.StringUtils;
16 17
 import org.slf4j.Logger;
17 18
 import org.slf4j.LoggerFactory;
18 19
 
19
-import java.util.ArrayList;
20 20
 import java.util.Date;
21
-import java.util.List;
22 21
 
23 22
 /**
24 23
  * xxl-job trigger
@@ -36,27 +35,38 @@ public class XxlJobTrigger {
36 35
      * 			<0: use param from job info config
37 36
      *
38 37
      */
39
-    public static void trigger(int jobId, int failRetryCount, TriggerTypeEnum triggerType) {
38
+    public static void trigger(int jobId, TriggerTypeEnum triggerType, int failRetryCount, String executorShardingParam) {
40 39
         // load data
41
-        XxlJobInfo jobInfo = XxlJobDynamicScheduler.xxlJobInfoDao.loadById(jobId);              // job info
40
+        XxlJobInfo jobInfo = XxlJobDynamicScheduler.xxlJobInfoDao.loadById(jobId);
42 41
         if (jobInfo == null) {
43 42
             logger.warn(">>>>>>>>>>>> trigger fail, jobId invalid,jobId={}", jobId);
44 43
             return;
45 44
         }
46 45
         int finalFailRetryCount = failRetryCount>=0?failRetryCount:jobInfo.getExecutorFailRetryCount();
47
-        XxlJobGroup group = XxlJobDynamicScheduler.xxlJobGroupDao.load(jobInfo.getJobGroup());  // group info
46
+        XxlJobGroup group = XxlJobDynamicScheduler.xxlJobGroupDao.load(jobInfo.getJobGroup());
48 47
 
49 48
         // process trigger
50
-        if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null) && CollectionUtils.isNotEmpty(group.getRegistryList())) {
51
-            for (int i = 0; i < group.getRegistryList().size(); i++) {
52
-                processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i);
49
+        if (triggerType==TriggerTypeEnum.RETRY && executorShardingParam!=null) {
50
+            String[] shardingArr = executorShardingParam.split("/");
51
+            if (shardingArr.length==2 && StringUtils.isNumeric(shardingArr[0]) && StringUtils.isNumeric(shardingArr[1])); {
52
+                processTrigger(group, jobInfo, finalFailRetryCount, triggerType, Integer.valueOf(shardingArr[0]), Integer.valueOf(shardingArr[1]));
53 53
             }
54 54
         } else {
55
-            processTrigger(group, jobInfo, finalFailRetryCount, triggerType, 0);
55
+            if (CollectionUtils.isNotEmpty(group.getRegistryList())) {
56
+                if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null)) {
57
+                    for (int i = 0; i < group.getRegistryList().size(); i++) {
58
+                        processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size());
59
+                    }
60
+                } else {
61
+                    processTrigger(group, jobInfo, finalFailRetryCount, triggerType, 0, 1);
62
+                }
63
+            } else {
64
+                processTrigger(group, jobInfo, finalFailRetryCount, triggerType, 0, 0);
65
+            }
56 66
         }
57 67
     }
58 68
 
59
-    private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index){
69
+    private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){
60 70
 
61 71
         // param
62 72
         ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION);  // block strategy
@@ -83,11 +93,12 @@ public class XxlJobTrigger {
83 93
         triggerParam.setGlueSource(jobInfo.getGlueSource());
84 94
         triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime());
85 95
         triggerParam.setBroadcastIndex(index);
86
-        triggerParam.setBroadcastTotal(group.getRegistryList()!=null?group.getRegistryList().size():0);
96
+        triggerParam.setBroadcastTotal(total);
87 97
 
88 98
         // 3、init address
89 99
         String address = null;
90 100
         ReturnT<String> routeAddressResult = null;
101
+        String shardingParam = (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==executorRouteStrategyEnum &&total>0)?String.valueOf(triggerParam.getBroadcastIndex()).concat("/").concat(String.valueOf(triggerParam.getBroadcastTotal())):null;
91 102
         if (CollectionUtils.isNotEmpty(group.getRegistryList())) {
92 103
             if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) {
93 104
                 address = group.getRegistryList().get(index);
@@ -117,8 +128,8 @@ public class XxlJobTrigger {
117 128
                 .append( (group.getAddressType() == 0)?I18nUtil.getString("jobgroup_field_addressType_0"):I18nUtil.getString("jobgroup_field_addressType_1") );
118 129
         triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regaddress")).append(":").append(group.getRegistryList());
119 130
         triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorRouteStrategy")).append(":").append(executorRouteStrategyEnum.getTitle());
120
-        if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null)) {
121
-            triggerMsgSb.append("("+index+"/"+(group.getRegistryList()!=null?group.getRegistryList().size():0)+")");
131
+        if (shardingParam != null) {
132
+            triggerMsgSb.append("("+shardingParam+")");
122 133
         }
123 134
         triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorBlockStrategy")).append(":").append(blockStrategy.getTitle());
124 135
         triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_timeout")).append(":").append(jobInfo.getExecutorTimeout());
@@ -131,6 +142,7 @@ public class XxlJobTrigger {
131 142
         jobLog.setExecutorAddress(address);
132 143
         jobLog.setExecutorHandler(jobInfo.getExecutorHandler());
133 144
         jobLog.setExecutorParam(jobInfo.getExecutorParam());
145
+        jobLog.setExecutorShardingParam(shardingParam);
134 146
         jobLog.setExecutorFailRetryCount(finalFailRetryCount);
135 147
         //jobLog.setTriggerTime();
136 148
         jobLog.setTriggerCode(triggerResult.getCode());

+ 1 - 1
xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/AdminBizImpl.java Прегледај датотеку

@@ -71,7 +71,7 @@ public class AdminBizImpl implements AdminBiz {
71 71
                     int childJobId = (StringUtils.isNotBlank(childJobIds[i]) && StringUtils.isNumeric(childJobIds[i]))?Integer.valueOf(childJobIds[i]):-1;
72 72
                     if (childJobId > 0) {
73 73
 
74
-                        JobTriggerPoolHelper.trigger(childJobId, 0, TriggerTypeEnum.PARENT);
74
+                        JobTriggerPoolHelper.trigger(childJobId, TriggerTypeEnum.PARENT, 0, null);
75 75
                         ReturnT<String> triggerChildResult = ReturnT.SUCCESS;
76 76
 
77 77
                         // add msg

+ 3 - 0
xxl-job-admin/src/main/resources/mybatis-mapper/XxlJobLogMapper.xml Прегледај датотеку

@@ -12,6 +12,7 @@
12 12
 		<result column="executor_address" property="executorAddress" />
13 13
 		<result column="executor_handler" property="executorHandler" />
14 14
 	    <result column="executor_param" property="executorParam" />
15
+		<result column="executor_sharding_param" property="executorShardingParam" />
15 16
 		<result column="executor_fail_retry_count" property="executorFailRetryCount" />
16 17
 	    
17 18
 	    <result column="trigger_time" property="triggerTime" />
@@ -31,6 +32,7 @@
31 32
 		t.executor_address,
32 33
 		t.executor_handler,
33 34
 		t.executor_param,
35
+		t.executor_sharding_param,
34 36
 		t.executor_fail_retry_count,
35 37
 		t.trigger_time,
36 38
 		t.trigger_code,
@@ -141,6 +143,7 @@
141 143
 			`executor_address`= #{executorAddress},
142 144
 			`executor_handler`=#{executorHandler},
143 145
 			`executor_param`= #{executorParam},
146
+			`executor_sharding_param`= #{executorShardingParam},
144 147
 			`executor_fail_retry_count`= #{executorFailRetryCount}
145 148
 		WHERE `id`= #{id}
146 149
 	</update>