Browse Source

- Quartz触发线程池废弃并替换为 "XxlJobThreadPool",降低线程切换、内存占用带来的消耗,提高调度性能;

xuxueli 6 years ago
parent
commit
0dc5b28656

+ 1 - 0
README.md View File

@@ -78,6 +78,7 @@ XXL-JOB是一个轻量级分布式任务调度平台,其核心设计目标是
78 78
 - 30、跨平台:原生提供通用HTTP任务Handler(Bean任务,"HttpJobHandler");业务方只需要提供HTTP链接即可,不限制语言、平台;
79 79
 - 31、国际化:调度中心支持国际化设置,提供中文、英文两种可选语言,默认为中文;
80 80
 - 32、容器化:提供官方docker镜像,并实时更新推送dockerhub,进一步实现产品开箱即用;
81
+- 33、线程池隔离:调度线程池进行隔离拆分,慢任务自动降级进入"Slow"线程池,避免耗尽调度线程,提高系统稳定性;;
81 82
 
82 83
 
83 84
 ## Development

+ 4 - 3
doc/XXL-JOB官方文档.md View File

@@ -47,6 +47,7 @@ XXL-JOB是一个轻量级分布式任务调度平台,其核心设计目标是
47 47
 - 30、跨平台:原生提供通用HTTP任务Handler(Bean任务,"HttpJobHandler");业务方只需要提供HTTP链接即可,不限制语言、平台;
48 48
 - 31、国际化:调度中心支持国际化设置,提供中文、英文两种可选语言,默认为中文;
49 49
 - 32、容器化:提供官方docker镜像,并实时更新推送dockerhub,进一步实现产品开箱即用;
50
+- 33、线程池隔离:调度线程池进行隔离拆分,慢任务自动降级进入"Slow"线程池,避免耗尽调度线程,提高系统稳定性;;
50 51
 
51 52
 
52 53
 ### 1.3 发展
@@ -1442,9 +1443,9 @@ Tips: 历史版本(V1.3.x)目前已经Release至稳定版本, 进入维护阶段
1442 1443
 - 19、执行器优雅停机优化;
1443 1444
 - 20、连接池配置优化,增强连接有效性验证;
1444 1445
 - 21、JobHandler#msg长度限制,修复异常情况下日志超长导致内存溢出的问题;
1445
-- 22、[迭代中]任务线程隔离:
1446
-    - 执行器测异步响应,不存在阻塞不需要隔离
1447
-    - 调度中心共用单一调度线程池,可能导致调度阻塞需要线程隔离;调度线程池拆分为Fast/Slow两个,针对调度较慢的执行器地址请求,降级使用Slow线程池;考虑是否可以任务级隔离线程池;
1446
+- 22、Quartz触发线程池废弃并替换为 "XxlJobThreadPool",降低线程切换、内存占用带来的消耗,提高调度性能;
1447
+- 23、调度线程池隔离,拆分为"Fast"和"Slow"两个线程池,1分钟窗口期内任务耗时达500ms超过10次,该窗口期内判定为慢任务,慢任务自动降级进入"Slow"线程池,避免耗尽调度线程,提高系统稳定性
1448
+
1448 1449
 
1449 1450
 ### TODO LIST
1450 1451
 - 1、任务分片路由:分片采用一致性Hash算法计算出尽量稳定的分片顺序,即使注册机器存在波动也不会引起分批分片顺序大的波动;目前采用IP自然排序,可以满足需求,待定;

+ 58 - 0
xxl-job-admin/src/main/java/com/xxl/job/admin/core/quartz/XxlJobThreadPool.java View File

@@ -0,0 +1,58 @@
1
+package com.xxl.job.admin.core.quartz;
2
+
3
+import org.quartz.SchedulerConfigException;
4
+import org.quartz.spi.ThreadPool;
5
+
6
+/**
7
+ * single thread pool, for async trigger
8
+ *
9
+ * @author xuxueli 2019-03-06
10
+ */
11
+public class XxlJobThreadPool implements ThreadPool {
12
+
13
+    @Override
14
+    public boolean runInThread(Runnable runnable) {
15
+
16
+        // async run
17
+        runnable.run();
18
+        return true;
19
+
20
+        //return false;
21
+    }
22
+
23
+    @Override
24
+    public int blockForAvailableThreads() {
25
+        return 1;
26
+    }
27
+
28
+    @Override
29
+    public void initialize() throws SchedulerConfigException {
30
+
31
+    }
32
+
33
+    @Override
34
+    public void shutdown(boolean waitForJobsToComplete) {
35
+
36
+    }
37
+
38
+    @Override
39
+    public int getPoolSize() {
40
+        return 1;
41
+    }
42
+
43
+    @Override
44
+    public void setInstanceId(String schedInstId) {
45
+
46
+    }
47
+
48
+    @Override
49
+    public void setInstanceName(String schedName) {
50
+
51
+    }
52
+
53
+    // support
54
+    public void setThreadCount(int count) {
55
+        //
56
+    }
57
+
58
+}

+ 69 - 11
xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobTriggerPoolHelper.java View File

@@ -5,10 +5,9 @@ import com.xxl.job.admin.core.trigger.XxlJobTrigger;
5 5
 import org.slf4j.Logger;
6 6
 import org.slf4j.LoggerFactory;
7 7
 
8
-import java.util.concurrent.LinkedBlockingQueue;
9
-import java.util.concurrent.ThreadFactory;
10
-import java.util.concurrent.ThreadPoolExecutor;
11
-import java.util.concurrent.TimeUnit;
8
+import java.util.Map;
9
+import java.util.concurrent.*;
10
+import java.util.concurrent.atomic.AtomicInteger;
12 11
 
13 12
 /**
14 13
  * job trigger thread pool helper
@@ -21,32 +20,91 @@ public class JobTriggerPoolHelper {
21 20
 
22 21
     // ---------------------- trigger pool ----------------------
23 22
 
24
-    private ThreadPoolExecutor triggerPool = new ThreadPoolExecutor(
25
-            32,
26
-            256,
23
+    // fast/slow thread pool
24
+    private ThreadPoolExecutor fastTriggerPool = new ThreadPoolExecutor(
25
+            8,
26
+            200,
27 27
             60L,
28 28
             TimeUnit.SECONDS,
29 29
             new LinkedBlockingQueue<Runnable>(1000),
30 30
             new ThreadFactory() {
31 31
                 @Override
32 32
                 public Thread newThread(Runnable r) {
33
-                    return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-triggerPool-" + r.hashCode());
33
+                    return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-fastTriggerPool-" + r.hashCode());
34 34
                 }
35 35
             });
36 36
 
37
+    private ThreadPoolExecutor slowTriggerPool = new ThreadPoolExecutor(
38
+            0,
39
+            100,
40
+            60L,
41
+            TimeUnit.SECONDS,
42
+            new LinkedBlockingQueue<Runnable>(2000),
43
+            new ThreadFactory() {
44
+                @Override
45
+                public Thread newThread(Runnable r) {
46
+                    return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-slowTriggerPool-" + r.hashCode());
47
+                }
48
+            });
49
+
50
+
51
+    // job timeout count
52
+    private volatile long minTim = System.currentTimeMillis()/60000;     // ms > min
53
+    private volatile Map<Integer, AtomicInteger> jobTimeoutCountMap = new ConcurrentHashMap<>();
37 54
 
55
+
56
+    /**
57
+     * add trigger
58
+     */
38 59
     public void addTrigger(final int jobId, final TriggerTypeEnum triggerType, final int failRetryCount, final String executorShardingParam, final String executorParam) {
39
-        triggerPool.execute(new Runnable() {
60
+
61
+        // choose thread pool
62
+        ThreadPoolExecutor triggerPool_ = fastTriggerPool;
63
+        AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);
64
+        if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10) {      // job-timeout 10 times in 1 min
65
+            triggerPool_ = slowTriggerPool;
66
+        }
67
+
68
+        // trigger
69
+        triggerPool_.execute(new Runnable() {
40 70
             @Override
41 71
             public void run() {
42
-                XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam);
72
+
73
+                long start = System.currentTimeMillis();
74
+
75
+                try {
76
+                    // do trigger
77
+                    XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam);
78
+                } catch (Exception e) {
79
+                    logger.error(e.getMessage(), e);
80
+                } finally {
81
+
82
+                    // check timeout-count-map
83
+                    long minTim_now = System.currentTimeMillis()/60000;
84
+                    if (minTim != minTim_now) {
85
+                        minTim = minTim_now;
86
+                        jobTimeoutCountMap.clear();
87
+                    }
88
+
89
+                    // incr timeout-count-map
90
+                    long cost = System.currentTimeMillis()-start;
91
+                    if (cost > 500) {       // ob-timeout threshold 500ms
92
+                        AtomicInteger timeoutCount = jobTimeoutCountMap.put(jobId, new AtomicInteger(1));
93
+                        if (timeoutCount != null) {
94
+                            timeoutCount.incrementAndGet();
95
+                        }
96
+                    }
97
+
98
+                }
99
+
43 100
             }
44 101
         });
45 102
     }
46 103
 
47 104
     public void stop() {
48 105
         //triggerPool.shutdown();
49
-        triggerPool.shutdownNow();
106
+        fastTriggerPool.shutdownNow();
107
+        slowTriggerPool.shutdownNow();
50 108
         logger.info(">>>>>>>>> xxl-job trigger thread pool shutdown success.");
51 109
     }
52 110
 

+ 7 - 4
xxl-job-admin/src/main/resources/quartz.properties View File

@@ -9,16 +9,19 @@ org.quartz.scheduler.rmi.export: false
9 9
 org.quartz.scheduler.rmi.proxy: false
10 10
 org.quartz.scheduler.wrapJobExecutionInUserTransaction: false
11 11
 
12
-org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool
13
-org.quartz.threadPool.threadCount: 50
14
-org.quartz.threadPool.threadPriority: 5
15
-org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread: true
12
+#org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool
13
+#org.quartz.threadPool.threadCount: 5
14
+#org.quartz.threadPool.threadPriority: 5
15
+#org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread: true
16 16
 
17 17
 org.quartz.jobStore.misfireThreshold: 60000
18 18
 org.quartz.jobStore.maxMisfiresToHandleAtATime: 1
19 19
 
20 20
 #org.quartz.jobStore.class: org.quartz.simpl.RAMJobStore
21 21
 
22
+# for async trigger
23
+org.quartz.threadPool.class: com.xxl.job.admin.core.quartz.XxlJobThreadPool
24
+
22 25
 # for cluster
23 26
 org.quartz.jobStore.tablePrefix: XXL_JOB_QRTZ_
24 27
 org.quartz.jobStore.class: org.quartz.impl.jdbcjobstore.JobStoreTX