|
@@ -59,233 +59,240 @@ public class JobScheduleHelper {
|
59
|
59
|
scheduleThread = new Thread(new Runnable() {
|
60
|
60
|
@Override
|
61
|
61
|
public void run() {
|
|
62
|
+ jobScheduleStop();
|
|
63
|
+ }
|
|
64
|
+ });
|
|
65
|
+ scheduleThread.setDaemon(true);
|
|
66
|
+ scheduleThread.setName("xxl-job, admin JobScheduleHelper#scheduleThread");
|
|
67
|
+ scheduleThread.start();
|
62
|
68
|
|
63
|
|
- try {
|
64
|
|
- TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis() % 1000);
|
65
|
|
- } catch (InterruptedException e) {
|
66
|
|
- if (!scheduleThreadToStop) {
|
67
|
|
- logger.error(e.getMessage(), e);
|
68
|
|
- }
|
69
|
|
- }
|
70
|
|
- logger.info(">>>>>>>>> init xxl-job admin scheduler success.");
|
71
|
|
-
|
72
|
|
- // pre-read count: treadpool-size * trigger-qps (each trigger cost 50ms, qps = 1000/50 = 20)
|
73
|
|
- int preReadCount = (xxlJobAdminConfig.getTriggerPoolFastMax() + xxlJobAdminConfig.getTriggerPoolSlowMax()) * 20;
|
|
69
|
+ // ring thread
|
|
70
|
+ ringThread = new Thread(new Runnable() {
|
|
71
|
+ @Override
|
|
72
|
+ public void run() {
|
|
73
|
+ jobScheduleRing();
|
|
74
|
+ }
|
|
75
|
+ });
|
|
76
|
+ ringThread.setDaemon(true);
|
|
77
|
+ ringThread.setName("xxl-job, admin JobScheduleHelper#ringThread");
|
|
78
|
+ ringThread.start();
|
|
79
|
+ }
|
74
|
80
|
|
75
|
|
- while (!scheduleThreadToStop) {
|
|
81
|
+ private void jobScheduleStop() {
|
76
|
82
|
|
77
|
|
- // Scan Job
|
78
|
|
- long start = System.currentTimeMillis();
|
|
83
|
+ try {
|
|
84
|
+ TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis() % 1000);
|
|
85
|
+ } catch (InterruptedException e) {
|
|
86
|
+ if (!scheduleThreadToStop) {
|
|
87
|
+ logger.error(e.getMessage(), e);
|
|
88
|
+ }
|
|
89
|
+ }
|
|
90
|
+ logger.info(">>>>>>>>> init xxl-job admin scheduler success.");
|
79
|
91
|
|
80
|
|
- Connection conn = null;
|
81
|
|
- Boolean connAutoCommit = null;
|
82
|
|
- PreparedStatement preparedStatement = null;
|
|
92
|
+ // pre-read count: treadpool-size * trigger-qps (each trigger cost 50ms, qps = 1000/50 = 20)
|
|
93
|
+ int preReadCount = (xxlJobAdminConfig.getTriggerPoolFastMax() + xxlJobAdminConfig.getTriggerPoolSlowMax()) * 20;
|
83
|
94
|
|
84
|
|
- boolean preReadSuc = true;
|
85
|
|
- try {
|
|
95
|
+ while (!scheduleThreadToStop) {
|
86
|
96
|
|
87
|
|
- conn = dataSource.getConnection();
|
88
|
|
- connAutoCommit = conn.getAutoCommit();
|
89
|
|
- conn.setAutoCommit(false);
|
|
97
|
+ // Scan Job
|
|
98
|
+ long start = System.currentTimeMillis();
|
90
|
99
|
|
91
|
|
- preparedStatement = conn.prepareStatement("select * from xxl_job_lock where lock_name = 'schedule_lock' for update");
|
92
|
|
- preparedStatement.execute();
|
|
100
|
+ Connection conn = null;
|
|
101
|
+ boolean connAutoCommit = true;
|
|
102
|
+ PreparedStatement preparedStatement = null;
|
93
|
103
|
|
94
|
|
- // tx start
|
|
104
|
+ boolean preReadSuc = true;
|
|
105
|
+ try {
|
95
|
106
|
|
96
|
|
- // 1、pre read
|
97
|
|
- long nowTime = System.currentTimeMillis();
|
98
|
|
- List<XxlJobInfo> scheduleList = xxlJobInfoDao.scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
|
99
|
|
- if (scheduleList != null && scheduleList.size() > 0) {
|
100
|
|
- // 2、push time-ring
|
101
|
|
- for (XxlJobInfo jobInfo : scheduleList) {
|
|
107
|
+ conn = dataSource.getConnection();
|
|
108
|
+ connAutoCommit = conn.getAutoCommit();
|
|
109
|
+ conn.setAutoCommit(false);
|
102
|
110
|
|
103
|
|
- // time-ring jump
|
104
|
|
- if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {
|
105
|
|
- // 2.1、trigger-expire > 5s:pass && make next-trigger-time
|
106
|
|
- logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId());
|
|
111
|
+ preparedStatement = conn.prepareStatement("select * from xxl_job_lock where lock_name = 'schedule_lock' for update");
|
|
112
|
+ preparedStatement.execute();
|
107
|
113
|
|
108
|
|
- // fresh next
|
109
|
|
- refreshNextValidTime(jobInfo, new Date());
|
|
114
|
+ // tx start
|
110
|
115
|
|
111
|
|
- } else if (nowTime > jobInfo.getTriggerNextTime()) {
|
112
|
|
- // 2.2、trigger-expire < 5s:direct-trigger && make next-trigger-time
|
|
116
|
+ // 1、pre read
|
|
117
|
+ long nowTime = System.currentTimeMillis();
|
|
118
|
+ List<XxlJobInfo> scheduleList = xxlJobInfoDao.scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
|
|
119
|
+ if (scheduleList != null && scheduleList.size() > 0) {
|
|
120
|
+ // 2、push time-ring
|
|
121
|
+ for (XxlJobInfo jobInfo : scheduleList) {
|
113
|
122
|
|
114
|
|
- // 1、trigger
|
115
|
|
- jobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null);
|
116
|
|
- logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId());
|
|
123
|
+ // time-ring jump
|
|
124
|
+ if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {
|
|
125
|
+ // 2.1、trigger-expire > 5s:pass && make next-trigger-time
|
|
126
|
+ logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId());
|
117
|
127
|
|
118
|
|
- // 2、fresh next
|
119
|
|
- refreshNextValidTime(jobInfo, new Date());
|
|
128
|
+ // fresh next
|
|
129
|
+ refreshNextValidTime(jobInfo, new Date());
|
120
|
130
|
|
121
|
|
- // next-trigger-time in 5s, pre-read again
|
122
|
|
- if (jobInfo.getTriggerStatus() == 1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {
|
|
131
|
+ } else if (nowTime > jobInfo.getTriggerNextTime()) {
|
|
132
|
+ // 2.2、trigger-expire < 5s:direct-trigger && make next-trigger-time
|
123
|
133
|
|
124
|
|
- // 1、make ring second
|
125
|
|
- int ringSecond = (int) ((jobInfo.getTriggerNextTime() / 1000) % 60);
|
|
134
|
+ // 1、trigger
|
|
135
|
+ jobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null);
|
|
136
|
+ logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId());
|
126
|
137
|
|
127
|
|
- // 2、push time ring
|
128
|
|
- pushTimeRing(ringSecond, jobInfo.getId());
|
|
138
|
+ // 2、fresh next
|
|
139
|
+ refreshNextValidTime(jobInfo, new Date());
|
129
|
140
|
|
130
|
|
- // 3、fresh next
|
131
|
|
- refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
|
|
141
|
+ // next-trigger-time in 5s, pre-read again
|
|
142
|
+ if (jobInfo.getTriggerStatus() == 1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {
|
132
|
143
|
|
133
|
|
- }
|
|
144
|
+ // 1、make ring second
|
|
145
|
+ int ringSecond = (int) ((jobInfo.getTriggerNextTime() / 1000) % 60);
|
134
|
146
|
|
135
|
|
- } else {
|
136
|
|
- // 2.3、trigger-pre-read:time-ring trigger && make next-trigger-time
|
|
147
|
+ // 2、push time ring
|
|
148
|
+ pushTimeRing(ringSecond, jobInfo.getId());
|
137
|
149
|
|
138
|
|
- // 1、make ring second
|
139
|
|
- int ringSecond = (int) ((jobInfo.getTriggerNextTime() / 1000) % 60);
|
|
150
|
+ // 3、fresh next
|
|
151
|
+ refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
|
140
|
152
|
|
141
|
|
- // 2、push time ring
|
142
|
|
- pushTimeRing(ringSecond, jobInfo.getId());
|
|
153
|
+ }
|
143
|
154
|
|
144
|
|
- // 3、fresh next
|
145
|
|
- refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
|
|
155
|
+ } else {
|
|
156
|
+ // 2.3、trigger-pre-read:time-ring trigger && make next-trigger-time
|
146
|
157
|
|
147
|
|
- }
|
|
158
|
+ // 1、make ring second
|
|
159
|
+ int ringSecond = (int) ((jobInfo.getTriggerNextTime() / 1000) % 60);
|
148
|
160
|
|
149
|
|
- }
|
|
161
|
+ // 2、push time ring
|
|
162
|
+ pushTimeRing(ringSecond, jobInfo.getId());
|
150
|
163
|
|
151
|
|
- // 3、update trigger info
|
152
|
|
- for (XxlJobInfo jobInfo : scheduleList) {
|
153
|
|
- xxlJobInfoDao.scheduleUpdate(jobInfo);
|
154
|
|
- }
|
|
164
|
+ // 3、fresh next
|
|
165
|
+ refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
|
155
|
166
|
|
156
|
|
- } else {
|
157
|
|
- preReadSuc = false;
|
158
|
167
|
}
|
159
|
168
|
|
160
|
|
- // tx stop
|
|
169
|
+ }
|
|
170
|
+
|
|
171
|
+ // 3、update trigger info
|
|
172
|
+ for (XxlJobInfo jobInfo : scheduleList) {
|
|
173
|
+ xxlJobInfoDao.scheduleUpdate(jobInfo);
|
|
174
|
+ }
|
|
175
|
+
|
|
176
|
+ } else {
|
|
177
|
+ preReadSuc = false;
|
|
178
|
+ }
|
161
|
179
|
|
|
180
|
+ // tx stop
|
|
181
|
+
|
|
182
|
+
|
|
183
|
+ } catch (Exception e) {
|
|
184
|
+ if (!scheduleThreadToStop) {
|
|
185
|
+ logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread error:{}", e.getMessage());
|
|
186
|
+ }
|
|
187
|
+ } finally {
|
162
|
188
|
|
163
|
|
- } catch (Exception e) {
|
|
189
|
+ // commit
|
|
190
|
+ if (conn != null) {
|
|
191
|
+ try {
|
|
192
|
+ conn.commit();
|
|
193
|
+ } catch (SQLException e) {
|
164
|
194
|
if (!scheduleThreadToStop) {
|
165
|
|
- logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread error:{}", e);
|
|
195
|
+ logger.error(e.getMessage(), e);
|
166
|
196
|
}
|
167
|
|
- } finally {
|
168
|
|
-
|
169
|
|
- // commit
|
170
|
|
- if (conn != null) {
|
171
|
|
- try {
|
172
|
|
- conn.commit();
|
173
|
|
- } catch (SQLException e) {
|
174
|
|
- if (!scheduleThreadToStop) {
|
175
|
|
- logger.error(e.getMessage(), e);
|
176
|
|
- }
|
177
|
|
- }
|
178
|
|
- try {
|
179
|
|
- conn.setAutoCommit(connAutoCommit);
|
180
|
|
- } catch (SQLException e) {
|
181
|
|
- if (!scheduleThreadToStop) {
|
182
|
|
- logger.error(e.getMessage(), e);
|
183
|
|
- }
|
184
|
|
- }
|
185
|
|
- try {
|
186
|
|
- conn.close();
|
187
|
|
- } catch (SQLException e) {
|
188
|
|
- if (!scheduleThreadToStop) {
|
189
|
|
- logger.error(e.getMessage(), e);
|
190
|
|
- }
|
191
|
|
- }
|
|
197
|
+ }
|
|
198
|
+ try {
|
|
199
|
+ conn.setAutoCommit(connAutoCommit);
|
|
200
|
+ } catch (SQLException e) {
|
|
201
|
+ if (!scheduleThreadToStop) {
|
|
202
|
+ logger.error(e.getMessage(), e);
|
192
|
203
|
}
|
193
|
|
-
|
194
|
|
- // close PreparedStatement
|
195
|
|
- if (null != preparedStatement) {
|
196
|
|
- try {
|
197
|
|
- preparedStatement.close();
|
198
|
|
- } catch (SQLException e) {
|
199
|
|
- if (!scheduleThreadToStop) {
|
200
|
|
- logger.error(e.getMessage(), e);
|
201
|
|
- }
|
202
|
|
- }
|
|
204
|
+ }
|
|
205
|
+ try {
|
|
206
|
+ conn.close();
|
|
207
|
+ } catch (SQLException e) {
|
|
208
|
+ if (!scheduleThreadToStop) {
|
|
209
|
+ logger.error(e.getMessage(), e);
|
203
|
210
|
}
|
204
|
211
|
}
|
205
|
|
- long cost = System.currentTimeMillis() - start;
|
206
|
|
-
|
|
212
|
+ }
|
207
|
213
|
|
208
|
|
- // Wait seconds, align second
|
209
|
|
- if (cost < 1000) { // scan-overtime, not wait
|
210
|
|
- try {
|
211
|
|
- // pre-read period: success > scan each second; fail > skip this period;
|
212
|
|
- TimeUnit.MILLISECONDS.sleep((preReadSuc ? 1000 : PRE_READ_MS) - System.currentTimeMillis() % 1000);
|
213
|
|
- } catch (InterruptedException e) {
|
214
|
|
- if (!scheduleThreadToStop) {
|
215
|
|
- logger.error(e.getMessage(), e);
|
216
|
|
- }
|
|
214
|
+ // close PreparedStatement
|
|
215
|
+ if (null != preparedStatement) {
|
|
216
|
+ try {
|
|
217
|
+ preparedStatement.close();
|
|
218
|
+ } catch (SQLException e) {
|
|
219
|
+ if (!scheduleThreadToStop) {
|
|
220
|
+ logger.error(e.getMessage(), e);
|
217
|
221
|
}
|
218
|
222
|
}
|
219
|
|
-
|
220
|
223
|
}
|
221
|
|
-
|
222
|
|
- logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread stop");
|
223
|
224
|
}
|
224
|
|
- });
|
225
|
|
- scheduleThread.setDaemon(true);
|
226
|
|
- scheduleThread.setName("xxl-job, admin JobScheduleHelper#scheduleThread");
|
227
|
|
- scheduleThread.start();
|
|
225
|
+ long cost = System.currentTimeMillis() - start;
|
228
|
226
|
|
229
|
227
|
|
230
|
|
- // ring thread
|
231
|
|
- ringThread = new Thread(new Runnable() {
|
232
|
|
- @Override
|
233
|
|
- public void run() {
|
234
|
|
-
|
235
|
|
- // align second
|
|
228
|
+ // Wait seconds, align second
|
|
229
|
+ if (cost < 1000) { // scan-overtime, not wait
|
236
|
230
|
try {
|
237
|
|
- TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis() % 1000);
|
|
231
|
+ // pre-read period: success > scan each second; fail > skip this period;
|
|
232
|
+ TimeUnit.MILLISECONDS.sleep((preReadSuc ? 1000 : PRE_READ_MS) - System.currentTimeMillis() % 1000);
|
238
|
233
|
} catch (InterruptedException e) {
|
239
|
|
- if (!ringThreadToStop) {
|
|
234
|
+ if (!scheduleThreadToStop) {
|
240
|
235
|
logger.error(e.getMessage(), e);
|
241
|
236
|
}
|
242
|
237
|
}
|
|
238
|
+ }
|
243
|
239
|
|
244
|
|
- while (!ringThreadToStop) {
|
|
240
|
+ }
|
245
|
241
|
|
246
|
|
- try {
|
247
|
|
- // second data
|
248
|
|
- List<Integer> ringItemData = new ArrayList<>();
|
249
|
|
- int nowSecond = Calendar.getInstance().get(Calendar.SECOND); // 避免处理耗时太长,跨过刻度,向前校验一个刻度;
|
250
|
|
- for (int i = 0; i < 2; i++) {
|
251
|
|
- List<Integer> tmpData = ringData.remove((nowSecond + 60 - i) % 60);
|
252
|
|
- if (tmpData != null) {
|
253
|
|
- ringItemData.addAll(tmpData);
|
254
|
|
- }
|
255
|
|
- }
|
|
242
|
+ logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread stop");
|
|
243
|
+ }
|
256
|
244
|
|
257
|
|
- // ring trigger
|
258
|
|
- logger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData));
|
259
|
|
- if (ringItemData.size() > 0) {
|
260
|
|
- // do trigger
|
261
|
|
- for (int jobId : ringItemData) {
|
262
|
|
- // do trigger
|
263
|
|
- jobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null);
|
264
|
|
- }
|
265
|
|
- // clear
|
266
|
|
- ringItemData.clear();
|
267
|
|
- }
|
268
|
|
- } catch (Exception e) {
|
269
|
|
- if (!ringThreadToStop) {
|
270
|
|
- logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e);
|
271
|
|
- }
|
|
245
|
+ private void jobScheduleRing() {
|
|
246
|
+
|
|
247
|
+ // align second
|
|
248
|
+ try {
|
|
249
|
+ TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis() % 1000);
|
|
250
|
+ } catch (InterruptedException e) {
|
|
251
|
+ if (!ringThreadToStop) {
|
|
252
|
+ logger.error(e.getMessage(), e);
|
|
253
|
+ }
|
|
254
|
+ }
|
|
255
|
+
|
|
256
|
+ while (!ringThreadToStop) {
|
|
257
|
+
|
|
258
|
+ try {
|
|
259
|
+ // second data
|
|
260
|
+ List<Integer> ringItemData = new ArrayList<>();
|
|
261
|
+ int nowSecond = Calendar.getInstance().get(Calendar.SECOND); // 避免处理耗时太长,跨过刻度,向前校验一个刻度;
|
|
262
|
+ for (int i = 0; i < 2; i++) {
|
|
263
|
+ List<Integer> tmpData = ringData.remove((nowSecond + 60 - i) % 60);
|
|
264
|
+ if (tmpData != null) {
|
|
265
|
+ ringItemData.addAll(tmpData);
|
272
|
266
|
}
|
|
267
|
+ }
|
273
|
268
|
|
274
|
|
- // next second, align second
|
275
|
|
- try {
|
276
|
|
- TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis() % 1000);
|
277
|
|
- } catch (InterruptedException e) {
|
278
|
|
- if (!ringThreadToStop) {
|
279
|
|
- logger.error(e.getMessage(), e);
|
280
|
|
- }
|
|
269
|
+ // ring trigger
|
|
270
|
+ logger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData));
|
|
271
|
+ if (ringItemData.size() > 0) {
|
|
272
|
+ // do trigger
|
|
273
|
+ for (int jobId : ringItemData) {
|
|
274
|
+ // do trigger
|
|
275
|
+ jobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null);
|
281
|
276
|
}
|
|
277
|
+ // clear
|
|
278
|
+ ringItemData.clear();
|
|
279
|
+ }
|
|
280
|
+ } catch (Exception e) {
|
|
281
|
+ if (!ringThreadToStop) {
|
|
282
|
+ logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e.getMessage());
|
282
|
283
|
}
|
283
|
|
- logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop");
|
284
|
284
|
}
|
285
|
|
- });
|
286
|
|
- ringThread.setDaemon(true);
|
287
|
|
- ringThread.setName("xxl-job, admin JobScheduleHelper#ringThread");
|
288
|
|
- ringThread.start();
|
|
285
|
+
|
|
286
|
+ // next second, align second
|
|
287
|
+ try {
|
|
288
|
+ TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis() % 1000);
|
|
289
|
+ } catch (InterruptedException e) {
|
|
290
|
+ if (!ringThreadToStop) {
|
|
291
|
+ logger.error(e.getMessage(), e);
|
|
292
|
+ }
|
|
293
|
+ }
|
|
294
|
+ }
|
|
295
|
+ logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop");
|
289
|
296
|
}
|
290
|
297
|
|
291
|
298
|
private void refreshNextValidTime(XxlJobInfo jobInfo, Date fromTime) throws ParseException {
|
|
@@ -302,14 +309,10 @@ public class JobScheduleHelper {
|
302
|
309
|
|
303
|
310
|
private void pushTimeRing(int ringSecond, int jobId) {
|
304
|
311
|
// push async ring
|
305
|
|
- List<Integer> ringItemData = ringData.get(ringSecond);
|
306
|
|
- if (ringItemData == null) {
|
307
|
|
- ringItemData = new ArrayList<Integer>();
|
308
|
|
- ringData.put(ringSecond, ringItemData);
|
309
|
|
- }
|
|
312
|
+ List<Integer> ringItemData = ringData.computeIfAbsent(ringSecond, k -> new ArrayList<Integer>());
|
310
|
313
|
ringItemData.add(jobId);
|
311
|
314
|
|
312
|
|
- logger.debug(">>>>>>>>>>> xxl-job, schedule push time-ring : " + ringSecond + " = " + Arrays.asList(ringItemData));
|
|
315
|
+ logger.debug(">>>>>>>>>>> xxl-job, schedule push time-ring : {} = {}", ringSecond, ringItemData);
|
313
|
316
|
}
|
314
|
317
|
|
315
|
318
|
@PreDestroy
|