浏览代码

底层线程模型统一;destory逻辑优化;

xueli.xue 8 年前
父节点
当前提交
79f9317ffe

+ 2 - 2
xxl-job-admin/src/main/java/com/xxl/job/admin/core/schedule/XxlJobDynamicScheduler.java 查看文件

@@ -78,10 +78,10 @@ public final class XxlJobDynamicScheduler implements ApplicationContextAware, In
78 78
     // destroy
79 79
     public void destroy(){
80 80
         // admin registry stop
81
-        JobRegistryHelper.getInstance().stop();
81
+        JobRegistryHelper.getInstance().toStop();
82 82
 
83 83
         // admin monitor stop
84
-        JobMonitorHelper.getInstance().stop();
84
+        JobMonitorHelper.getInstance().toStop();
85 85
 
86 86
         serverFactory.destroy();
87 87
     }

+ 2 - 2
xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobMonitorHelper.java 查看文件

@@ -36,7 +36,7 @@ public class JobMonitorHelper {
36 36
 
37 37
 			@Override
38 38
 			public void run() {
39
-				while (true) {
39
+				while (!toStop) {
40 40
 					try {
41 41
 						logger.debug(">>>>>>>>>>> job monitor beat ... ");
42 42
 						Integer jobLogId = JobMonitorHelper.instance.queue.take();
@@ -81,7 +81,7 @@ public class JobMonitorHelper {
81 81
 		monitorThread.start();
82 82
 	}
83 83
 
84
-	public void stop(){
84
+	public void toStop(){
85 85
 		toStop = true;
86 86
 		//monitorThread.interrupt();
87 87
 	}

+ 1 - 1
xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobRegistryHelper.java 查看文件

@@ -70,7 +70,7 @@ public class JobRegistryHelper {
70 70
 		registryThread.start();
71 71
 	}
72 72
 
73
-	public void stop(){
73
+	public void toStop(){
74 74
 		toStop = true;
75 75
 		//registryThread.interrupt();
76 76
 	}

+ 26 - 1
xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java 查看文件

@@ -6,7 +6,9 @@ import com.xxl.job.core.handler.IJobHandler;
6 6
 import com.xxl.job.core.handler.annotation.JobHander;
7 7
 import com.xxl.job.core.registry.RegistHelper;
8 8
 import com.xxl.job.core.rpc.netcom.NetComServerFactory;
9
+import com.xxl.job.core.thread.ExecutorRegistryThread;
9 10
 import com.xxl.job.core.thread.JobThread;
11
+import com.xxl.job.core.thread.TriggerCallbackThread;
10 12
 import org.slf4j.Logger;
11 13
 import org.slf4j.LoggerFactory;
12 14
 import org.springframework.beans.BeansException;
@@ -46,11 +48,33 @@ public class XxlJobExecutor implements ApplicationContextAware, ApplicationListe
46 48
     // ---------------------------------- job server ------------------------------------
47 49
     private NetComServerFactory serverFactory = new NetComServerFactory();
48 50
     public void start() throws Exception {
51
+        // executor start
49 52
         NetComServerFactory.putService(ExecutorBiz.class, new ExecutorBizImpl());
50 53
         serverFactory.start(port, ip, appName, registHelper);
54
+
55
+        // trigger callback thread start
56
+        TriggerCallbackThread.getInstance().start();
51 57
     }
52 58
     public void destroy(){
59
+        // executor stop
53 60
         serverFactory.destroy();
61
+
62
+        // job thread repository destory
63
+        if (JobThreadRepository.size() > 0) {
64
+            for (Map.Entry<String, JobThread> item: JobThreadRepository.entrySet()) {
65
+                JobThread jobThread = item.getValue();
66
+                jobThread.toStop("Web容器销毁终止");
67
+                jobThread.interrupt();
68
+
69
+            }
70
+            JobThreadRepository.clear();
71
+        }
72
+
73
+        // trigger callback thread stop
74
+        TriggerCallbackThread.getInstance().toStop();
75
+
76
+        // executor registry thread stop
77
+        ExecutorRegistryThread.getInstance().toStop();
54 78
     }
55 79
 
56 80
     // ---------------------------------- init job handler ------------------------------------
@@ -99,7 +123,8 @@ public class XxlJobExecutor implements ApplicationContextAware, ApplicationListe
99 123
         return jobThread;
100 124
     }
101 125
     public static JobThread loadJobThread(String jobKey){
102
-        return JobThreadRepository.get(jobKey);
126
+        JobThread jobThread = JobThreadRepository.get(jobKey);
127
+        return jobThread;
103 128
     }
104 129
     public static void removeJobThread(String jobKey){
105 130
         JobThreadRepository.remove(jobKey);

+ 7 - 42
xxl-job-core/src/main/java/com/xxl/job/core/rpc/netcom/jetty/server/JettyServer.java 查看文件

@@ -1,7 +1,7 @@
1 1
 package com.xxl.job.core.rpc.netcom.jetty.server;
2 2
 
3 3
 import com.xxl.job.core.registry.RegistHelper;
4
-import com.xxl.job.core.util.IpUtil;
4
+import com.xxl.job.core.thread.ExecutorRegistryThread;
5 5
 import org.eclipse.jetty.server.Connector;
6 6
 import org.eclipse.jetty.server.Handler;
7 7
 import org.eclipse.jetty.server.Server;
@@ -11,8 +11,6 @@ import org.eclipse.jetty.util.thread.ExecutorThreadPool;
11 11
 import org.slf4j.Logger;
12 12
 import org.slf4j.LoggerFactory;
13 13
 
14
-import java.util.concurrent.TimeUnit;
15
-
16 14
 /**
17 15
  * rpc jetty server
18 16
  * @author xuxueli 2015-11-19 22:29:03
@@ -21,9 +19,9 @@ public class JettyServer {
21 19
 	private static final Logger logger = LoggerFactory.getLogger(JettyServer.class);
22 20
 
23 21
 	private Server server;
24
-
22
+	private Thread thread;
25 23
 	public void start(final int port, final String ip, final String appName, final RegistHelper registHelper) throws Exception {
26
-		Thread thread = new Thread(new Runnable() {
24
+		thread = new Thread(new Runnable() {
27 25
 			@Override
28 26
 			public void run() {
29 27
 				server = new Server();
@@ -43,7 +41,7 @@ public class JettyServer {
43 41
 				try {
44 42
 					server.start();
45 43
 					logger.info(">>>>>>>>>>>> xxl-job jetty server start success at port:{}.", port);
46
-					executorRegistryBeat(port, ip, appName, registHelper);
44
+					ExecutorRegistryThread.getInstance().start(port, ip, appName, registHelper);
47 45
 					server.join();	// block until thread stopped
48 46
 					logger.info(">>>>>>>>>>> xxl-rpc server start success, netcon={}, port={}", JettyServer.class.getName(), port);
49 47
 				} catch (Exception e) {
@@ -65,43 +63,10 @@ public class JettyServer {
65 63
 				logger.error("", e);
66 64
 			}
67 65
 		}
68
-		logger.info(">>>>>>>>>>> xxl-rpc server destroy success, netcon={}", JettyServer.class.getName());
69
-	}
70
-
71
-	/**
72
-	 * registry beat
73
-	 * @param port
74
-	 * @param ip
75
-	 * @param appName
76
-	 * @param registHelper
77
-	 */
78
-	private void executorRegistryBeat(final int port, final String ip, final String appName, final RegistHelper registHelper){
79
-		if (registHelper==null && appName==null || appName.trim().length()==0) {
80
-			return;
66
+		if (thread.isAlive()) {
67
+			thread.interrupt();
81 68
 		}
82
-		Thread registryThread = new Thread(new Runnable() {
83
-			@Override
84
-			public void run() {
85
-				while (true) {
86
-					try {
87
-						// generate addredd = ip:port
88
-						String address = null;
89
-						if (ip != null && ip.trim().length()>0) {
90
-							address = ip.trim().concat(":").concat(String.valueOf(port));
91
-						} else {
92
-							address = IpUtil.getIpPort(port);
93
-						}
94
-
95
-						registHelper.registry(RegistHelper.RegistType.EXECUTOR.name(), appName, address);
96
-						TimeUnit.SECONDS.sleep(RegistHelper.TIMEOUT);
97
-					} catch (Exception e) {
98
-						e.printStackTrace();
99
-					}
100
-				}
101
-			}
102
-		});
103
-		registryThread.setDaemon(true);
104
-		registryThread.start();
69
+		logger.info(">>>>>>>>>>> xxl-rpc server destroy success, netcon={}", JettyServer.class.getName());
105 70
 	}
106 71
 
107 72
 }

+ 45 - 0
xxl-job-core/src/main/java/com/xxl/job/core/thread/ExecutorRegistryThread.java 查看文件

@@ -1,8 +1,53 @@
1 1
 package com.xxl.job.core.thread;
2 2
 
3
+import com.xxl.job.core.registry.RegistHelper;
4
+import com.xxl.job.core.util.IpUtil;
5
+
6
+import java.util.concurrent.TimeUnit;
7
+
3 8
 /**
4 9
  * Created by xuxueli on 17/3/2.
5 10
  */
6 11
 public class ExecutorRegistryThread extends Thread {
7 12
 
13
+    private static ExecutorRegistryThread instance = new ExecutorRegistryThread();
14
+    public static ExecutorRegistryThread getInstance(){
15
+        return instance;
16
+    }
17
+
18
+    private Thread registryThread;
19
+    private boolean toStop = false;
20
+    public void start(final int port, final String ip, final String appName, final RegistHelper registHelper){
21
+        if (registHelper==null && appName==null || appName.trim().length()==0) {
22
+            return;
23
+        }
24
+        registryThread = new Thread(new Runnable() {
25
+            @Override
26
+            public void run() {
27
+                while (!toStop) {
28
+                    try {
29
+                        // generate addredd = ip:port
30
+                        String address = null;
31
+                        if (ip != null && ip.trim().length()>0) {
32
+                            address = ip.trim().concat(":").concat(String.valueOf(port));
33
+                        } else {
34
+                            address = IpUtil.getIpPort(port);
35
+                        }
36
+
37
+                        registHelper.registry(RegistHelper.RegistType.EXECUTOR.name(), appName, address);
38
+                        TimeUnit.SECONDS.sleep(RegistHelper.TIMEOUT);
39
+                    } catch (Exception e) {
40
+                        e.printStackTrace();
41
+                    }
42
+                }
43
+            }
44
+        });
45
+        registryThread.setDaemon(true);
46
+        registryThread.start();
47
+    }
48
+
49
+    public void toStop() {
50
+        toStop = true;
51
+    }
52
+
8 53
 }

+ 22 - 7
xxl-job-core/src/main/java/com/xxl/job/core/thread/TriggerCallbackThread.java 查看文件

@@ -15,14 +15,23 @@ import java.util.concurrent.LinkedBlockingQueue;
15 15
 public class TriggerCallbackThread {
16 16
     private static Logger logger = LoggerFactory.getLogger(TriggerCallbackThread.class);
17 17
 
18
-    private static LinkedBlockingQueue<HandleCallbackParam> callBackQueue = new LinkedBlockingQueue<HandleCallbackParam>();
19
-    static {
20
-        new Thread(new Runnable() {
18
+    private static TriggerCallbackThread instance = new TriggerCallbackThread();
19
+    public static TriggerCallbackThread getInstance(){
20
+        return instance;
21
+    }
22
+
23
+    private LinkedBlockingQueue<HandleCallbackParam> callBackQueue = new LinkedBlockingQueue<HandleCallbackParam>();
24
+
25
+    private Thread triggerCallbackThread;
26
+    private boolean toStop = false;
27
+    public void start() {
28
+        triggerCallbackThread = new Thread(new Runnable() {
29
+
21 30
             @Override
22 31
             public void run() {
23
-                while(true){
32
+                while(!toStop){
24 33
                     try {
25
-                        HandleCallbackParam callback = callBackQueue.take();
34
+                        HandleCallbackParam callback = getInstance().callBackQueue.take();
26 35
                         if (callback != null) {
27 36
                             for (String address : callback.getLogAddress()) {
28 37
                                 try {
@@ -44,10 +53,16 @@ public class TriggerCallbackThread {
44 53
                     }
45 54
                 }
46 55
             }
47
-        }).start();
56
+        });
57
+        triggerCallbackThread.setDaemon(true);
58
+        triggerCallbackThread.start();
59
+    }
60
+    public void toStop(){
61
+        toStop = true;
48 62
     }
63
+
49 64
     public static void pushCallBack(HandleCallbackParam callback){
50
-        callBackQueue.add(callback);
65
+        getInstance().callBackQueue.add(callback);
51 66
         logger.debug(">>>>>>>>>>> xxl-job, push callback request, logId:{}", callback.getLogId());
52 67
     }
53 68