浏览代码

调度中心任务监控线程销毁时,批量对失败任务告警,防止告警信息丢失;

xuxueli 7 年前
父节点
当前提交
edcea479bc

+ 6 - 5
doc/XXL-JOB官方文档.md 查看文件

987
 - 2、规范项目目录,方便扩展多执行器;
987
 - 2、规范项目目录,方便扩展多执行器;
988
 - 3、新增JFinal类型执行器sample示例项目;
988
 - 3、新增JFinal类型执行器sample示例项目;
989
 - 4、执行器手动设置IP时将会绑定Host;
989
 - 4、执行器手动设置IP时将会绑定Host;
990
-- 5、项目主页搭建,提供中英文文档;
991
-- 6、执行器回调线程优化,线程销毁前批量回调队列中数据,防止任务结果丢失;
992
-- 7、执行器注册线程优化,线程销毁时主动摘除注册机器信息,提高执行器注册的实时性;
990
+- 5、项目主页搭建,提供中英文文档(http://www.xuxueli.com/xxl-job);
991
+- 6、执行器回调线程销毁前, 批量回调队列中数据,防止任务结果丢失;
992
+- 7、执行器注册线程销毁时, 主动摘除注册机器信息,提高执行器注册的实时性;
993
+- 8、调度中心任务监控线程销毁时,批量对失败任务告警,防止告警信息丢失;
994
+- 9、调度中心API服务:支持API方式触发任务执行;
993
 
995
 
994
 ### TODO LIST
996
 ### TODO LIST
995
 - 1、任务权限管理:执行器为粒度分配权限,核心操作校验权限;
997
 - 1、任务权限管理:执行器为粒度分配权限,核心操作校验权限;
1002
 - 8、springboot 和 docker镜像,并且推送docker镜像到中央仓库,更进一步实现产品开箱即用;
1004
 - 8、springboot 和 docker镜像,并且推送docker镜像到中央仓库,更进一步实现产品开箱即用;
1003
 - 9、国际化:调度中心界面。
1005
 - 9、国际化:调度中心界面。
1004
 - 10、执行器摘除:执行器销毁时,主动通知调度中心并摘除对应执行器节点,提高执行器状态感知的时效性。
1006
 - 10、执行器摘除:执行器销毁时,主动通知调度中心并摘除对应执行器节点,提高执行器状态感知的时效性。
1005
-- 11、调度中心API服务:支持API方式触发任务执行;
1006
-- 12、任务参数类型改为string,进一步方便参数传递;
1007
+- 11、任务类方法"IJobHandler.execute"的参数类型改为"string",进一步方便参数传递;任务注解和任务类统一并改为"JobHandler"";
1007
 
1008
 
1008
 ## 七、其他
1009
 ## 七、其他
1009
 
1010
 

+ 55 - 25
xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobFailMonitorHelper.java 查看文件

10
 import org.slf4j.LoggerFactory;
10
 import org.slf4j.LoggerFactory;
11
 
11
 
12
 import java.text.MessageFormat;
12
 import java.text.MessageFormat;
13
-import java.util.Arrays;
14
-import java.util.HashSet;
15
-import java.util.Set;
16
-import java.util.concurrent.*;
13
+import java.util.*;
14
+import java.util.concurrent.LinkedBlockingQueue;
15
+import java.util.concurrent.TimeUnit;
17
 
16
 
18
 /**
17
 /**
19
  * job monitor instance
18
  * job monitor instance
36
 
35
 
37
 			@Override
36
 			@Override
38
 			public void run() {
37
 			public void run() {
38
+
39
+				// monitor
39
 				while (!toStop) {
40
 				while (!toStop) {
40
 					try {
41
 					try {
41
-						logger.debug(">>>>>>>>>>> job monitor beat ... ");
42
 						Integer jobLogId = JobFailMonitorHelper.instance.queue.take();
42
 						Integer jobLogId = JobFailMonitorHelper.instance.queue.take();
43
 						if (jobLogId != null && jobLogId > 0) {
43
 						if (jobLogId != null && jobLogId > 0) {
44
-							logger.debug(">>>>>>>>>>> job monitor heat success, JobLogId:{}", jobLogId);
45
 							XxlJobLog log = XxlJobDynamicScheduler.xxlJobLogDao.load(jobLogId);
44
 							XxlJobLog log = XxlJobDynamicScheduler.xxlJobLogDao.load(jobLogId);
46
 							if (log!=null) {
45
 							if (log!=null) {
47
 								if (ReturnT.SUCCESS_CODE==log.getTriggerCode() && log.getHandleCode()==0) {
46
 								if (ReturnT.SUCCESS_CODE==log.getTriggerCode() && log.getHandleCode()==0) {
48
-									// running
49
-									try {
50
-										TimeUnit.SECONDS.sleep(10);
51
-									} catch (InterruptedException e) {
52
-										e.printStackTrace();
53
-									}
47
+									// job running, wait + again monitor
48
+									TimeUnit.SECONDS.sleep(10);
49
+
54
 									JobFailMonitorHelper.monitor(jobLogId);
50
 									JobFailMonitorHelper.monitor(jobLogId);
51
+									logger.info(">>>>>>>>>>> job monitor, job running, JobLogId:{}", jobLogId);
55
 								}
52
 								}
56
 								if (ReturnT.SUCCESS_CODE==log.getTriggerCode() && ReturnT.SUCCESS_CODE==log.getHandleCode()) {
53
 								if (ReturnT.SUCCESS_CODE==log.getTriggerCode() && ReturnT.SUCCESS_CODE==log.getHandleCode()) {
57
-									// pass
54
+									// job success, pass
55
+									logger.info(">>>>>>>>>>> job monitor, job success, JobLogId:{}", jobLogId);
58
 								}
56
 								}
59
-								if (ReturnT.FAIL_CODE == log.getTriggerCode()|| ReturnT.FAIL_CODE==log.getHandleCode()) {
60
-									XxlJobInfo info = XxlJobDynamicScheduler.xxlJobInfoDao.loadById(log.getJobId());
61
-									if (info!=null && info.getAlarmEmail()!=null && info.getAlarmEmail().trim().length()>0) {
62
 
57
 
63
-										Set<String> emailSet = new HashSet<String>(Arrays.asList(info.getAlarmEmail().split(",")));
64
-										for (String email: emailSet) {
65
-											String title = "《调度监控报警》(任务调度中心XXL-JOB)";
66
-											XxlJobGroup group = XxlJobDynamicScheduler.xxlJobGroupDao.load(Integer.valueOf(info.getJobGroup()));
67
-											String content = MessageFormat.format("任务调度失败, 执行器名称:{0}, 任务描述:{1}.", group!=null?group.getTitle():"null", info.getJobDesc());
68
-											MailUtil.sendMail(email, title, content, false, null);
69
-										}
70
-									}
58
+								if (ReturnT.FAIL_CODE == log.getTriggerCode()|| ReturnT.FAIL_CODE==log.getHandleCode()) {
59
+									// job fail,
60
+									sendMonitorEmail(log);
61
+									logger.info(">>>>>>>>>>> job monitor, job fail, JobLogId:{}", jobLogId);
71
 								}
62
 								}
72
 							}
63
 							}
73
 						}
64
 						}
75
 						logger.error("job monitor error:{}", e);
66
 						logger.error("job monitor error:{}", e);
76
 					}
67
 					}
77
 				}
68
 				}
69
+
70
+				// monitor all clear
71
+				List<Integer> jobLogIdList = new ArrayList<Integer>();
72
+				int drainToNum = getInstance().queue.drainTo(jobLogIdList);
73
+				if (jobLogIdList!=null && jobLogIdList.size()>0) {
74
+					for (Integer jobLogId: jobLogIdList) {
75
+						XxlJobLog log = XxlJobDynamicScheduler.xxlJobLogDao.load(jobLogId);
76
+						if (ReturnT.FAIL_CODE == log.getTriggerCode()|| ReturnT.FAIL_CODE==log.getHandleCode()) {
77
+							// job fail,
78
+							sendMonitorEmail(log);
79
+							logger.info(">>>>>>>>>>> job monitor last, job fail, JobLogId:{}", jobLogId);
80
+						}
81
+					}
82
+				}
83
+
78
 			}
84
 			}
79
 		});
85
 		});
80
 		monitorThread.setDaemon(true);
86
 		monitorThread.setDaemon(true);
81
 		monitorThread.start();
87
 		monitorThread.start();
82
 	}
88
 	}
83
 
89
 
90
+	/**
91
+	 * send monitor email
92
+	 * @param jobLog
93
+	 */
94
+	private void sendMonitorEmail(XxlJobLog jobLog){
95
+		XxlJobInfo info = XxlJobDynamicScheduler.xxlJobInfoDao.loadById(jobLog.getJobId());
96
+		if (info!=null && info.getAlarmEmail()!=null && info.getAlarmEmail().trim().length()>0) {
97
+
98
+			Set<String> emailSet = new HashSet<String>(Arrays.asList(info.getAlarmEmail().split(",")));
99
+			for (String email: emailSet) {
100
+				String title = "《调度监控报警》(任务调度中心XXL-JOB)";
101
+				XxlJobGroup group = XxlJobDynamicScheduler.xxlJobGroupDao.load(Integer.valueOf(info.getJobGroup()));
102
+				String content = MessageFormat.format("任务调度失败, 执行器名称:{0}, 任务描述:{1}.", group!=null?group.getTitle():"null", info.getJobDesc());
103
+				MailUtil.sendMail(email, title, content, false, null);
104
+			}
105
+		}
106
+	}
107
+
84
 	public void toStop(){
108
 	public void toStop(){
85
 		toStop = true;
109
 		toStop = true;
86
-		//monitorThread.interrupt();
110
+		// interrupt and wait
111
+		monitorThread.interrupt();
112
+		try {
113
+			monitorThread.join();
114
+		} catch (InterruptedException e) {
115
+			logger.error(e.getMessage(), e);
116
+		}
87
 	}
117
 	}
88
 	
118
 	
89
 	// producer
119
 	// producer

+ 7 - 1
xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobRegistryMonitorHelper.java 查看文件

91
 
91
 
92
 	public void toStop(){
92
 	public void toStop(){
93
 		toStop = true;
93
 		toStop = true;
94
-		//registryThread.interrupt();
94
+		// interrupt and wait
95
+		registryThread.interrupt();
96
+		try {
97
+			registryThread.join();
98
+		} catch (InterruptedException e) {
99
+			logger.error(e.getMessage(), e);
100
+		}
95
 	}
101
 	}
96
 
102
 
97
 }
103
 }

+ 8 - 0
xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/AdminBizImpl.java 查看文件

4
 import com.xxl.job.admin.core.model.XxlJobInfo;
4
 import com.xxl.job.admin.core.model.XxlJobInfo;
5
 import com.xxl.job.admin.core.model.XxlJobLog;
5
 import com.xxl.job.admin.core.model.XxlJobLog;
6
 import com.xxl.job.admin.core.schedule.XxlJobDynamicScheduler;
6
 import com.xxl.job.admin.core.schedule.XxlJobDynamicScheduler;
7
+import com.xxl.job.admin.core.trigger.XxlJobTrigger;
7
 import com.xxl.job.admin.dao.XxlJobInfoDao;
8
 import com.xxl.job.admin.dao.XxlJobInfoDao;
8
 import com.xxl.job.admin.dao.XxlJobLogDao;
9
 import com.xxl.job.admin.dao.XxlJobLogDao;
9
 import com.xxl.job.admin.dao.XxlJobRegistryDao;
10
 import com.xxl.job.admin.dao.XxlJobRegistryDao;
124
         return ReturnT.SUCCESS;
125
         return ReturnT.SUCCESS;
125
     }
126
     }
126
 
127
 
128
+    @Override
129
+    public ReturnT<String> triggerJob(int jobId) {
130
+        // TODO (thread queue trigger)
131
+
132
+        return ReturnT.SUCCESS;
133
+    }
134
+
127
 }
135
 }

+ 18 - 7
xxl-job-admin/src/test/java/com/xxl/job/dao/impl/AdminBizTest.java 查看文件

14
  */
14
  */
15
 public class AdminBizTest {
15
 public class AdminBizTest {
16
 
16
 
17
+    // admin-client
18
+    private static String addressUrl = "http://127.0.0.1:8080/xxl-job-admin".concat(AdminBiz.MAPPING);
19
+    private static String accessToken = null;
20
+
17
     @Test
21
     @Test
18
     public void registryTest() throws Exception {
22
     public void registryTest() throws Exception {
19
-
20
-        // admin-client
21
-        String addressUrl = "http://127.0.0.1:8080/xxl-job-admin".concat(AdminBiz.MAPPING);
22
-        String accessToken = null;
23
         AdminBiz adminBiz = (AdminBiz) new NetComClientProxy(AdminBiz.class, addressUrl, accessToken).getObject();
23
         AdminBiz adminBiz = (AdminBiz) new NetComClientProxy(AdminBiz.class, addressUrl, accessToken).getObject();
24
 
24
 
25
         // test executor registry
25
         // test executor registry
26
         RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), "xxl-job-executor-example", "127.0.0.1:9999");
26
         RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), "xxl-job-executor-example", "127.0.0.1:9999");
27
         ReturnT<String> returnT = adminBiz.registry(registryParam);
27
         ReturnT<String> returnT = adminBiz.registry(registryParam);
28
         Assert.assertTrue(returnT.getCode() == ReturnT.SUCCESS_CODE);
28
         Assert.assertTrue(returnT.getCode() == ReturnT.SUCCESS_CODE);
29
+    }
29
 
30
 
30
-
31
+    @Test
32
+    public void registryRemove() throws Exception {
33
+        AdminBiz adminBiz = (AdminBiz) new NetComClientProxy(AdminBiz.class, addressUrl, accessToken).getObject();
31
 
34
 
32
         // test executor registry remove
35
         // test executor registry remove
33
-        registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), "xxl-job-executor-example", "127.0.0.1:9999");
34
-        returnT = adminBiz.registryRemove(registryParam);
36
+        RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), "xxl-job-executor-example", "127.0.0.1:9999");
37
+        ReturnT<String> returnT = adminBiz.registryRemove(registryParam);
35
         Assert.assertTrue(returnT.getCode() == ReturnT.SUCCESS_CODE);
38
         Assert.assertTrue(returnT.getCode() == ReturnT.SUCCESS_CODE);
39
+    }
40
+
41
+    @Test
42
+    public void triggerJob() throws Exception {
43
+        AdminBiz adminBiz = (AdminBiz) new NetComClientProxy(AdminBiz.class, addressUrl, accessToken).getObject();
36
 
44
 
45
+        int jobId = 1;
46
+        ReturnT<String> returnT = adminBiz.triggerJob(1);
47
+        Assert.assertTrue(returnT.getCode() == ReturnT.SUCCESS_CODE);
37
     }
48
     }
38
 
49
 
39
 }
50
 }

+ 9 - 0
xxl-job-core/src/main/java/com/xxl/job/core/biz/AdminBiz.java 查看文件

37
      */
37
      */
38
     public ReturnT<String> registryRemove(RegistryParam registryParam);
38
     public ReturnT<String> registryRemove(RegistryParam registryParam);
39
 
39
 
40
+
41
+    /**
42
+     * trigger job for once
43
+     *
44
+     * @param jobId
45
+     * @return
46
+     */
47
+    public ReturnT<String> triggerJob(int jobId);
48
+
40
 }
49
 }