浏览代码

任务告警逻辑调整,改为通过扫描失败日志方式触发。一方面精确扫描失败任务,降低扫描范围;另一方面取消内存队列,降低线程内存消耗。

xuxueli 7 年前
父节点
当前提交
4dfd14a914

+ 1 - 1
doc/XXL-JOB官方文档.md 查看文件

1389
 
1389
 
1390
 ### 6.24 版本 v2.0.2 Release Notes[迭代中]
1390
 ### 6.24 版本 v2.0.2 Release Notes[迭代中]
1391
 - 1、调度中心告警邮件发送组件改为 “spring-boot-starter-mail”;
1391
 - 1、调度中心告警邮件发送组件改为 “spring-boot-starter-mail”;
1392
-- 2、[迭代中]任务告警逻辑调整:任务调度,以及任务回调失败时,均推送监控队列。考虑通过任务Log字段控制告警状态
1392
+- 2、任务告警逻辑调整,改为通过扫描失败日志方式触发。一方面精确扫描失败任务,降低扫描范围;另一方面取消内存队列,降低线程内存消耗;
1393
 
1393
 
1394
 
1394
 
1395
 ### TODO LIST
1395
 ### TODO LIST

+ 1 - 0
doc/db/tables_xxl_job.sql 查看文件

187
   `handle_time` datetime DEFAULT NULL COMMENT '执行-时间',
187
   `handle_time` datetime DEFAULT NULL COMMENT '执行-时间',
188
   `handle_code` int(11) NOT NULL COMMENT '执行-状态',
188
   `handle_code` int(11) NOT NULL COMMENT '执行-状态',
189
   `handle_msg` text COMMENT '执行-日志',
189
   `handle_msg` text COMMENT '执行-日志',
190
+  `alarm_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '告警状态:0-默认、1-无需告警、2-告警成功、3-告警失败',
190
   PRIMARY KEY (`id`),
191
   PRIMARY KEY (`id`),
191
   KEY `I_trigger_time` (`trigger_time`)
192
   KEY `I_trigger_time` (`trigger_time`)
192
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
193
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8;

+ 12 - 0
xxl-job-admin/src/main/java/com/xxl/job/admin/core/model/XxlJobLog.java 查看文件

31
 	private int handleCode;
31
 	private int handleCode;
32
 	private String handleMsg;
32
 	private String handleMsg;
33
 
33
 
34
+	// alarm info
35
+	private int alarmStatus;
36
+
34
 	public int getId() {
37
 	public int getId() {
35
 		return id;
38
 		return id;
36
 	}
39
 	}
142
 	public void setHandleMsg(String handleMsg) {
145
 	public void setHandleMsg(String handleMsg) {
143
 		this.handleMsg = handleMsg;
146
 		this.handleMsg = handleMsg;
144
 	}
147
 	}
148
+
149
+	public int getAlarmStatus() {
150
+		return alarmStatus;
151
+	}
152
+
153
+	public void setAlarmStatus(int alarmStatus) {
154
+		this.alarmStatus = alarmStatus;
155
+	}
156
+
145
 }
157
 }

+ 44 - 65
xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobFailMonitorHelper.java 查看文件

7
 import com.xxl.job.admin.core.trigger.TriggerTypeEnum;
7
 import com.xxl.job.admin.core.trigger.TriggerTypeEnum;
8
 import com.xxl.job.admin.core.util.I18nUtil;
8
 import com.xxl.job.admin.core.util.I18nUtil;
9
 import com.xxl.job.core.biz.model.ReturnT;
9
 import com.xxl.job.core.biz.model.ReturnT;
10
-import com.xxl.job.core.handler.IJobHandler;
11
 import org.apache.commons.collections4.CollectionUtils;
10
 import org.apache.commons.collections4.CollectionUtils;
12
 import org.slf4j.Logger;
11
 import org.slf4j.Logger;
13
 import org.slf4j.LoggerFactory;
12
 import org.slf4j.LoggerFactory;
17
 import javax.mail.internet.MimeMessage;
16
 import javax.mail.internet.MimeMessage;
18
 import java.io.UnsupportedEncodingException;
17
 import java.io.UnsupportedEncodingException;
19
 import java.text.MessageFormat;
18
 import java.text.MessageFormat;
20
-import java.util.*;
21
-import java.util.concurrent.LinkedBlockingQueue;
19
+import java.util.Arrays;
20
+import java.util.HashSet;
21
+import java.util.List;
22
+import java.util.Set;
22
 import java.util.concurrent.TimeUnit;
23
 import java.util.concurrent.TimeUnit;
23
 
24
 
24
 /**
25
 /**
25
  * job monitor instance
26
  * job monitor instance
27
+ *
26
  * @author xuxueli 2015-9-1 18:05:56
28
  * @author xuxueli 2015-9-1 18:05:56
27
  */
29
  */
28
 public class JobFailMonitorHelper {
30
 public class JobFailMonitorHelper {
35
 
37
 
36
 	// ---------------------- monitor ----------------------
38
 	// ---------------------- monitor ----------------------
37
 
39
 
38
-	private LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>(0xfff8);
39
-
40
 	private Thread monitorThread;
40
 	private Thread monitorThread;
41
 	private volatile boolean toStop = false;
41
 	private volatile boolean toStop = false;
42
 	public void start(){
42
 	public void start(){
44
 
44
 
45
 			@Override
45
 			@Override
46
 			public void run() {
46
 			public void run() {
47
+
47
 				// monitor
48
 				// monitor
48
 				while (!toStop) {
49
 				while (!toStop) {
49
 					try {
50
 					try {
50
-						List<Integer> jobLogIdList = new ArrayList<Integer>();
51
-						int drainToNum = JobFailMonitorHelper.instance.queue.drainTo(jobLogIdList);
52
 
51
 
53
-						if (CollectionUtils.isNotEmpty(jobLogIdList)) {
54
-							for (Integer jobLogId : jobLogIdList) {
55
-								if (jobLogId==null || jobLogId==0) {
52
+						List<Integer> failLogIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findFailJobLogIds(1000);
53
+						if (CollectionUtils.isNotEmpty(failLogIds)) {
54
+							for (int failLogId: failLogIds) {
55
+
56
+								// lock log
57
+								int lockRet = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, 0, -1);
58
+								if (lockRet < 1) {
56
 									continue;
59
 									continue;
57
 								}
60
 								}
58
-								XxlJobLog log = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().load(jobLogId);
59
-								if (log == null) {
60
-									continue;
61
+								XxlJobLog log = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().load(failLogId);
62
+								XxlJobInfo info = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(log.getJobId());
63
+
64
+								// 1、fail retry monitor
65
+								if (log.getExecutorFailRetryCount() > 0) {
66
+									JobTriggerPoolHelper.trigger(log.getJobId(), TriggerTypeEnum.RETRY, (log.getExecutorFailRetryCount()-1), log.getExecutorShardingParam(), null);
67
+									String retryMsg = "<br><br><span style=\"color:#F39C12;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_type_retry") +"<<<<<<<<<<< </span><br>";
68
+									log.setTriggerMsg(log.getTriggerMsg() + retryMsg);
69
+									XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(log);
61
 								}
70
 								}
62
-								if (IJobHandler.SUCCESS.getCode() == log.getTriggerCode() && log.getHandleCode() == 0) {
63
-									// job running
64
-									JobFailMonitorHelper.monitor(jobLogId);
65
-									logger.debug(">>>>>>>>>>> job monitor, job running, JobLogId:{}", jobLogId);
66
-								} else if (IJobHandler.SUCCESS.getCode() == log.getHandleCode()) {
67
-									// job success, pass
68
-									logger.info(">>>>>>>>>>> job monitor, job success, JobLogId:{}", jobLogId);
69
-								} else /*if (IJobHandler.FAIL.getCode() == log.getTriggerCode()
70
-										|| IJobHandler.FAIL.getCode() == log.getHandleCode()
71
-										|| IJobHandler.FAIL_RETRY.getCode() == log.getHandleCode() )*/ {
72
-
73
-									// job fail,
74
-
75
-									// 1、fail retry
76
-									XxlJobInfo info = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(log.getJobId());
77
-
78
-									if (log.getExecutorFailRetryCount() > 0) {
79
-										JobTriggerPoolHelper.trigger(log.getJobId(), TriggerTypeEnum.RETRY, (log.getExecutorFailRetryCount()-1), log.getExecutorShardingParam(), null);
80
-										String retryMsg = "<br><br><span style=\"color:#F39C12;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_type_retry") +"<<<<<<<<<<< </span><br>";
81
-										log.setTriggerMsg(log.getTriggerMsg() + retryMsg);
82
-										XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(log);
83
-									}
84
 
71
 
85
-									// 2、fail alarm
86
-									failAlarm(info, log);
72
+								// 2、fail alarm monitor
73
+								int newAlarmStatus = 0;		// 告警状态:0-默认、-1=锁定状态、1-无需告警、2-告警成功、3-告警失败
74
+								if (info!=null && info.getAlarmEmail()!=null && info.getAlarmEmail().trim().length()>0) {
75
+									boolean alarmResult = true;
76
+									try {
77
+										alarmResult = failAlarm(info, log);
78
+									} catch (Exception e) {
79
+										alarmResult = false;
80
+										logger.error(e.getMessage(), e);
81
+									}
82
+									newAlarmStatus = alarmResult?2:3;
83
+								} else {
84
+									newAlarmStatus = 1;
85
+								}
87
 
86
 
88
-									logger.info(">>>>>>>>>>> job monitor, job fail, JobLogId:{}", jobLogId);
89
-								}/* else {
90
-									JobFailMonitorHelper.monitor(jobLogId);
91
-									logger.info(">>>>>>>>>>> job monitor, job status unknown, JobLogId:{}", jobLogId);
92
-								}*/
87
+								XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, -1, newAlarmStatus);
93
 							}
88
 							}
94
 						}
89
 						}
95
 
90
 
99
 					}
94
 					}
100
 				}
95
 				}
101
 
96
 
102
-				// monitor all clear
103
-				List<Integer> jobLogIdList = new ArrayList<Integer>();
104
-				int drainToNum = getInstance().queue.drainTo(jobLogIdList);
105
-				if (jobLogIdList!=null && jobLogIdList.size()>0) {
106
-					for (Integer jobLogId: jobLogIdList) {
107
-						XxlJobLog log = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().load(jobLogId);
108
-						if (ReturnT.FAIL_CODE == log.getTriggerCode()|| ReturnT.FAIL_CODE==log.getHandleCode()) {
109
-							// job fail,
110
-							XxlJobInfo info = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(log.getJobId());
111
-
112
-							failAlarm(info, log);
113
-							logger.info(">>>>>>>>>>> job monitor last, job fail, JobLogId:{}", jobLogId);
114
-						}
115
-					}
116
-				}
117
-
118
 			}
97
 			}
119
 		});
98
 		});
120
 		monitorThread.setDaemon(true);
99
 		monitorThread.setDaemon(true);
131
 			logger.error(e.getMessage(), e);
110
 			logger.error(e.getMessage(), e);
132
 		}
111
 		}
133
 	}
112
 	}
134
-	
135
-	// producer
136
-	public static void monitor(int jobLogId){
137
-		getInstance().queue.offer(jobLogId);
138
-	}
139
 
113
 
140
 
114
 
141
 	// ---------------------- alarm ----------------------
115
 	// ---------------------- alarm ----------------------
168
 	 *
142
 	 *
169
 	 * @param jobLog
143
 	 * @param jobLog
170
 	 */
144
 	 */
171
-	private void failAlarm(XxlJobInfo info, XxlJobLog jobLog){
145
+	private boolean failAlarm(XxlJobInfo info, XxlJobLog jobLog){
146
+		boolean alarmResult = true;
172
 
147
 
173
 		// send monitor email
148
 		// send monitor email
174
 		if (info!=null && info.getAlarmEmail()!=null && info.getAlarmEmail().trim().length()>0) {
149
 		if (info!=null && info.getAlarmEmail()!=null && info.getAlarmEmail().trim().length()>0) {
205
 					helper.setText(content, true);
180
 					helper.setText(content, true);
206
 
181
 
207
 					XxlJobAdminConfig.getAdminConfig().getMailSender().send(mimeMessage);
182
 					XxlJobAdminConfig.getAdminConfig().getMailSender().send(mimeMessage);
208
-				} catch (UnsupportedEncodingException | MessagingException e) {
183
+				} catch (Exception e) {
209
 					logger.error(">>>>>>>>>>> job monitor alarm email send error, JobLogId:{}", jobLog.getId(), e);
184
 					logger.error(">>>>>>>>>>> job monitor alarm email send error, JobLogId:{}", jobLog.getId(), e);
185
+
186
+					alarmResult = false;
210
 				}
187
 				}
211
 
188
 
212
 			}
189
 			}
214
 
191
 
215
 		// TODO, custom alarm strategy, such as sms
192
 		// TODO, custom alarm strategy, such as sms
216
 
193
 
194
+
195
+		return alarmResult;
217
 	}
196
 	}
218
 
197
 
219
 }
198
 }

+ 0 - 3
xxl-job-admin/src/main/java/com/xxl/job/admin/core/trigger/XxlJobTrigger.java 查看文件

6
 import com.xxl.job.admin.core.model.XxlJobLog;
6
 import com.xxl.job.admin.core.model.XxlJobLog;
7
 import com.xxl.job.admin.core.route.ExecutorRouteStrategyEnum;
7
 import com.xxl.job.admin.core.route.ExecutorRouteStrategyEnum;
8
 import com.xxl.job.admin.core.schedule.XxlJobDynamicScheduler;
8
 import com.xxl.job.admin.core.schedule.XxlJobDynamicScheduler;
9
-import com.xxl.job.admin.core.thread.JobFailMonitorHelper;
10
 import com.xxl.job.admin.core.util.I18nUtil;
9
 import com.xxl.job.admin.core.util.I18nUtil;
11
 import com.xxl.job.core.biz.ExecutorBiz;
10
 import com.xxl.job.core.biz.ExecutorBiz;
12
 import com.xxl.job.core.biz.model.ReturnT;
11
 import com.xxl.job.core.biz.model.ReturnT;
173
         jobLog.setTriggerMsg(triggerMsgSb.toString());
172
         jobLog.setTriggerMsg(triggerMsgSb.toString());
174
         XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(jobLog);
173
         XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(jobLog);
175
 
174
 
176
-        // 7、monitor trigger
177
-        JobFailMonitorHelper.monitor(jobLog.getId());
178
         logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());
175
         logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());
179
     }
176
     }
180
 
177
 

+ 6 - 0
xxl-job-admin/src/main/java/com/xxl/job/admin/dao/XxlJobLogDao.java 查看文件

50
 						@Param("clearBeforeTime") Date clearBeforeTime,
50
 						@Param("clearBeforeTime") Date clearBeforeTime,
51
 						@Param("clearBeforeNum") int clearBeforeNum);
51
 						@Param("clearBeforeNum") int clearBeforeNum);
52
 
52
 
53
+	public List<Integer> findFailJobLogIds(@Param("pagesize") int pagesize);
54
+
55
+	public int updateAlarmStatus(@Param("logId") int logId,
56
+								 @Param("oldAlarmStatus") int oldAlarmStatus,
57
+								 @Param("newAlarmStatus") int newAlarmStatus);
58
+
53
 }
59
 }

+ 23 - 3
xxl-job-admin/src/main/resources/mybatis-mapper/XxlJobLogMapper.xml 查看文件

22
 	    <result column="handle_time" property="handleTime" />
22
 	    <result column="handle_time" property="handleTime" />
23
 	    <result column="handle_code" property="handleCode" />
23
 	    <result column="handle_code" property="handleCode" />
24
 	    <result column="handle_msg" property="handleMsg" />
24
 	    <result column="handle_msg" property="handleMsg" />
25
-	    
25
+
26
+		<result column="alarm_status" property="alarmStatus" />
26
 	</resultMap>
27
 	</resultMap>
27
 
28
 
28
 	<sql id="Base_Column_List">
29
 	<sql id="Base_Column_List">
39
 		t.trigger_msg,
40
 		t.trigger_msg,
40
 		t.handle_time,
41
 		t.handle_time,
41
 		t.handle_code,
42
 		t.handle_code,
42
-		t.handle_msg
43
+		t.handle_msg,
44
+		t.alarm_status
43
 	</sql>
45
 	</sql>
44
 	
46
 	
45
 	<select id="pageList" resultMap="XxlJobLog">
47
 	<select id="pageList" resultMap="XxlJobLog">
176
 		SELECT
178
 		SELECT
177
 			DATE_FORMAT(trigger_time,'%Y-%m-%d') triggerDay,
179
 			DATE_FORMAT(trigger_time,'%Y-%m-%d') triggerDay,
178
 			COUNT(handle_code) triggerDayCount,
180
 			COUNT(handle_code) triggerDayCount,
179
-			SUM(CASE WHEN (trigger_code = 200 and handle_code = 0) then 1 else 0 end) as triggerDayCountRunning,
181
+			SUM(CASE WHEN (trigger_code in (0, 200) and handle_code = 0) then 1 else 0 end) as triggerDayCountRunning,
180
 			SUM(CASE WHEN handle_code = 200 then 1 else 0 end) as triggerDayCountSuc
182
 			SUM(CASE WHEN handle_code = 200 then 1 else 0 end) as triggerDayCountSuc
181
 		FROM XXL_JOB_QRTZ_TRIGGER_LOG
183
 		FROM XXL_JOB_QRTZ_TRIGGER_LOG
182
 		WHERE trigger_time BETWEEN #{from} and #{to}
184
 		WHERE trigger_time BETWEEN #{from} and #{to}
214
 			</if>
216
 			</if>
215
 		</trim>
217
 		</trim>
216
 	</delete>
218
 	</delete>
219
+
220
+	<select id="findFailJobLogIds" resultType="int" >
221
+		SELECT id FROM `XXL_JOB_QRTZ_TRIGGER_LOG`
222
+		WHERE !(
223
+			(trigger_code in (0, 200) and handle_code = 0)
224
+			OR
225
+			(handle_code = 200)
226
+		)
227
+		AND `alarm_status` = 0
228
+		ORDER BY id ASC
229
+	</select>
230
+
231
+	<update id="updateAlarmStatus" >
232
+		UPDATE XXL_JOB_QRTZ_TRIGGER_LOG
233
+		SET
234
+			`alarm_status` = #{newAlarmStatus}
235
+		WHERE `id`= #{logId} AND `alarm_status` = #{oldAlarmStatus}
236
+	</update>
217
 	
237
 	
218
 </mapper>
238
 </mapper>