Kaynağa Gözat

执行器自动注册逻辑更新

xueli.xue 9 yıl önce
ebeveyn
işleme
295200dbfc

+ 19 - 24
xxl-job-admin/src/main/java/com/xxl/job/admin/core/jobbean/RemoteHttpJobBean.java Dosyayı Görüntüle

3
 import com.xxl.job.admin.core.callback.XxlJobLogCallbackServer;
3
 import com.xxl.job.admin.core.callback.XxlJobLogCallbackServer;
4
 import com.xxl.job.admin.core.model.XxlJobInfo;
4
 import com.xxl.job.admin.core.model.XxlJobInfo;
5
 import com.xxl.job.admin.core.model.XxlJobLog;
5
 import com.xxl.job.admin.core.model.XxlJobLog;
6
-import com.xxl.job.admin.core.model.XxlJobRegistry;
7
 import com.xxl.job.admin.core.thread.JobMonitorHelper;
6
 import com.xxl.job.admin.core.thread.JobMonitorHelper;
7
+import com.xxl.job.admin.core.thread.JobRegistryHelper;
8
 import com.xxl.job.admin.core.util.DynamicSchedulerUtil;
8
 import com.xxl.job.admin.core.util.DynamicSchedulerUtil;
9
 import com.xxl.job.core.registry.RegistHelper;
9
 import com.xxl.job.core.registry.RegistHelper;
10
 import com.xxl.job.core.router.HandlerRouter.ActionRepository;
10
 import com.xxl.job.core.router.HandlerRouter.ActionRepository;
60
 		List<String> addressList = new ArrayList<String>();
60
 		List<String> addressList = new ArrayList<String>();
61
 		String parseAddressMsg = null;
61
 		String parseAddressMsg = null;
62
 		if (StringUtils.isNotBlank(jobInfo.getExecutorAppname())) {
62
 		if (StringUtils.isNotBlank(jobInfo.getExecutorAppname())) {
63
-			List<XxlJobRegistry> xxlJobRegistryList = DynamicSchedulerUtil.xxlJobRegistryDao.findRegistrys(RegistHelper.RegistType.EXECUTOR.name(), jobInfo.getExecutorAppname());
64
-			if (xxlJobRegistryList!=null && xxlJobRegistryList.size()>0) {
65
-				for (XxlJobRegistry item: xxlJobRegistryList) {
66
-					addressList.add(item.getRegistryValue());
67
-				}
68
-			}
69
-			parseAddressMsg = MessageFormat.format("Parse Address (Appname注册方式) <br>>>>[address list] : {0}<br><hr>", addressList.toArray());
63
+			addressList = JobRegistryHelper.discover(RegistHelper.RegistType.EXECUTOR.name(), jobInfo.getExecutorAppname());
64
+			parseAddressMsg = MessageFormat.format("Parse Address (Appname注册方式) <br>>>>[address list] : {0}<br><hr>", addressList);
70
 		} else {
65
 		} else {
71
 			List<String> addressArr = Arrays.asList(jobInfo.getExecutorAddress().split(","));
66
 			List<String> addressArr = Arrays.asList(jobInfo.getExecutorAddress().split(","));
72
 			addressList.addAll(addressArr);
67
 			addressList.addAll(addressArr);
73
-			parseAddressMsg = MessageFormat.format("Parse Address (地址配置方式) <br>>>>[address list] : {0}<br><hr>", addressList.toArray());
68
+			parseAddressMsg = MessageFormat.format("Parse Address (地址配置方式) <br>>>>[address list] : {0}<br><hr>", addressList);
74
 		}
69
 		}
75
 
70
 
76
 		// failover trigger
71
 		// failover trigger
97
 	 * @return
92
 	 * @return
98
 	 */
93
 	 */
99
 	public ResponseModel failoverTrigger(List<String> addressList, RequestModel requestModel, XxlJobLog jobLog){
94
 	public ResponseModel failoverTrigger(List<String> addressList, RequestModel requestModel, XxlJobLog jobLog){
100
-		if (addressList.size() > 1) {
95
+		 if (addressList==null || addressList.size() < 1) {
96
+			ResponseModel result = new ResponseModel();
97
+			result.setStatus(ResponseModel.FAIL);
98
+			result.setMsg( "Trigger error, <br>>>>address list is null <br><hr>" );
99
+			return result;
100
+		} else if (addressList.size() == 1) {
101
+			 String address = addressList.get(0);
102
+			 // store real address
103
+			 jobLog.setExecutorAddress(address);
104
+
105
+			 ResponseModel triggerCallback = XxlJobNetCommUtil.postHex(XxlJobNetCommUtil.addressToUrl(address), requestModel);
106
+			 String failoverMessage = MessageFormat.format("Trigger running, <br>>>>[address] : {0}, <br>>>>[status] : {1}, <br>>>>[msg] : {2} <br><hr>", address, triggerCallback.getStatus(), triggerCallback.getMsg());
107
+			 triggerCallback.setMsg(failoverMessage);
108
+			 return triggerCallback;
109
+		 } else {
101
 			
110
 			
102
 			// for ha
111
 			// for ha
103
 			Collections.shuffle(addressList);
112
 			Collections.shuffle(addressList);
133
 			result.setStatus(ResponseModel.FAIL);
142
 			result.setStatus(ResponseModel.FAIL);
134
 			result.setMsg(failoverMessage);
143
 			result.setMsg(failoverMessage);
135
 			return result;
144
 			return result;
136
-		} else if (addressList.size() == 1) {
137
-			String address = addressList.get(0);
138
-			// store real address
139
-			jobLog.setExecutorAddress(address);
140
-
141
-			ResponseModel triggerCallback = XxlJobNetCommUtil.postHex(XxlJobNetCommUtil.addressToUrl(address), requestModel);
142
-			String failoverMessage = MessageFormat.format("Trigger running, <br>>>>[address] : {0}, <br>>>>[status] : {1}, <br>>>>[msg] : {2} <br><hr>", address, triggerCallback.getStatus(), triggerCallback.getMsg());
143
-			triggerCallback.setMsg(failoverMessage);
144
-			return triggerCallback;
145
-		} else {
146
-			ResponseModel result = new ResponseModel();
147
-			result.setStatus(ResponseModel.FAIL);
148
-			result.setMsg( "Trigger error, <br>>>>address list is null <br><hr>" );
149
-			return result;
150
 		}
145
 		}
151
 	}
146
 	}
152
 
147
 

+ 74 - 0
xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobRegistryHelper.java Dosyayı Görüntüle

1
+package com.xxl.job.admin.core.thread;
2
+
3
+import com.xxl.job.admin.core.model.XxlJobRegistry;
4
+import com.xxl.job.admin.core.util.DynamicSchedulerUtil;
5
+import com.xxl.job.core.registry.RegistHelper;
6
+import org.slf4j.Logger;
7
+import org.slf4j.LoggerFactory;
8
+
9
+import java.util.ArrayList;
10
+import java.util.List;
11
+import java.util.concurrent.ConcurrentHashMap;
12
+import java.util.concurrent.TimeUnit;
13
+
14
+/**
15
+ * job registry helper
16
+ * @author xuxueli 2016-10-02 19:10:24
17
+ */
18
+public class JobRegistryHelper {
19
+	private static Logger logger = LoggerFactory.getLogger(JobRegistryHelper.class);
20
+
21
+	private static JobRegistryHelper helper = new JobRegistryHelper();
22
+	private ConcurrentHashMap<String, List<String>> registMap = new ConcurrentHashMap<String, List<String>>();
23
+
24
+	public JobRegistryHelper(){
25
+		Thread registryThread = new Thread(new Runnable() {
26
+			@Override
27
+			public void run() {
28
+				int timeout = 15;
29
+				while (true) {
30
+					try {
31
+						ConcurrentHashMap<String, List<String>> temp = new ConcurrentHashMap<String, List<String>>();
32
+						// do biz
33
+						DynamicSchedulerUtil.xxlJobRegistryDao.removeDead(RegistHelper.TIMEOUT*2);
34
+						List<XxlJobRegistry> list = DynamicSchedulerUtil.xxlJobRegistryDao.findAll(RegistHelper.TIMEOUT*2);
35
+						if (list != null) {
36
+							for (XxlJobRegistry item: list) {
37
+								String groupKey = makeGroupKey(item.getRegistryGroup(), item.getRegistryKey());
38
+								List<String> dataSet = temp.get(groupKey);
39
+								if (dataSet == null) {
40
+									dataSet = new ArrayList<String>();
41
+								}
42
+								dataSet.add(item.getRegistryValue());
43
+								temp.put(groupKey, dataSet);
44
+							}
45
+						}
46
+						// gresh registry
47
+						registMap = temp;
48
+						logger.error("job registry :{}", list);
49
+					} catch (Exception e) {
50
+						logger.error("job registry helper error:{}", e);
51
+					}
52
+					try {
53
+						TimeUnit.SECONDS.sleep(RegistHelper.TIMEOUT);
54
+					} catch (InterruptedException e) {
55
+						logger.error("job registry helper error:{}", e);
56
+					}
57
+				}
58
+			}
59
+		});
60
+		registryThread.setDaemon(true);
61
+		registryThread.start();
62
+
63
+	}
64
+
65
+	private static String makeGroupKey(String registryGroup, String registryKey){
66
+		return registryGroup.concat("_").concat(registryKey);
67
+	}
68
+	
69
+	public static List<String> discover(String registryGroup, String registryKey){
70
+		String groupKey = makeGroupKey(registryGroup, registryKey);
71
+		return helper.registMap.get(groupKey);
72
+	}
73
+	
74
+}

+ 4 - 0
xxl-job-admin/src/main/java/com/xxl/job/admin/core/util/DynamicSchedulerUtil.java Dosyayı Görüntüle

3
 import com.xxl.job.admin.core.callback.XxlJobLogCallbackServer;
3
 import com.xxl.job.admin.core.callback.XxlJobLogCallbackServer;
4
 import com.xxl.job.admin.core.jobbean.RemoteHttpJobBean;
4
 import com.xxl.job.admin.core.jobbean.RemoteHttpJobBean;
5
 import com.xxl.job.admin.core.model.XxlJobInfo;
5
 import com.xxl.job.admin.core.model.XxlJobInfo;
6
+import com.xxl.job.admin.core.thread.JobRegistryHelper;
6
 import com.xxl.job.admin.dao.IXxlJobInfoDao;
7
 import com.xxl.job.admin.dao.IXxlJobInfoDao;
7
 import com.xxl.job.admin.dao.IXxlJobLogDao;
8
 import com.xxl.job.admin.dao.IXxlJobLogDao;
8
 import com.xxl.job.admin.dao.IXxlJobRegistryDao;
9
 import com.xxl.job.admin.dao.IXxlJobRegistryDao;
49
 		} catch (Exception e) {
50
 		} catch (Exception e) {
50
 			e.printStackTrace();
51
 			e.printStackTrace();
51
 		}
52
 		}
53
+
54
+		// init JobRegistryHelper
55
+        JobRegistryHelper.discover("g", "k");
52
     }
56
     }
53
     
57
     
54
     // destroy
58
     // destroy

+ 3 - 1
xxl-job-admin/src/main/java/com/xxl/job/admin/dao/IXxlJobRegistryDao.java Dosyayı Görüntüle

8
  * Created by xuxueli on 16/9/30.
8
  * Created by xuxueli on 16/9/30.
9
  */
9
  */
10
 public interface IXxlJobRegistryDao {
10
 public interface IXxlJobRegistryDao {
11
-    List<XxlJobRegistry> findRegistrys(String registryGroup, String registryKey);
11
+    public int removeDead(int timeout);
12
+
13
+    public List<XxlJobRegistry> findAll(int timeout);
12
 }
14
 }

+ 7 - 7
xxl-job-admin/src/main/java/com/xxl/job/admin/dao/impl/XxlJobRegistryDaoImpl.java Dosyayı Görüntüle

6
 import org.springframework.stereotype.Repository;
6
 import org.springframework.stereotype.Repository;
7
 
7
 
8
 import javax.annotation.Resource;
8
 import javax.annotation.Resource;
9
-import java.util.HashMap;
10
 import java.util.List;
9
 import java.util.List;
11
-import java.util.Map;
12
 
10
 
13
 /**
11
 /**
14
  * Created by xuxueli on 16/9/30.
12
  * Created by xuxueli on 16/9/30.
20
     public SqlSessionTemplate sqlSessionTemplate;
18
     public SqlSessionTemplate sqlSessionTemplate;
21
 
19
 
22
     @Override
20
     @Override
23
-    public List<XxlJobRegistry> findRegistrys(String registryGroup, String registryKey) {
24
-        Map<String, String> params = new HashMap<String, String>();
25
-        params.put("registryGroup", registryGroup);
26
-        params.put("registryKey", registryKey);
27
-        return sqlSessionTemplate.selectList("XxlJobRegistryMapper.findRegistrys", params);
21
+    public int removeDead(int timeout) {
22
+        return sqlSessionTemplate.delete("XxlJobRegistryMapper.removeDead", timeout);
23
+    }
24
+
25
+    @Override
26
+    public List<XxlJobRegistry> findAll(int timeout) {
27
+        return sqlSessionTemplate.selectList("XxlJobRegistryMapper.findAll", timeout);
28
     }
28
     }
29
 
29
 
30
 }
30
 }

+ 7 - 9
xxl-job-admin/src/main/resources/mybatis-mapper/XxlJobRegistryMapper.xml Dosyayı Görüntüle

19
 		t.update_time
19
 		t.update_time
20
 	</sql>
20
 	</sql>
21
 	
21
 	
22
-	<select id="findRegistrys" parameterType="java.util.HashMap" resultMap="XxlJobRegistry">
22
+	<delete id="removeDead" parameterType="java.lang.Integer" >
23
+		DELETE FROM XXL_JOB_QRTZ_TRIGGER_REGISTRY
24
+		WHERE update_time <![CDATA[ < ]]> DATE_ADD(NOW(),INTERVAL -#{timeout} SECOND)
25
+	</delete>
26
+
27
+	<select id="findAll" parameterType="java.lang.Integer" resultMap="XxlJobRegistry">
23
 		SELECT <include refid="Base_Column_List" />
28
 		SELECT <include refid="Base_Column_List" />
24
 		FROM XXL_JOB_QRTZ_TRIGGER_REGISTRY AS t
29
 		FROM XXL_JOB_QRTZ_TRIGGER_REGISTRY AS t
25
-		WHERE t.registry_group = #{registryGroup}
26
-			AND t.registry_key = #{registryKey}
27
-			AND t.update_time <![CDATA[ > ]]> DATE_ADD(NOW(),INTERVAL -30 SECOND)
30
+		WHERE t.update_time <![CDATA[ > ]]> DATE_ADD(NOW(),INTERVAL -#{timeout} SECOND)
28
 	</select>
31
 	</select>
29
 
32
 
30
-	<delete id="refresh" >
31
-		delete from XXL_JOB_QRTZ_TRIGGER_REGISTRY
32
-		WHERE update_time <![CDATA[ < ]]> DATE_ADD(NOW(),INTERVAL -30 SECOND)
33
-	</delete>
34
-	
35
 </mapper>
33
 </mapper>

+ 1 - 1
xxl-job-core/src/main/java/com/xxl/job/core/executor/jetty/XxlJobExecutor.java Dosyayı Görüntüle

96
                     try {
96
                     try {
97
                         String address = IpUtil.getIp().concat(":").concat(String.valueOf(port));
97
                         String address = IpUtil.getIp().concat(":").concat(String.valueOf(port));
98
                         registHelper.registry(RegistHelper.RegistType.EXECUTOR.name(), appName, address);
98
                         registHelper.registry(RegistHelper.RegistType.EXECUTOR.name(), appName, address);
99
-                        TimeUnit.SECONDS.sleep(15);
99
+                        TimeUnit.SECONDS.sleep(RegistHelper.TIMEOUT);
100
                     } catch (Exception e) {
100
                     } catch (Exception e) {
101
                         e.printStackTrace();
101
                         e.printStackTrace();
102
                     }
102
                     }

+ 1 - 0
xxl-job-core/src/main/java/com/xxl/job/core/registry/RegistHelper.java Dosyayı Görüntüle

5
  */
5
  */
6
 public interface RegistHelper {
6
 public interface RegistHelper {
7
 
7
 
8
+    public static final int TIMEOUT = 15;
8
     public enum RegistType{ EXECUTOR, ADMIN }
9
     public enum RegistType{ EXECUTOR, ADMIN }
9
 
10
 
10
     public int registry(String registGroup, String registryKey, String registryValue);
11
     public int registry(String registGroup, String registryKey, String registryValue);