Pārlūkot izejas kodu

修复任务监控线程被耗时任务阻塞的问题

xuxueli 7 gadus atpakaļ
vecāks
revīzija
ed31b1b66a

+ 2 - 1
doc/XXL-JOB官方文档.md Parādīt failu

@@ -571,7 +571,7 @@ XXL-JOB首先定制了Quartz原生表结构前缀(XXL_JOB_QRTZ_)。
571 571
 #### 5.3.2 系统组成
572 572
 - **调度模块(调度中心)**:
573 573
     负责管理调度信息,按照调度配置发出调度请求,自身不承担业务代码。调度系统与任务解耦,提高了系统可用性和稳定性,同时调度系统性能不再受限于任务模块;
574
-    支持可视化、简单且动态的管理调度信息,包括任务新建,更新,删除,GLUE开发和任务报警等,所有上述操作都会实时生效,同时支持监控调度结果以及执行日志,支持执行器Failover。
574
+    支持可视化、简单且动态的管理调度信息,包括任务新建,更新,删除,GLUE开发和任务报警等,所有上述操作都会实时生效,同时支持监控调度结果以及执行日志,支持执行器Failover。
575 575
 - **执行模块(执行器)**:
576 576
     负责接收调度请求并执行任务逻辑。任务模块专注于任务的执行等操作,开发和维护更加简单和高效;
577 577
     接收“调度中心”的执行请求、终止请求和日志请求等。
@@ -1020,6 +1020,7 @@ Tips: 历史版本(V1.3.x)目前已经Release至稳定版本, 进入维护阶段
1020 1020
 
1021 1021
 ### 6.20 版本 V1.9.0 特性[迭代中]
1022 1022
 - 1、新增任务运行模式 "GLUE模式(NodeJS) ",支持NodeJS脚本任务;
1023
+- 2、修复任务监控线程被耗时任务阻塞的问题;
1023 1024
 
1024 1025
 ### TODO LIST
1025 1026
 - 1、任务权限管理:执行器为粒度分配权限,核心操作校验权限;

+ 17 - 11
xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobFailMonitorHelper.java Parādīt failu

@@ -6,6 +6,7 @@ import com.xxl.job.admin.core.model.XxlJobLog;
6 6
 import com.xxl.job.admin.core.schedule.XxlJobDynamicScheduler;
7 7
 import com.xxl.job.admin.core.util.MailUtil;
8 8
 import com.xxl.job.core.biz.model.ReturnT;
9
+import org.apache.commons.collections.CollectionUtils;
9 10
 import org.slf4j.Logger;
10 11
 import org.slf4j.LoggerFactory;
11 12
 
@@ -35,33 +36,38 @@ public class JobFailMonitorHelper {
35 36
 
36 37
 			@Override
37 38
 			public void run() {
38
-
39 39
 				// monitor
40 40
 				while (!toStop) {
41 41
 					try {
42
-						Integer jobLogId = JobFailMonitorHelper.instance.queue.take();
43
-						if (jobLogId != null && jobLogId > 0) {
44
-							XxlJobLog log = XxlJobDynamicScheduler.xxlJobLogDao.load(jobLogId);
45
-							if (log!=null) {
46
-								if (ReturnT.SUCCESS_CODE==log.getTriggerCode() && log.getHandleCode()==0) {
47
-									// job running, wait + again monitor
48
-									TimeUnit.SECONDS.sleep(10);
42
+						List<Integer> jobLogIdList = new ArrayList<Integer>();
43
+						int drainToNum = JobFailMonitorHelper.instance.queue.drainTo(jobLogIdList);
49 44
 
45
+						if (CollectionUtils.isNotEmpty(jobLogIdList)) {
46
+							for (Integer jobLogId : jobLogIdList) {
47
+								if (jobLogId==null || jobLogId==0) {
48
+									continue;
49
+								}
50
+								XxlJobLog log = XxlJobDynamicScheduler.xxlJobLogDao.load(jobLogId);
51
+								if (log == null) {
52
+									continue;
53
+								}
54
+								if (ReturnT.SUCCESS_CODE == log.getTriggerCode() && log.getHandleCode() == 0) {
50 55
 									JobFailMonitorHelper.monitor(jobLogId);
51 56
 									logger.info(">>>>>>>>>>> job monitor, job running, JobLogId:{}", jobLogId);
52 57
 								}
53
-								if (ReturnT.SUCCESS_CODE==log.getTriggerCode() && ReturnT.SUCCESS_CODE==log.getHandleCode()) {
58
+								if (ReturnT.SUCCESS_CODE == log.getTriggerCode() && ReturnT.SUCCESS_CODE == log.getHandleCode()) {
54 59
 									// job success, pass
55 60
 									logger.info(">>>>>>>>>>> job monitor, job success, JobLogId:{}", jobLogId);
56 61
 								}
57
-
58
-								if (ReturnT.FAIL_CODE == log.getTriggerCode()|| ReturnT.FAIL_CODE==log.getHandleCode()) {
62
+								if (ReturnT.FAIL_CODE == log.getTriggerCode() || ReturnT.FAIL_CODE == log.getHandleCode()) {
59 63
 									// job fail,
60 64
 									sendMonitorEmail(log);
61 65
 									logger.info(">>>>>>>>>>> job monitor, job fail, JobLogId:{}", jobLogId);
62 66
 								}
63 67
 							}
64 68
 						}
69
+
70
+						TimeUnit.SECONDS.sleep(10);
65 71
 					} catch (Exception e) {
66 72
 						logger.error("job monitor error:{}", e);
67 73
 					}