Browse Source

任务线程轮空30次后自动销毁,降低低频任务的无效线程消耗。

xuxueli 8 years ago
parent
commit
3138d48afa

+ 2 - 2
doc/XXL-JOB官方文档.md View File

864
 - 3、XxlJobLogger的日志多参数支持;
864
 - 3、XxlJobLogger的日志多参数支持;
865
 - 4、路由策略新增 "忙碌转移" 模式:按照顺序依次进行空闲检测,第一个空闲检测成功的机器选定为目标执行器并发起调度;
865
 - 4、路由策略新增 "忙碌转移" 模式:按照顺序依次进行空闲检测,第一个空闲检测成功的机器选定为目标执行器并发起调度;
866
 - 5、路由策略代码重构;
866
 - 5、路由策略代码重构;
867
+- 6、执行器重复注册问题修复;
868
+- 7、任务线程轮空30次后自动销毁,降低低频任务的无效线程消耗。
867
 
869
 
868
 #### TODO LIST
870
 #### TODO LIST
869
 - 1、任务权限管理:执行器为粒度分配权限,核心操作校验权限;
871
 - 1、任务权限管理:执行器为粒度分配权限,核心操作校验权限;
874
 - 6、任务依赖,流程图,子任务+会签任务,各节点日志;
876
 - 6、任务依赖,流程图,子任务+会签任务,各节点日志;
875
 - 7、调度任务优先级;
877
 - 7、调度任务优先级;
876
 - 8、移除quartz依赖,重写调度模块:新增或恢复任务时将下次执行记录插入delayqueue,调度中心集群竞争分布式锁,成功节点批量加载到期delayqueue数据,批量执行。
878
 - 8、移除quartz依赖,重写调度模块:新增或恢复任务时将下次执行记录插入delayqueue,调度中心集群竞争分布式锁,成功节点批量加载到期delayqueue数据,批量执行。
877
-- 9、任务线程轮空30次后自动销毁,降低低频任务的无效线程消耗。
878
-- 10、注册界面,出现重复地址问题;
879
 
879
 
880
 
880
 
881
 ## 七、其他
881
 ## 七、其他

+ 3 - 1
xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobRegistryMonitorHelper.java View File

46
 								if (registryList == null) {
46
 								if (registryList == null) {
47
 									registryList = new ArrayList<String>();
47
 									registryList = new ArrayList<String>();
48
 								}
48
 								}
49
-								registryList.add(item.getRegistryValue());
49
+								if (!registryList.contains(item.getRegistryValue())) {
50
+									registryList.add(item.getRegistryValue());
51
+								}
50
 								temp.put(groupKey, registryList);
52
 								temp.put(groupKey, registryList);
51
 							}
53
 							}
52
 						}
54
 						}

+ 1 - 1
xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java View File

125
     // ---------------------------------- job thread repository
125
     // ---------------------------------- job thread repository
126
     private static ConcurrentHashMap<Integer, JobThread> JobThreadRepository = new ConcurrentHashMap<Integer, JobThread>();
126
     private static ConcurrentHashMap<Integer, JobThread> JobThreadRepository = new ConcurrentHashMap<Integer, JobThread>();
127
     public static JobThread registJobThread(int jobId, IJobHandler handler, String removeOldReason){
127
     public static JobThread registJobThread(int jobId, IJobHandler handler, String removeOldReason){
128
-        JobThread newJobThread = new JobThread(handler);
128
+        JobThread newJobThread = new JobThread(jobId, handler);
129
         newJobThread.start();
129
         newJobThread.start();
130
         logger.info(">>>>>>>>>>> xxl-job regist JobThread success, jobId:{}, handler:{}", new Object[]{jobId, handler});
130
         logger.info(">>>>>>>>>>> xxl-job regist JobThread success, jobId:{}, handler:{}", new Object[]{jobId, handler});
131
 
131
 

+ 15 - 8
xxl-job-core/src/main/java/com/xxl/job/core/thread/JobThread.java View File

3
 import com.xxl.job.core.biz.model.HandleCallbackParam;
3
 import com.xxl.job.core.biz.model.HandleCallbackParam;
4
 import com.xxl.job.core.biz.model.ReturnT;
4
 import com.xxl.job.core.biz.model.ReturnT;
5
 import com.xxl.job.core.biz.model.TriggerParam;
5
 import com.xxl.job.core.biz.model.TriggerParam;
6
+import com.xxl.job.core.executor.XxlJobExecutor;
6
 import com.xxl.job.core.handler.IJobHandler;
7
 import com.xxl.job.core.handler.IJobHandler;
7
 import com.xxl.job.core.log.XxlJobFileAppender;
8
 import com.xxl.job.core.log.XxlJobFileAppender;
8
 import com.xxl.job.core.log.XxlJobLogger;
9
 import com.xxl.job.core.log.XxlJobLogger;
23
  */
24
  */
24
 public class JobThread extends Thread{
25
 public class JobThread extends Thread{
25
 	private static Logger logger = LoggerFactory.getLogger(JobThread.class);
26
 	private static Logger logger = LoggerFactory.getLogger(JobThread.class);
26
-	
27
+
28
+	private int jobId;
27
 	private IJobHandler handler;
29
 	private IJobHandler handler;
28
 	private LinkedBlockingQueue<TriggerParam> triggerQueue;
30
 	private LinkedBlockingQueue<TriggerParam> triggerQueue;
29
 	private ConcurrentHashSet<Integer> triggerLogIdSet;		// avoid repeat trigger for the same TRIGGER_LOG_ID
31
 	private ConcurrentHashSet<Integer> triggerLogIdSet;		// avoid repeat trigger for the same TRIGGER_LOG_ID
32
 	private String stopReason;
34
 	private String stopReason;
33
 
35
 
34
     private boolean running = false;    // if running job
36
     private boolean running = false;    // if running job
37
+	private int idleTimes = 0;			// idel times
35
 
38
 
36
 
39
 
37
-	public JobThread(IJobHandler handler) {
40
+	public JobThread(int jobId, IJobHandler handler) {
41
+		this.jobId = jobId;
38
 		this.handler = handler;
42
 		this.handler = handler;
39
-		triggerQueue = new LinkedBlockingQueue<TriggerParam>();
40
-		triggerLogIdSet = new ConcurrentHashSet<Integer>();
43
+		this.triggerQueue = new LinkedBlockingQueue<TriggerParam>();
44
+		this.triggerLogIdSet = new ConcurrentHashSet<Integer>();
41
 	}
45
 	}
42
 	public IJobHandler getHandler() {
46
 	public IJobHandler getHandler() {
43
 		return handler;
47
 		return handler;
88
 	public void run() {
92
 	public void run() {
89
 		while(!toStop){
93
 		while(!toStop){
90
 			running = false;
94
 			running = false;
95
+			idleTimes++;
91
 			try {
96
 			try {
92
 				// to check toStop signal, we need cycle, so wo cannot use queue.take(), instand of poll(timeout)
97
 				// to check toStop signal, we need cycle, so wo cannot use queue.take(), instand of poll(timeout)
93
 				TriggerParam triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);
98
 				TriggerParam triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);
94
 				if (triggerParam!=null) {
99
 				if (triggerParam!=null) {
95
 					running = true;
100
 					running = true;
101
+					idleTimes = 0;
96
 					triggerLogIdSet.remove(triggerParam.getLogId());
102
 					triggerLogIdSet.remove(triggerParam.getLogId());
97
 					
103
 					
98
 					// parse param
104
 					// parse param
126
 
132
 
127
 						XxlJobLogger.log("<br>----------- JobThread Exception:" + errorMsg + "<br>----------- xxl-job job execute end(error) -----------");
133
 						XxlJobLogger.log("<br>----------- JobThread Exception:" + errorMsg + "<br>----------- xxl-job job execute end(error) -----------");
128
 					}
134
 					}
129
-
130
-
131
-
132
 					
135
 					
133
 					// callback handler info
136
 					// callback handler info
134
 					if (!toStop) {
137
 					if (!toStop) {
139
 						ReturnT<String> stopResult = new ReturnT<String>(ReturnT.FAIL_CODE, stopReason + " [业务运行中,被强制终止]");
142
 						ReturnT<String> stopResult = new ReturnT<String>(ReturnT.FAIL_CODE, stopReason + " [业务运行中,被强制终止]");
140
 						TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), stopResult));
143
 						TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), stopResult));
141
 					}
144
 					}
145
+				} else {
146
+					if (idleTimes > 3) {
147
+						XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit.");
148
+					}
142
 				}
149
 				}
143
-			} catch (Exception e) {
150
+			} catch (Throwable e) {
144
 				if (toStop) {
151
 				if (toStop) {
145
 					XxlJobLogger.log("<br>----------- xxl-job toStop, stopReason:" + stopReason);
152
 					XxlJobLogger.log("<br>----------- xxl-job toStop, stopReason:" + stopReason);
146
 				}
153
 				}