Browse Source

执行器注册线程优化,线程销毁时主动摘除注册机器;

xuxueli 7 years ago
parent
commit
837ab404b1

+ 5 - 3
doc/XXL-JOB官方文档.md View File

739
 自v1.5版本之后, 任务取消了"任务执行机器"属性, 改为通过任务注册和自动发现的方式, 动态获取远程执行器地址并执行。
739
 自v1.5版本之后, 任务取消了"任务执行机器"属性, 改为通过任务注册和自动发现的方式, 动态获取远程执行器地址并执行。
740
 
740
 
741
     AppName: 每个执行器机器集群的唯一标示, 任务注册以 "执行器" 为最小粒度进行注册; 每个任务通过其绑定的执行器可感知对应的执行器机器列表;
741
     AppName: 每个执行器机器集群的唯一标示, 任务注册以 "执行器" 为最小粒度进行注册; 每个任务通过其绑定的执行器可感知对应的执行器机器列表;
742
-    Beat: 任务注册心跳周期, 默认15s; 执行器以一倍Beat进行执行器注册, 调度中心以一倍Beat进行动态任务发现; 注册信息的失效时间被两倍Beat; 
743
     注册表: 见"XXL_JOB_QRTZ_TRIGGER_REGISTRY"表, "执行器" 在进行任务注册时将会周期性维护一条注册记录,即机器地址和AppName的绑定关系; "调度中心" 从而可以动态感知每个AppName在线的机器列表;
742
     注册表: 见"XXL_JOB_QRTZ_TRIGGER_REGISTRY"表, "执行器" 在进行任务注册时将会周期性维护一条注册记录,即机器地址和AppName的绑定关系; "调度中心" 从而可以动态感知每个AppName在线的机器列表;
743
+    执行器注册: 任务注册Beat周期默认30s; 执行器以一倍Beat进行执行器注册, 调度中心以一倍Beat进行动态任务发现; 注册信息的失效时间被三倍Beat; 
744
+    执行器注册摘除:执行器销毁时,将会主动上报调度中心并摘除对应的执行器机器信息,提高心跳注册的实时性;
745
+    
744
 
746
 
745
 为保证系统"轻量级"并且降低学习部署成本,没有采用Zookeeper作为注册中心,采用DB方式进行任务注册发现;
747
 为保证系统"轻量级"并且降低学习部署成本,没有采用Zookeeper作为注册中心,采用DB方式进行任务注册发现;
746
 
748
 
986
 - 3、新增JFinal类型执行器sample示例项目;
988
 - 3、新增JFinal类型执行器sample示例项目;
987
 - 4、执行器手动设置IP时将会绑定Host;
989
 - 4、执行器手动设置IP时将会绑定Host;
988
 - 5、项目主页搭建,提供中英文文档;
990
 - 5、项目主页搭建,提供中英文文档;
989
-- 6、执行器回调线程优化,线程销毁前批量回调队列中所有数据;
990
-- 7、执行器注册线程优化,线程销毁时主动摘除注册机器;
991
+- 6、执行器回调线程优化,线程销毁前批量回调队列中数据,防止任务结果丢失
992
+- 7、执行器注册线程优化,线程销毁时主动摘除注册机器信息,提高执行器注册的实时性
991
 
993
 
992
 ### TODO LIST
994
 ### TODO LIST
993
 - 1、任务权限管理:执行器为粒度分配权限,核心操作校验权限;
995
 - 1、任务权限管理:执行器为粒度分配权限,核心操作校验权限;

+ 1 - 1
xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/AdminBizImpl.java View File

73
                                 childTriggerMsg += MessageFormat.format("<br> {0}/{1} 触发子任务成功, 子任务Key: {2}, status: {3}, 子任务描述: {4}",
73
                                 childTriggerMsg += MessageFormat.format("<br> {0}/{1} 触发子任务成功, 子任务Key: {2}, status: {3}, 子任务描述: {4}",
74
                                         (i+1), childJobKeys.length, childJobKeys[i], ret, childJobInfo.getJobDesc());
74
                                         (i+1), childJobKeys.length, childJobKeys[i], ret, childJobInfo.getJobDesc());
75
                             } catch (SchedulerException e) {
75
                             } catch (SchedulerException e) {
76
-                                logger.error("", e);
76
+                                logger.error(e.getMessage(), e);
77
                             }
77
                             }
78
                         } else {
78
                         } else {
79
                             childTriggerMsg += MessageFormat.format("<br> {0}/{1} 触发子任务失败, 子任务xxlJobInfo不存在, 子任务Key: {2}",
79
                             childTriggerMsg += MessageFormat.format("<br> {0}/{1} 触发子任务失败, 子任务xxlJobInfo不存在, 子任务Key: {2}",

+ 3 - 3
xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/XxlJobServiceImpl.java View File

132
             //XxlJobDynamicScheduler.pauseJob(qz_name, qz_group);
132
             //XxlJobDynamicScheduler.pauseJob(qz_name, qz_group);
133
             return ReturnT.SUCCESS;
133
             return ReturnT.SUCCESS;
134
         } catch (SchedulerException e) {
134
         } catch (SchedulerException e) {
135
-            logger.error("", e);
135
+            logger.error(e.getMessage(), e);
136
             try {
136
             try {
137
                 xxlJobInfoDao.delete(jobInfo.getId());
137
                 xxlJobInfoDao.delete(jobInfo.getId());
138
                 XxlJobDynamicScheduler.removeJob(qz_name, qz_group);
138
                 XxlJobDynamicScheduler.removeJob(qz_name, qz_group);
139
             } catch (SchedulerException e1) {
139
             } catch (SchedulerException e1) {
140
-                logger.error("", e1);
140
+                logger.error(e.getMessage(), e1);
141
             }
141
             }
142
             return new ReturnT<String>(ReturnT.FAIL_CODE, "新增任务失败:" + e.getMessage());
142
             return new ReturnT<String>(ReturnT.FAIL_CODE, "新增任务失败:" + e.getMessage());
143
         }
143
         }
207
             boolean ret = XxlJobDynamicScheduler.rescheduleJob(qz_group, qz_name, exists_jobInfo.getJobCron());
207
             boolean ret = XxlJobDynamicScheduler.rescheduleJob(qz_group, qz_name, exists_jobInfo.getJobCron());
208
             return ret?ReturnT.SUCCESS:ReturnT.FAIL;
208
             return ret?ReturnT.SUCCESS:ReturnT.FAIL;
209
         } catch (SchedulerException e) {
209
         } catch (SchedulerException e) {
210
-            logger.error("", e);
210
+            logger.error(e.getMessage(), e);
211
         }
211
         }
212
 
212
 
213
 		return ReturnT.FAIL;
213
 		return ReturnT.FAIL;

+ 1 - 1
xxl-job-core/src/main/java/com/xxl/job/core/biz/impl/ExecutorBizImpl.java View File

115
                     IJobHandler originJobHandler = GlueFactory.getInstance().loadNewInstance(triggerParam.getGlueSource());
115
                     IJobHandler originJobHandler = GlueFactory.getInstance().loadNewInstance(triggerParam.getGlueSource());
116
                     jobHandler = new GlueJobHandler(originJobHandler, triggerParam.getGlueUpdatetime());
116
                     jobHandler = new GlueJobHandler(originJobHandler, triggerParam.getGlueUpdatetime());
117
                 } catch (Exception e) {
117
                 } catch (Exception e) {
118
-                    logger.error("", e);
118
+                    logger.error(e.getMessage(), e);
119
                     return new ReturnT<String>(ReturnT.FAIL_CODE, e.getMessage());
119
                     return new ReturnT<String>(ReturnT.FAIL_CODE, e.getMessage());
120
                 }
120
                 }
121
             }
121
             }

+ 9 - 9
xxl-job-core/src/main/java/com/xxl/job/core/rpc/netcom/jetty/server/JettyServer.java View File

55
 					server.join();	// block until thread stopped
55
 					server.join();	// block until thread stopped
56
 					logger.info(">>>>>>>>>>> xxl-rpc server join success, netcon={}, port={}", JettyServer.class.getName(), port);
56
 					logger.info(">>>>>>>>>>> xxl-rpc server join success, netcon={}, port={}", JettyServer.class.getName(), port);
57
 				} catch (Exception e) {
57
 				} catch (Exception e) {
58
-					logger.error("", e);
58
+					logger.error(e.getMessage(), e);
59
 				} finally {
59
 				} finally {
60
-					destroy();
60
+					//destroy();
61
 				}
61
 				}
62
 			}
62
 			}
63
 		});
63
 		});
67
 
67
 
68
 	public void destroy() {
68
 	public void destroy() {
69
 
69
 
70
+		// destroy Registry-Server
71
+		ExecutorRegistryThread.getInstance().toStop();
72
+
73
+		// destroy Callback-Server
74
+		TriggerCallbackThread.getInstance().toStop();
75
+
70
 		// destroy server
76
 		// destroy server
71
 		if (server != null) {
77
 		if (server != null) {
72
 			try {
78
 			try {
73
 				server.stop();
79
 				server.stop();
74
 				server.destroy();
80
 				server.destroy();
75
 			} catch (Exception e) {
81
 			} catch (Exception e) {
76
-				logger.error("", e);
82
+				logger.error(e.getMessage(), e);
77
 			}
83
 			}
78
 		}
84
 		}
79
 		if (thread.isAlive()) {
85
 		if (thread.isAlive()) {
80
 			thread.interrupt();
86
 			thread.interrupt();
81
 		}
87
 		}
82
 
88
 
83
-		// destroy Registry-Server
84
-		ExecutorRegistryThread.getInstance().toStop();
85
-
86
-		// destroy Callback-Server
87
-		TriggerCallbackThread.getInstance().toStop();
88
-
89
 		logger.info(">>>>>>>>>>> xxl-rpc server destroy success, netcon={}", JettyServer.class.getName());
89
 		logger.info(">>>>>>>>>>> xxl-rpc server destroy success, netcon={}", JettyServer.class.getName());
90
 	}
90
 	}
91
 
91
 

+ 8 - 0
xxl-job-core/src/main/java/com/xxl/job/core/thread/ExecutorRegistryThread.java View File

99
                 } catch (Exception e) {
99
                 } catch (Exception e) {
100
                     logger.error(e.getMessage(), e);
100
                     logger.error(e.getMessage(), e);
101
                 }
101
                 }
102
+                logger.warn(">>>>>>>>>>>> xxl-job, executor registry thread destory.");
102
 
103
 
103
             }
104
             }
104
         });
105
         });
108
 
109
 
109
     public void toStop() {
110
     public void toStop() {
110
         toStop = true;
111
         toStop = true;
112
+        // interrupt and wait
113
+        registryThread.interrupt();
114
+        try {
115
+            registryThread.join();
116
+        } catch (InterruptedException e) {
117
+            logger.error(e.getMessage(), e);
118
+        }
111
     }
119
     }
112
 
120
 
113
 }
121
 }

+ 8 - 0
xxl-job-core/src/main/java/com/xxl/job/core/thread/TriggerCallbackThread.java View File

80
                 } catch (Exception e) {
80
                 } catch (Exception e) {
81
                     logger.error(e.getMessage(), e);
81
                     logger.error(e.getMessage(), e);
82
                 }
82
                 }
83
+                logger.warn(">>>>>>>>>>>> xxl-job, executor callback thread destory.");
83
 
84
 
84
             }
85
             }
85
         });
86
         });
88
     }
89
     }
89
     public void toStop(){
90
     public void toStop(){
90
         toStop = true;
91
         toStop = true;
92
+        // interrupt and wait
93
+        triggerCallbackThread.interrupt();
94
+        try {
95
+            triggerCallbackThread.join();
96
+        } catch (InterruptedException e) {
97
+            logger.error(e.getMessage(), e);
98
+        }
91
     }
99
     }
92
 
100
 
93
     /**
101
     /**

+ 1 - 1
xxl-job-core/src/main/java/com/xxl/job/core/util/HttpClientUtil.java View File

62
 				EntityUtils.consume(entity);
62
 				EntityUtils.consume(entity);
63
 			}
63
 			}
64
 		} catch (Exception e) {
64
 		} catch (Exception e) {
65
-			logger.error("", e);
65
+			logger.error(e.getMessage(), e);
66
 			throw e;
66
 			throw e;
67
 		} finally {
67
 		} finally {
68
 			httpPost.releaseConnection();
68
 			httpPost.releaseConnection();