Преглед на файлове

任务调度错过触发时间时的处理策略:

xuxueli преди 6 години
родител
ревизия
5c1f59323e
променени са 2 файла, в които са добавени 52 реда и са изтрити 41 реда
  1. 3 3
      doc/XXL-JOB官方文档.md
  2. 49 38
      xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobScheduleHelper.java

+ 3 - 3
doc/XXL-JOB官方文档.md Целия файл

836
 XXL-JOB的每个调度任务虽然在调度模块是并行调度执行的,但是任务调度传递到任务模块的“执行器”确实串行执行的,同时支持任务终止。
836
 XXL-JOB的每个调度任务虽然在调度模块是并行调度执行的,但是任务调度传递到任务模块的“执行器”确实串行执行的,同时支持任务终止。
837
 
837
 
838
 #### 5.4.6 过期处理策略
838
 #### 5.4.6 过期处理策略
839
-任务调度错过触发时间:
839
+任务调度错过触发时间时的处理策略
840
 - 可能原因:服务重启;调度线程被阻塞,线程被耗尽;上次调度持续阻塞,下次调度被错过;
840
 - 可能原因:服务重启;调度线程被阻塞,线程被耗尽;上次调度持续阻塞,下次调度被错过;
841
 - 处理策略:
841
 - 处理策略:
842
-    - 过期5s内:立即触发一次,并计算下次触发时间;
843
-    - 过期超过5s:忽略过期触发,计算下次触发时间
842
+    - 过期超10s:本地忽略,当前时间开始计算下次触发时间
843
+    - 过期超过5s:过期10s内:立即触发一次当前时间开始计算下次触发时间
844
 
844
 
845
 
845
 
846
 #### 5.4.7 日志回调服务
846
 #### 5.4.7 日志回调服务

+ 49 - 38
xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobScheduleHelper.java Целия файл

1
 package com.xxl.job.admin.core.thread;
1
 package com.xxl.job.admin.core.thread;
2
 
2
 
3
 import com.xxl.job.admin.core.conf.XxlJobAdminConfig;
3
 import com.xxl.job.admin.core.conf.XxlJobAdminConfig;
4
-import com.xxl.job.admin.core.model.XxlJobInfo;
5
 import com.xxl.job.admin.core.cron.CronExpression;
4
 import com.xxl.job.admin.core.cron.CronExpression;
5
+import com.xxl.job.admin.core.model.XxlJobInfo;
6
 import com.xxl.job.admin.core.trigger.TriggerTypeEnum;
6
 import com.xxl.job.admin.core.trigger.TriggerTypeEnum;
7
 import org.slf4j.Logger;
7
 import org.slf4j.Logger;
8
 import org.slf4j.LoggerFactory;
8
 import org.slf4j.LoggerFactory;
58
 
58
 
59
                         // tx start
59
                         // tx start
60
 
60
 
61
-                        // 1、查询JOB:"下次调度30s内"  ( ...... -5 ... now ...... +30 ...... )
61
+                        // 1、查询JOB:"下次调度30s内"
62
                         long maxNextTime = System.currentTimeMillis() + 30000;
62
                         long maxNextTime = System.currentTimeMillis() + 30000;
63
                         long nowTime = System.currentTimeMillis();
63
                         long nowTime = System.currentTimeMillis();
64
                         List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(maxNextTime);
64
                         List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(maxNextTime);
66
                             // 2、推送时间轮
66
                             // 2、推送时间轮
67
                             for (XxlJobInfo jobInfo: scheduleList) {
67
                             for (XxlJobInfo jobInfo: scheduleList) {
68
 
68
 
69
-                                // 过期策略:过期=更新下次触发时间为当前、过期是否5s内=立即出发,否则忽略;
70
-                                if (jobInfo.getTriggerNextTime() < nowTime) {
71
-                                    jobInfo.setTriggerNextTime(nowTime);
72
-                                    if (jobInfo.getTriggerNextTime() < nowTime-10000) {
73
-                                        continue;
74
-                                    }
69
+                                // 时间轮刻度计算
70
+                                int ringSecond = -1;
71
+                                if (jobInfo.getTriggerNextTime() < nowTime - 10000) {   // 过期超10s:本地忽略,当前时间开始计算下次触发时间
72
+                                    ringSecond = -1;
73
+
74
+                                    jobInfo.setTriggerLastTime(jobInfo.getTriggerNextTime());
75
+                                    jobInfo.setTriggerNextTime(
76
+                                            new CronExpression(jobInfo.getJobCron())
77
+                                                    .getNextValidTimeAfter(new Date())
78
+                                                    .getTime()
79
+                                    );
80
+                                } else if (jobInfo.getTriggerNextTime() < nowTime) {    // 过期10s内:立即触发一次,当前时间开始计算下次触发时间
81
+                                    ringSecond = (int)((nowTime/1000)%60);
82
+
83
+                                    jobInfo.setTriggerLastTime(jobInfo.getTriggerNextTime());
84
+                                    jobInfo.setTriggerNextTime(
85
+                                            new CronExpression(jobInfo.getJobCron())
86
+                                                    .getNextValidTimeAfter(new Date())
87
+                                                    .getTime()
88
+                                    );
89
+                                } else {    // 未过期:正常触发,递增计算下次触发时间
90
+                                    ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
91
+
92
+                                    jobInfo.setTriggerLastTime(jobInfo.getTriggerNextTime());
93
+                                    jobInfo.setTriggerNextTime(
94
+                                            new CronExpression(jobInfo.getJobCron())
95
+                                                    .getNextValidTimeAfter(new Date(jobInfo.getTriggerNextTime()))
96
+                                                    .getTime()
97
+                                    );
98
+                                }
99
+                                if (ringSecond == -1) {
100
+                                    continue;
75
                                 }
101
                                 }
76
 
102
 
77
                                 // push async ring
103
                                 // push async ring
78
-                                int second = (int)((jobInfo.getTriggerNextTime()/1000)%60);
79
-                                List<Integer> ringItemData = ringData.get(second);
104
+                                List<Integer> ringItemData = ringData.get(ringSecond);
80
                                 if (ringItemData == null) {
105
                                 if (ringItemData == null) {
81
                                     ringItemData = new ArrayList<Integer>();
106
                                     ringItemData = new ArrayList<Integer>();
82
-                                    ringData.put(second, ringItemData);
107
+                                    ringData.put(ringSecond, ringItemData);
83
                                 }
108
                                 }
84
                                 ringItemData.add(jobInfo.getId());
109
                                 ringItemData.add(jobInfo.getId());
85
 
110
 
86
-                                logger.info(">>>>>>>>>>> xxl-job, push time-ring : " + second + " = " + Arrays.asList(ringItemData) );
111
+                                logger.info(">>>>>>>>>>> xxl-job, push time-ring : " + ringSecond + " = " + Arrays.asList(ringItemData) );
87
                             }
112
                             }
88
 
113
 
89
                             // 3、更新trigger信息
114
                             // 3、更新trigger信息
90
                             for (XxlJobInfo jobInfo: scheduleList) {
115
                             for (XxlJobInfo jobInfo: scheduleList) {
91
-                                // update
92
-                                jobInfo.setTriggerLastTime(jobInfo.getTriggerNextTime());
93
-                                jobInfo.setTriggerNextTime(
94
-                                        new CronExpression(jobInfo.getJobCron())
95
-                                                .getNextValidTimeAfter(new Date(jobInfo.getTriggerNextTime()))
96
-                                                .getTime()
97
-                                );
98
                                 XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);
116
                                 XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);
99
                             }
117
                             }
100
 
118
 
141
                         List<Integer> ringItemData = new ArrayList<>();
159
                         List<Integer> ringItemData = new ArrayList<>();
142
                         int nowSecond = (int)((System.currentTimeMillis()/1000)%60);   // 避免处理耗时太长,跨过刻度;
160
                         int nowSecond = (int)((System.currentTimeMillis()/1000)%60);   // 避免处理耗时太长,跨过刻度;
143
                         if (lastSecond == -1) {
161
                         if (lastSecond == -1) {
144
-                            if (ringData.containsKey(nowSecond)) {
145
-                                List<Integer> tmpData = ringData.remove(nowSecond);
146
-                                if (tmpData != null) {
147
-                                    ringItemData.addAll(tmpData);
148
-                                }
162
+                            lastSecond = (nowSecond+59)%60;
163
+                        }
164
+                        for (int i = 1; i <=60; i++) {
165
+                            int secondItem = (lastSecond+i)%60;
166
+
167
+                            List<Integer> tmpData = ringData.remove(secondItem);
168
+                            if (tmpData != null) {
169
+                                ringItemData.addAll(tmpData);
149
                             }
170
                             }
150
-                            lastSecond = nowSecond;
151
-                        } else {
152
-                            for (int i = 1; i <=60; i++) {
153
-                                int secondItem = (lastSecond+i)%60;
154
-
155
-                                List<Integer> tmpData = ringData.remove(secondItem);
156
-                                if (tmpData != null) {
157
-                                    ringItemData.addAll(tmpData);
158
-                                }
159
 
171
 
160
-                                if (secondItem == nowSecond) {
161
-                                    break;
162
-                                }
172
+                            if (secondItem == nowSecond) {
173
+                                break;
163
                             }
174
                             }
164
-                            lastSecond = nowSecond;
165
                         }
175
                         }
176
+                        lastSecond = nowSecond;
177
+
166
 
178
 
167
                         logger.info(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData) );
179
                         logger.info(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData) );
168
                         if (ringItemData!=null && ringItemData.size()>0) {
180
                         if (ringItemData!=null && ringItemData.size()>0) {
171
                                 // do trigger
183
                                 // do trigger
172
                                 JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null);
184
                                 JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null);
173
                             }
185
                             }
174
-
175
                             // clear
186
                             // clear
176
                             ringItemData.clear();
187
                             ringItemData.clear();
177
                         }
188
                         }