Procházet zdrojové kódy

路由策略新增 "忙碌转移" 模式:按照顺序依次进行空闲检测,第一个空闲检测成功的机器选定为目标执行器并发起调度;

xuxueli před 8 roky
rodič
revize
7e35088764

+ 2 - 1
README.md Zobrazit soubor

@@ -857,6 +857,7 @@ Tips: 历史版本(V1.3.x)目前已经Release至稳定版本, 进入维护阶段
857 857
 - 1、任务Cron更新逻辑优化,改为rescheduleJob,同时防止cron重复设置;
858 858
 - 2、优化:API回调服务失败状态码优化,方便问题排查;
859 859
 - 3、XxlJobLogger的日志多参数支持;
860
+- 4、路由策略新增 "忙碌转移" 模式:按照顺序依次进行空闲检测,第一个空闲检测成功的机器选定为目标执行器并发起调度;
860 861
 
861 862
 #### TODO LIST
862 863
 - 1、任务权限管理:执行器为粒度分配权限,核心操作校验权限;
@@ -868,7 +869,7 @@ Tips: 历史版本(V1.3.x)目前已经Release至稳定版本, 进入维护阶段
868 869
 - 7、调度任务优先级;
869 870
 - 8、移除quartz依赖,重写调度模块:新增或恢复任务时将下次执行记录插入delayqueue,调度中心集群竞争分布式锁,成功节点批量加载到期delayqueue数据,批量执行。
870 871
 - 9、任务线程轮空30次后自动销毁,降低低频任务的无效线程消耗。
871
-- 10、路由策略新增 "忙碌转移" 模式,发现执行器运行中,主动转移下一个执行器调度任务;
872
+
872 873
 
873 874
 ## 七、其他
874 875
 

+ 65 - 59
xxl-job-admin/src/main/java/com/xxl/job/admin/core/jobbean/RemoteHttpJobBean.java Zobrazit soubor

@@ -114,76 +114,82 @@ public class RemoteHttpJobBean extends QuartzJobBean {
114 114
 			return new ReturnT<String>(ReturnT.FAIL_CODE, triggerSb.toString());
115 115
 		}
116 116
 
117
-		// trigger remote executor
118
-		if (addressList.size() == 1) {
119
-			String address = addressList.get(0);
120
-			jobLog.setExecutorAddress(address);
121
-
122
-			ReturnT<String> runResult = runExecutor(triggerParam, address);
123
-			triggerSb.append("<br>----------------------<br>").append(runResult.getMsg());
117
+		// executor route strategy
118
+		ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null);
119
+		if (executorRouteStrategyEnum == null) {
120
+			triggerSb.append("<br>----------------------<br>").append("调度失败:").append("执行器路由策略为空");
121
+			return new ReturnT<String>(ReturnT.FAIL_CODE, triggerSb.toString());
122
+		}
123
+		triggerSb.append("<br>路由策略:").append(executorRouteStrategyEnum.name() + "-" + executorRouteStrategyEnum.getTitle());
124 124
 
125
-			return new ReturnT<String>(runResult.getCode(), triggerSb.toString());
126
-		} else {
127
-			// executor route strategy
128
-			ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null);
129
-			triggerSb.append("<br>路由策略:").append(executorRouteStrategyEnum!=null?(executorRouteStrategyEnum.name() + "-" + executorRouteStrategyEnum.getTitle()):null);
130
-			if (executorRouteStrategyEnum == null) {
131
-				triggerSb.append("<br>----------------------<br>").append("调度失败:").append("执行器路由策略为空");
132
-				return new ReturnT<String>(ReturnT.FAIL_CODE, triggerSb.toString());
133
-			}
125
+		// trigger remote executor
126
+		if (executorRouteStrategyEnum == ExecutorRouteStrategyEnum.FAILOVER) {
127
+			for (String address : addressList) {
128
+				// beat
129
+				ReturnT<String> beatResult = null;
130
+				try {
131
+					ExecutorBiz executorBiz = (ExecutorBiz) new NetComClientProxy(ExecutorBiz.class, address).getObject();
132
+					beatResult = executorBiz.beat();
133
+				} catch (Exception e) {
134
+					logger.error("", e);
135
+					beatResult = new ReturnT<String>(ReturnT.FAIL_CODE, ""+e );
136
+				}
137
+				triggerSb.append("<br>----------------------<br>")
138
+						.append("心跳检测:")
139
+						.append("<br>address:").append(address)
140
+						.append("<br>code:").append(beatResult.getCode())
141
+						.append("<br>msg:").append(beatResult.getMsg());
134 142
 
135
-			if (executorRouteStrategyEnum != ExecutorRouteStrategyEnum.FAILOVER) {
136
-				// get address
137
-				String address = executorRouteStrategyEnum.getRouter().route(jobInfo.getId(), addressList);
138
-				jobLog.setExecutorAddress(address);
143
+				// beat success
144
+				if (beatResult.getCode() == ReturnT.SUCCESS_CODE) {
145
+					jobLog.setExecutorAddress(address);
139 146
 
140
-				// run
141
-				ReturnT<String> runResult = runExecutor(triggerParam, address);
142
-				triggerSb.append("<br>----------------------<br>").append(runResult.getMsg());
147
+					ReturnT<String> runResult = runExecutor(triggerParam, address);
148
+					triggerSb.append("<br>----------------------<br>").append(runResult.getMsg());
143 149
 
144
-				return new ReturnT<String>(runResult.getCode(), triggerSb.toString());
145
-			} else {
146
-				for (String address : addressList) {
147
-					// beat
148
-					ReturnT<String> beatResult = beatExecutor(address);
149
-					triggerSb.append("<br>----------------------<br>").append(beatResult.getMsg());
150
+					return new ReturnT<String>(runResult.getCode(), triggerSb.toString());
151
+				}
152
+			}
153
+			return new ReturnT<String>(ReturnT.FAIL_CODE, triggerSb.toString());
154
+		} else if (executorRouteStrategyEnum == ExecutorRouteStrategyEnum.BUSYOVER) {
155
+			for (String address : addressList) {
156
+				// beat
157
+				ReturnT<String> idleBeatResult = null;
158
+				try {
159
+					ExecutorBiz executorBiz = (ExecutorBiz) new NetComClientProxy(ExecutorBiz.class, address).getObject();
160
+					idleBeatResult = executorBiz.idleBeat(triggerParam.getJobId());
161
+				} catch (Exception e) {
162
+					logger.error("", e);
163
+					idleBeatResult = new ReturnT<String>(ReturnT.FAIL_CODE, ""+e );
164
+				}
165
+				triggerSb.append("<br>----------------------<br>")
166
+						.append("空闲检测:")
167
+						.append("<br>address:").append(address)
168
+						.append("<br>code:").append(idleBeatResult.getCode())
169
+						.append("<br>msg:").append(idleBeatResult.getMsg());
150 170
 
151
-					if (beatResult.getCode() == ReturnT.SUCCESS_CODE) {
152
-						jobLog.setExecutorAddress(address);
171
+				// beat success
172
+				if (idleBeatResult.getCode() == ReturnT.SUCCESS_CODE) {
173
+					jobLog.setExecutorAddress(address);
153 174
 
154
-						ReturnT<String> runResult = runExecutor(triggerParam, address);
155
-						triggerSb.append("<br>----------------------<br>").append(runResult.getMsg());
175
+					ReturnT<String> runResult = runExecutor(triggerParam, address);
176
+					triggerSb.append("<br>----------------------<br>").append(runResult.getMsg());
156 177
 
157
-						return new ReturnT<String>(runResult.getCode(), triggerSb.toString());
158
-					}
178
+					return new ReturnT<String>(runResult.getCode(), triggerSb.toString());
159 179
 				}
160
-				return new ReturnT<String>(ReturnT.FAIL_CODE, triggerSb.toString());
161 180
 			}
162
-		}
163
-	}
164
-
165
-	/**
166
-	 * run executor
167
-	 * @param address
168
-	 * @return
169
-	 */
170
-	public ReturnT<String> beatExecutor(String address){
171
-		ReturnT<String> beatResult = null;
172
-		try {
173
-			ExecutorBiz executorBiz = (ExecutorBiz) new NetComClientProxy(ExecutorBiz.class, address).getObject();
174
-			beatResult = executorBiz.beat();
175
-		} catch (Exception e) {
176
-			logger.error("", e);
177
-			beatResult = new ReturnT<String>(ReturnT.FAIL_CODE, ""+e );
178
-		}
181
+			return new ReturnT<String>(ReturnT.FAIL_CODE, triggerSb.toString());
182
+		} else {
183
+			// get address
184
+			String address = executorRouteStrategyEnum.getRouter().route(jobInfo.getId(), addressList);
185
+			jobLog.setExecutorAddress(address);
179 186
 
180
-		StringBuffer sb = new StringBuffer("心跳检测:");
181
-		sb.append("<br>address:").append(address);
182
-		sb.append("<br>code:").append(beatResult.getCode());
183
-		sb.append("<br>msg:").append(beatResult.getMsg());
184
-		beatResult.setMsg(sb.toString());
187
+			// run
188
+			ReturnT<String> runResult = runExecutor(triggerParam, address);
189
+			triggerSb.append("<br>----------------------<br>").append(runResult.getMsg());
185 190
 
186
-		return beatResult;
191
+			return new ReturnT<String>(runResult.getCode(), triggerSb.toString());
192
+		}
187 193
 	}
188 194
 
189 195
 	/**

+ 2 - 1
xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/ExecutorRouteStrategyEnum.java Zobrazit soubor

@@ -14,7 +14,8 @@ public enum ExecutorRouteStrategyEnum {
14 14
     CONSISTENT_HASH("一致性HASH", new ExecutorRouteConsistentHash()),
15 15
     LEAST_FREQUENTLY_USED("最不经常使用", new ExecutorRouteLFU()),
16 16
     LEAST_RECENTLY_USED("最近最久未使用", new ExecutorRouteLRU()),
17
-    FAILOVER("故障转移", null);
17
+    FAILOVER("故障转移", null),
18
+    BUSYOVER("忙碌转移", null);
18 19
 
19 20
     ExecutorRouteStrategyEnum(String title, ExecutorRouter router) {
20 21
         this.title = title;

+ 8 - 0
xxl-job-core/src/main/java/com/xxl/job/core/biz/ExecutorBiz.java Zobrazit soubor

@@ -16,6 +16,14 @@ public interface ExecutorBiz {
16 16
     public ReturnT<String> beat();
17 17
 
18 18
     /**
19
+     * idle beat
20
+     *
21
+     * @param jobId
22
+     * @return
23
+     */
24
+    public ReturnT<String> idleBeat(int jobId);
25
+
26
+    /**
19 27
      * kill
20 28
      * @param jobId
21 29
      * @return

+ 16 - 0
xxl-job-core/src/main/java/com/xxl/job/core/biz/impl/ExecutorBizImpl.java Zobrazit soubor

@@ -30,6 +30,22 @@ public class ExecutorBizImpl implements ExecutorBiz {
30 30
     }
31 31
 
32 32
     @Override
33
+    public ReturnT<String> idleBeat(int jobId) {
34
+
35
+        // isRunningOrHasQueue
36
+        boolean isRunningOrHasQueue = false;
37
+        JobThread jobThread = XxlJobExecutor.loadJobThread(jobId);
38
+        if (jobThread != null && jobThread.isRunningOrHasQueue()) {
39
+            isRunningOrHasQueue = true;
40
+        }
41
+
42
+        if (isRunningOrHasQueue) {
43
+            return new ReturnT<String>(ReturnT.FAIL_CODE, "job thread is running or has trigger queue.");
44
+        }
45
+        return ReturnT.SUCCESS;
46
+    }
47
+
48
+    @Override
33 49
     public ReturnT<String> kill(int jobId) {
34 50
         // kill handlerThread, and create new one
35 51
         JobThread jobThread = XxlJobExecutor.loadJobThread(jobId);

+ 2 - 1
xxl-job-core/src/main/java/com/xxl/job/core/thread/ExecutorRegistryThread.java Zobrazit soubor

@@ -46,7 +46,8 @@ public class ExecutorRegistryThread extends Thread {
46 46
                     try {
47 47
                         RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appName, executorAddress);
48 48
                         ReturnT<String> registryResult = AdminApiUtil.callApiFailover(AdminApiUtil.REGISTRY, registryParam);
49
-                        logger.info(">>>>>>>>>>> xxl-job registry, RegistryParam:{}, registryResult:{}", new Object[]{registryParam.toString(), registryResult.toString()});
49
+                        logger.info(">>>>>>>>>>> xxl-job Executor registry {}, RegistryParam:{}, registryResult:{}",
50
+                                new Object[]{(registryResult.getCode()==ReturnT.SUCCESS_CODE?"success":"fail"), registryParam.toString(), registryResult.toString()});
50 51
                     } catch (Exception e) {
51 52
                         logger.error(">>>>>>>>>>> xxl-job ExecutorRegistryThread Exception:", e);
52 53
                     }