Browse Source

调度中心API服务改为自研RPC形式,统一底层通讯模型;

xuxueli 8 years ago
parent
commit
ee07e0b794

+ 1 - 0
doc/XXL-JOB官方文档.md View File

@@ -902,6 +902,7 @@ Tips: 历史版本(V1.3.x)目前已经Release至稳定版本, 进入维护阶段
902 902
 - 3、执行器JobHandler禁止命名冲突;
903 903
 - 4、执行器集群地址列表进行自然排序;
904 904
 - 5、调度中心,DAO层代码精简优化并且新增测试用例覆盖;
905
+- 6、调度中心API服务改为自研RPC形式,统一底层通讯模型;
905 906
 
906 907
 #### TODO LIST
907 908
 - 1、任务权限管理:执行器为粒度分配权限,核心操作校验权限;

+ 44 - 102
xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobApiController.java View File

@@ -1,30 +1,22 @@
1 1
 package com.xxl.job.admin.controller;
2 2
 
3 3
 import com.xxl.job.admin.controller.annotation.PermessionLimit;
4
-import com.xxl.job.admin.core.model.XxlJobInfo;
5
-import com.xxl.job.admin.core.model.XxlJobLog;
6 4
 import com.xxl.job.admin.core.schedule.XxlJobDynamicScheduler;
7
-import com.xxl.job.admin.dao.XxlJobInfoDao;
8
-import com.xxl.job.admin.dao.XxlJobLogDao;
9
-import com.xxl.job.admin.dao.XxlJobRegistryDao;
10
-import com.xxl.job.core.biz.model.HandleCallbackParam;
11
-import com.xxl.job.core.biz.model.RegistryParam;
12
-import com.xxl.job.core.biz.model.ReturnT;
13
-import com.xxl.job.core.util.AdminApiUtil;
14
-import org.apache.commons.lang.StringUtils;
15
-import org.quartz.SchedulerException;
5
+import com.xxl.job.core.biz.AdminBiz;
6
+import com.xxl.job.core.rpc.codec.RpcRequest;
7
+import com.xxl.job.core.rpc.codec.RpcResponse;
8
+import com.xxl.job.core.rpc.netcom.NetComServerFactory;
9
+import com.xxl.job.core.rpc.serialize.HessianSerializer;
10
+import com.xxl.job.core.util.HttpClientUtil;
16 11
 import org.slf4j.Logger;
17 12
 import org.slf4j.LoggerFactory;
18 13
 import org.springframework.stereotype.Controller;
19
-import org.springframework.web.bind.annotation.RequestBody;
20 14
 import org.springframework.web.bind.annotation.RequestMapping;
21
-import org.springframework.web.bind.annotation.RequestMethod;
22
-import org.springframework.web.bind.annotation.ResponseBody;
23 15
 
24
-import javax.annotation.Resource;
25
-import java.text.MessageFormat;
26
-import java.util.Date;
27
-import java.util.List;
16
+import javax.servlet.http.HttpServletRequest;
17
+import javax.servlet.http.HttpServletResponse;
18
+import java.io.IOException;
19
+import java.io.OutputStream;
28 20
 
29 21
 /**
30 22
  * Created by xuxueli on 17/5/10.
@@ -33,100 +25,50 @@ import java.util.List;
33 25
 public class JobApiController {
34 26
     private static Logger logger = LoggerFactory.getLogger(JobApiController.class);
35 27
 
36
-    @Resource
37
-    public XxlJobLogDao xxlJobLogDao;
38
-    @Resource
39
-    private XxlJobInfoDao xxlJobInfoDao;
40
-    @Resource
41
-    private XxlJobRegistryDao xxlJobRegistryDao;
42
-
43
-
44
-    @RequestMapping(value= AdminApiUtil.CALLBACK, method = RequestMethod.POST, consumes = "application/json")
45
-    @ResponseBody
46
-    @PermessionLimit(limit=false)
47
-    public ReturnT<String> callback(@RequestBody List<HandleCallbackParam> callbackParamList){
48
-
49
-        for (HandleCallbackParam handleCallbackParam: callbackParamList) {
50
-            ReturnT<String> callbackResult = callback(handleCallbackParam);
51
-            logger.info("JobApiController.callback {}, handleCallbackParam={}, callbackResult={}",
52
-                    (callbackResult.getCode()==ReturnT.SUCCESS_CODE?"success":"fail"), handleCallbackParam, callbackResult);
53
-        }
54
-
55
-        return ReturnT.SUCCESS;
28
+    static {
29
+        NetComServerFactory.putService(AdminBiz.class, XxlJobDynamicScheduler.adminBiz);
56 30
     }
57 31
 
58
-    private ReturnT<String> callback(HandleCallbackParam handleCallbackParam) {
59
-        // valid log item
60
-        XxlJobLog log = xxlJobLogDao.load(handleCallbackParam.getLogId());
61
-        if (log == null) {
62
-            return new ReturnT<String>(ReturnT.FAIL_CODE, "log item not found.");
63
-        }
64
-
65
-        // trigger success, to trigger child job, and avoid repeat trigger child job
66
-        String childTriggerMsg = null;
67
-        if (ReturnT.SUCCESS_CODE==handleCallbackParam.getExecuteResult().getCode() && ReturnT.SUCCESS_CODE!=log.getHandleCode()) {
68
-            XxlJobInfo xxlJobInfo = xxlJobInfoDao.loadById(log.getJobId());
69
-            if (xxlJobInfo!=null && StringUtils.isNotBlank(xxlJobInfo.getChildJobKey())) {
70
-                childTriggerMsg = "<hr>";
71
-                String[] childJobKeys = xxlJobInfo.getChildJobKey().split(",");
72
-                for (int i = 0; i < childJobKeys.length; i++) {
73
-                    String[] jobKeyArr = childJobKeys[i].split("_");
74
-                    if (jobKeyArr!=null && jobKeyArr.length==2) {
75
-                        XxlJobInfo childJobInfo = xxlJobInfoDao.loadById(Integer.valueOf(jobKeyArr[1]));
76
-                        if (childJobInfo!=null) {
77
-                            try {
78
-                                boolean ret = XxlJobDynamicScheduler.triggerJob(String.valueOf(childJobInfo.getId()), String.valueOf(childJobInfo.getJobGroup()));
32
+    private RpcResponse doInvoke(HttpServletRequest request) {
33
+        try {
34
+            // deserialize request
35
+            byte[] requestBytes = HttpClientUtil.readBytes(request);
36
+            if (requestBytes == null || requestBytes.length==0) {
37
+                RpcResponse rpcResponse = new RpcResponse();
38
+                rpcResponse.setError("RpcRequest byte[] is null");
39
+                return rpcResponse;
40
+            }
41
+            RpcRequest rpcRequest = (RpcRequest) HessianSerializer.deserialize(requestBytes, RpcRequest.class);
79 42
 
80
-                                // add msg
81
-                                childTriggerMsg += MessageFormat.format("<br> {0}/{1} 触发子任务成功, 子任务Key: {2}, status: {3}, 子任务描述: {4}",
82
-                                        (i+1), childJobKeys.length, childJobKeys[i], ret, childJobInfo.getJobDesc());
83
-                            } catch (SchedulerException e) {
84
-                                logger.error("", e);
85
-                            }
86
-                        } else {
87
-                            childTriggerMsg += MessageFormat.format("<br> {0}/{1} 触发子任务失败, 子任务xxlJobInfo不存在, 子任务Key: {2}",
88
-                                    (i+1), childJobKeys.length, childJobKeys[i]);
89
-                        }
90
-                    } else {
91
-                        childTriggerMsg += MessageFormat.format("<br> {0}/{1} 触发子任务失败, 子任务Key格式错误, 子任务Key: {2}",
92
-                                (i+1), childJobKeys.length, childJobKeys[i]);
93
-                    }
94
-                }
43
+            // invoke
44
+            RpcResponse rpcResponse = NetComServerFactory.invokeService(rpcRequest, null);
45
+            return rpcResponse;
46
+        } catch (Exception e) {
47
+            logger.error(e.getMessage(), e);
95 48
 
96
-            }
49
+            RpcResponse rpcResponse = new RpcResponse();
50
+            rpcResponse.setError("Server-error:" + e.getMessage());
51
+            return rpcResponse;
97 52
         }
53
+    }
98 54
 
99
-        // handle msg
100
-        StringBuffer handleMsg = new StringBuffer();
101
-        if (log.getHandleMsg()!=null) {
102
-            handleMsg.append(log.getHandleMsg()).append("<br>");
103
-        }
104
-        if (handleCallbackParam.getExecuteResult().getMsg() != null) {
105
-            handleMsg.append(handleCallbackParam.getExecuteResult().getMsg());
106
-        }
107
-        if (childTriggerMsg !=null) {
108
-            handleMsg.append("<br>子任务触发备注:").append(childTriggerMsg);
109
-        }
55
+    @RequestMapping("/api")
56
+    @PermessionLimit(limit=false)
57
+    public void api(HttpServletRequest request, HttpServletResponse response) throws IOException {
110 58
 
111
-        // success, save log
112
-        log.setHandleTime(new Date());
113
-        log.setHandleCode(handleCallbackParam.getExecuteResult().getCode());
114
-        log.setHandleMsg(handleMsg.toString());
115
-        xxlJobLogDao.updateHandleInfo(log);
59
+        // invoke
60
+        RpcResponse rpcResponse = doInvoke(request);
116 61
 
117
-        return ReturnT.SUCCESS;
118
-    }
62
+        // serialize response
63
+        byte[] responseBytes = HessianSerializer.serialize(rpcResponse);
119 64
 
65
+        response.setContentType("text/html;charset=utf-8");
66
+        response.setStatus(HttpServletResponse.SC_OK);
67
+        //baseRequest.setHandled(true);
120 68
 
121
-    @RequestMapping(value=AdminApiUtil.REGISTRY, method = RequestMethod.POST, consumes = "application/json")
122
-    @ResponseBody
123
-    @PermessionLimit(limit=false)
124
-    public ReturnT<String> registry(@RequestBody RegistryParam registryParam){
125
-        int ret = xxlJobRegistryDao.registryUpdate(registryParam.getRegistGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue());
126
-        if (ret < 1) {
127
-            xxlJobRegistryDao.registrySave(registryParam.getRegistGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue());
128
-        }
129
-        return ReturnT.SUCCESS;
69
+        OutputStream out = response.getOutputStream();
70
+        out.write(responseBytes);
71
+        out.flush();
130 72
     }
131 73
 
132 74
 }

+ 3 - 0
xxl-job-admin/src/main/java/com/xxl/job/admin/core/schedule/XxlJobDynamicScheduler.java View File

@@ -8,6 +8,7 @@ import com.xxl.job.admin.dao.XxlJobGroupDao;
8 8
 import com.xxl.job.admin.dao.XxlJobInfoDao;
9 9
 import com.xxl.job.admin.dao.XxlJobLogDao;
10 10
 import com.xxl.job.admin.dao.XxlJobRegistryDao;
11
+import com.xxl.job.core.biz.AdminBiz;
11 12
 import com.xxl.job.core.rpc.netcom.NetComServerFactory;
12 13
 import org.quartz.*;
13 14
 import org.quartz.Trigger.TriggerState;
@@ -62,6 +63,7 @@ public final class XxlJobDynamicScheduler implements ApplicationContextAware, In
62 63
     public static XxlJobInfoDao xxlJobInfoDao;
63 64
     public static XxlJobRegistryDao xxlJobRegistryDao;
64 65
     public static XxlJobGroupDao xxlJobGroupDao;
66
+    public static AdminBiz adminBiz;
65 67
 
66 68
     @Override
67 69
 	public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
@@ -69,6 +71,7 @@ public final class XxlJobDynamicScheduler implements ApplicationContextAware, In
69 71
 		XxlJobDynamicScheduler.xxlJobInfoDao = applicationContext.getBean(XxlJobInfoDao.class);
70 72
         XxlJobDynamicScheduler.xxlJobRegistryDao = applicationContext.getBean(XxlJobRegistryDao.class);
71 73
         XxlJobDynamicScheduler.xxlJobGroupDao = applicationContext.getBean(XxlJobGroupDao.class);
74
+        XxlJobDynamicScheduler.adminBiz = applicationContext.getBean(AdminBiz.class);
72 75
 	}
73 76
     
74 77
 	@Override

+ 121 - 0
xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/AdminBizImpl.java View File

@@ -0,0 +1,121 @@
1
+package com.xxl.job.admin.service.impl;
2
+
3
+import com.xxl.job.admin.controller.JobApiController;
4
+import com.xxl.job.admin.core.model.XxlJobInfo;
5
+import com.xxl.job.admin.core.model.XxlJobLog;
6
+import com.xxl.job.admin.core.schedule.XxlJobDynamicScheduler;
7
+import com.xxl.job.admin.dao.XxlJobInfoDao;
8
+import com.xxl.job.admin.dao.XxlJobLogDao;
9
+import com.xxl.job.admin.dao.XxlJobRegistryDao;
10
+import com.xxl.job.core.biz.AdminBiz;
11
+import com.xxl.job.core.biz.model.HandleCallbackParam;
12
+import com.xxl.job.core.biz.model.RegistryParam;
13
+import com.xxl.job.core.biz.model.ReturnT;
14
+import org.apache.commons.lang.StringUtils;
15
+import org.quartz.SchedulerException;
16
+import org.slf4j.Logger;
17
+import org.slf4j.LoggerFactory;
18
+import org.springframework.stereotype.Service;
19
+
20
+import javax.annotation.Resource;
21
+import java.text.MessageFormat;
22
+import java.util.Date;
23
+import java.util.List;
24
+
25
+/**
26
+ * @author xuxueli 2017-07-27 21:54:20
27
+ */
28
+@Service
29
+public class AdminBizImpl implements AdminBiz {
30
+    private static Logger logger = LoggerFactory.getLogger(AdminBizImpl.class);
31
+
32
+    @Resource
33
+    public XxlJobLogDao xxlJobLogDao;
34
+    @Resource
35
+    private XxlJobInfoDao xxlJobInfoDao;
36
+    @Resource
37
+    private XxlJobRegistryDao xxlJobRegistryDao;
38
+
39
+    @Override
40
+    public ReturnT<String> callback(List<HandleCallbackParam> callbackParamList) {
41
+        for (HandleCallbackParam handleCallbackParam: callbackParamList) {
42
+            ReturnT<String> callbackResult = callback(handleCallbackParam);
43
+            logger.info("JobApiController.callback {}, handleCallbackParam={}, callbackResult={}",
44
+                    (callbackResult.getCode()==ReturnT.SUCCESS_CODE?"success":"fail"), handleCallbackParam, callbackResult);
45
+        }
46
+
47
+        return ReturnT.SUCCESS;
48
+    }
49
+
50
+    private ReturnT<String> callback(HandleCallbackParam handleCallbackParam) {
51
+        // valid log item
52
+        XxlJobLog log = xxlJobLogDao.load(handleCallbackParam.getLogId());
53
+        if (log == null) {
54
+            return new ReturnT<String>(ReturnT.FAIL_CODE, "log item not found.");
55
+        }
56
+
57
+        // trigger success, to trigger child job, and avoid repeat trigger child job
58
+        String childTriggerMsg = null;
59
+        if (ReturnT.SUCCESS_CODE==handleCallbackParam.getExecuteResult().getCode() && ReturnT.SUCCESS_CODE!=log.getHandleCode()) {
60
+            XxlJobInfo xxlJobInfo = xxlJobInfoDao.loadById(log.getJobId());
61
+            if (xxlJobInfo!=null && StringUtils.isNotBlank(xxlJobInfo.getChildJobKey())) {
62
+                childTriggerMsg = "<hr>";
63
+                String[] childJobKeys = xxlJobInfo.getChildJobKey().split(",");
64
+                for (int i = 0; i < childJobKeys.length; i++) {
65
+                    String[] jobKeyArr = childJobKeys[i].split("_");
66
+                    if (jobKeyArr!=null && jobKeyArr.length==2) {
67
+                        XxlJobInfo childJobInfo = xxlJobInfoDao.loadById(Integer.valueOf(jobKeyArr[1]));
68
+                        if (childJobInfo!=null) {
69
+                            try {
70
+                                boolean ret = XxlJobDynamicScheduler.triggerJob(String.valueOf(childJobInfo.getId()), String.valueOf(childJobInfo.getJobGroup()));
71
+
72
+                                // add msg
73
+                                childTriggerMsg += MessageFormat.format("<br> {0}/{1} 触发子任务成功, 子任务Key: {2}, status: {3}, 子任务描述: {4}",
74
+                                        (i+1), childJobKeys.length, childJobKeys[i], ret, childJobInfo.getJobDesc());
75
+                            } catch (SchedulerException e) {
76
+                                logger.error("", e);
77
+                            }
78
+                        } else {
79
+                            childTriggerMsg += MessageFormat.format("<br> {0}/{1} 触发子任务失败, 子任务xxlJobInfo不存在, 子任务Key: {2}",
80
+                                    (i+1), childJobKeys.length, childJobKeys[i]);
81
+                        }
82
+                    } else {
83
+                        childTriggerMsg += MessageFormat.format("<br> {0}/{1} 触发子任务失败, 子任务Key格式错误, 子任务Key: {2}",
84
+                                (i+1), childJobKeys.length, childJobKeys[i]);
85
+                    }
86
+                }
87
+
88
+            }
89
+        }
90
+
91
+        // handle msg
92
+        StringBuffer handleMsg = new StringBuffer();
93
+        if (log.getHandleMsg()!=null) {
94
+            handleMsg.append(log.getHandleMsg()).append("<br>");
95
+        }
96
+        if (handleCallbackParam.getExecuteResult().getMsg() != null) {
97
+            handleMsg.append(handleCallbackParam.getExecuteResult().getMsg());
98
+        }
99
+        if (childTriggerMsg !=null) {
100
+            handleMsg.append("<br>子任务触发备注:").append(childTriggerMsg);
101
+        }
102
+
103
+        // success, save log
104
+        log.setHandleTime(new Date());
105
+        log.setHandleCode(handleCallbackParam.getExecuteResult().getCode());
106
+        log.setHandleMsg(handleMsg.toString());
107
+        xxlJobLogDao.updateHandleInfo(log);
108
+
109
+        return ReturnT.SUCCESS;
110
+    }
111
+
112
+    @Override
113
+    public ReturnT<String> registry(RegistryParam registryParam) {
114
+        int ret = xxlJobRegistryDao.registryUpdate(registryParam.getRegistGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue());
115
+        if (ret < 1) {
116
+            xxlJobRegistryDao.registrySave(registryParam.getRegistGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue());
117
+        }
118
+        return ReturnT.SUCCESS;
119
+    }
120
+
121
+}

+ 0 - 25
xxl-job-admin/src/test/java/com/xxl/job/dao/impl/AdminApiTest.java View File

@@ -1,25 +0,0 @@
1
-package com.xxl.job.dao.impl;
2
-
3
-import com.xxl.job.core.biz.model.RegistryParam;
4
-import com.xxl.job.core.biz.model.ReturnT;
5
-import com.xxl.job.core.enums.RegistryConfig;
6
-import com.xxl.job.core.util.AdminApiUtil;
7
-
8
-/**
9
- * Created by xuxueli on 17/5/10.
10
- */
11
-public class AdminApiTest {
12
-
13
-    public static void main(String[] args) {
14
-        try {
15
-            RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), "aaa", "112312312312");
16
-
17
-            AdminApiUtil.init("http://localhost:8080/xxl-job-admin");
18
-            ReturnT<String> registryResult = AdminApiUtil.callApiFailover(AdminApiUtil.REGISTRY, registryParam);
19
-            System.out.println(registryResult);
20
-        } catch (Exception e) {
21
-            e.printStackTrace();
22
-        }
23
-    }
24
-
25
-}

+ 30 - 0
xxl-job-core/src/main/java/com/xxl/job/core/biz/AdminBiz.java View File

@@ -0,0 +1,30 @@
1
+package com.xxl.job.core.biz;
2
+
3
+import com.xxl.job.core.biz.model.HandleCallbackParam;
4
+import com.xxl.job.core.biz.model.RegistryParam;
5
+import com.xxl.job.core.biz.model.ReturnT;
6
+
7
+import java.util.List;
8
+
9
+/**
10
+ * @author xuxueli 2017-07-27 21:52:49
11
+ */
12
+public interface AdminBiz {
13
+
14
+    /**
15
+     * callback
16
+     *
17
+     * @param callbackParamList
18
+     * @return
19
+     */
20
+    public ReturnT<String> callback(List<HandleCallbackParam> callbackParamList);
21
+
22
+    /**
23
+     * registry
24
+     *
25
+     * @param registryParam
26
+     * @return
27
+     */
28
+    public ReturnT<String> registry(RegistryParam registryParam);
29
+
30
+}

+ 1 - 5
xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java View File

@@ -8,7 +8,6 @@ import com.xxl.job.core.rpc.netcom.NetComServerFactory;
8 8
 import com.xxl.job.core.thread.ExecutorRegistryThread;
9 9
 import com.xxl.job.core.thread.JobThread;
10 10
 import com.xxl.job.core.thread.TriggerCallbackThread;
11
-import com.xxl.job.core.util.AdminApiUtil;
12 11
 import org.slf4j.Logger;
13 12
 import org.slf4j.LoggerFactory;
14 13
 import org.springframework.beans.BeansException;
@@ -30,7 +29,7 @@ public class XxlJobExecutor implements ApplicationContextAware, ApplicationListe
30 29
     private String ip;
31 30
     private int port = 9999;
32 31
     private String appName;
33
-    private String adminAddresses;
32
+    public static String adminAddresses;
34 33
     public static String logPath;
35 34
 
36 35
     public void setIp(String ip) {
@@ -52,9 +51,6 @@ public class XxlJobExecutor implements ApplicationContextAware, ApplicationListe
52 51
     // ---------------------------------- job server ------------------------------------
53 52
     private NetComServerFactory serverFactory = new NetComServerFactory();
54 53
     public void start() throws Exception {
55
-        // admin api util init
56
-        AdminApiUtil.init(adminAddresses);
57
-
58 54
         // executor start
59 55
         NetComServerFactory.putService(ExecutorBiz.class, new ExecutorBizImpl());
60 56
         serverFactory.start(port, ip, appName);

+ 7 - 1
xxl-job-core/src/main/java/com/xxl/job/core/rpc/netcom/jetty/client/JettyClient.java View File

@@ -19,8 +19,14 @@ public class JettyClient {
19 19
 			// serialize request
20 20
 			byte[] requestBytes = HessianSerializer.serialize(request);
21 21
 
22
+			// reqURL
23
+			String reqURL = request.getServerAddress();
24
+			if (reqURL!=null && reqURL.indexOf("http://")==-1) {
25
+				reqURL = "http://" + request.getServerAddress() + "/";
26
+			}
27
+
22 28
 			// remote invoke
23
-			byte[] responseBytes = HttpClientUtil.postRequest("http://" + request.getServerAddress() + "/", requestBytes);
29
+			byte[] responseBytes = HttpClientUtil.postRequest(reqURL, requestBytes);
24 30
 			if (responseBytes == null || responseBytes.length==0) {
25 31
 				RpcResponse rpcResponse = new RpcResponse();
26 32
 				rpcResponse.setError("RpcResponse byte[] is null");

+ 22 - 4
xxl-job-core/src/main/java/com/xxl/job/core/thread/ExecutorRegistryThread.java View File

@@ -1,9 +1,11 @@
1 1
 package com.xxl.job.core.thread;
2 2
 
3
+import com.xxl.job.core.biz.AdminBiz;
3 4
 import com.xxl.job.core.biz.model.RegistryParam;
4 5
 import com.xxl.job.core.biz.model.ReturnT;
5 6
 import com.xxl.job.core.enums.RegistryConfig;
6
-import com.xxl.job.core.util.AdminApiUtil;
7
+import com.xxl.job.core.executor.XxlJobExecutor;
8
+import com.xxl.job.core.rpc.netcom.NetComClientProxy;
7 9
 import com.xxl.job.core.util.IpUtil;
8 10
 import org.slf4j.Logger;
9 11
 import org.slf4j.LoggerFactory;
@@ -26,8 +28,12 @@ public class ExecutorRegistryThread extends Thread {
26 28
     public void start(final int port, final String ip, final String appName){
27 29
 
28 30
         // valid
29
-        if ( !(AdminApiUtil.allowCallApi() && (appName!=null && appName.trim().length()>0)) ) {
30
-            logger.warn(">>>>>>>>>>>> xxl-job, executor registry config fail");
31
+        if (appName==null || appName.trim().length()==0) {
32
+            logger.warn(">>>>>>>>>>>> xxl-job, executor registry config fail, appName is null.");
33
+            return;
34
+        }
35
+        if (XxlJobExecutor.adminAddresses==null || XxlJobExecutor.adminAddresses.trim().length()==0) {
36
+            logger.warn(">>>>>>>>>>>> xxl-job, executor registry config fail, adminAddresses is null.");
31 37
             return;
32 38
         }
33 39
 
@@ -45,7 +51,19 @@ public class ExecutorRegistryThread extends Thread {
45 51
                 while (!toStop) {
46 52
                     try {
47 53
                         RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appName, executorAddress);
48
-                        ReturnT<String> registryResult = AdminApiUtil.callApiFailover(AdminApiUtil.REGISTRY, registryParam);
54
+                        ReturnT<String> registryResult = null;
55
+
56
+                        for (String addressUrl: XxlJobExecutor.adminAddresses.split(",")) {
57
+                            String apiUrl = addressUrl.concat("/api");
58
+
59
+                            AdminBiz adminBiz = (AdminBiz) new NetComClientProxy(AdminBiz.class, apiUrl).getObject();
60
+                            registryResult = adminBiz.registry(registryParam);
61
+                            if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
62
+                                registryResult = ReturnT.SUCCESS;
63
+                                break;
64
+                            }
65
+                        }
66
+
49 67
                         logger.info(">>>>>>>>>>> xxl-job Executor registry {}, RegistryParam:{}, registryResult:{}",
50 68
                                 new Object[]{(registryResult.getCode()==ReturnT.SUCCESS_CODE?"success":"fail"), registryParam.toString(), registryResult.toString()});
51 69
                     } catch (Exception e) {

+ 23 - 3
xxl-job-core/src/main/java/com/xxl/job/core/thread/TriggerCallbackThread.java View File

@@ -1,8 +1,10 @@
1 1
 package com.xxl.job.core.thread;
2 2
 
3
+import com.xxl.job.core.biz.AdminBiz;
3 4
 import com.xxl.job.core.biz.model.HandleCallbackParam;
4 5
 import com.xxl.job.core.biz.model.ReturnT;
5
-import com.xxl.job.core.util.AdminApiUtil;
6
+import com.xxl.job.core.executor.XxlJobExecutor;
7
+import com.xxl.job.core.rpc.netcom.NetComClientProxy;
6 8
 import org.slf4j.Logger;
7 9
 import org.slf4j.LoggerFactory;
8 10
 
@@ -35,14 +37,32 @@ public class TriggerCallbackThread {
35 37
                         HandleCallbackParam callback = getInstance().callBackQueue.take();
36 38
                         if (callback != null) {
37 39
 
38
-                            // callback list
40
+                            // valid
41
+                            if (XxlJobExecutor.adminAddresses==null || XxlJobExecutor.adminAddresses.trim().length()==0) {
42
+                                logger.warn(">>>>>>>>>>>> xxl-job callback fail, adminAddresses is null.");
43
+                                continue;
44
+                            }
45
+
46
+                            // callback list param
39 47
                             List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();
40 48
                             int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);
41 49
                             callbackParamList.add(callback);
42 50
 
43 51
                             // callback, will retry if error
44 52
                             try {
45
-                                ReturnT<String> callbackResult = AdminApiUtil.callApiFailover(AdminApiUtil.CALLBACK, callbackParamList);
53
+
54
+                                ReturnT<String> callbackResult = null;
55
+                                for (String addressUrl: XxlJobExecutor.adminAddresses.split(",")) {
56
+                                    String apiUrl = addressUrl.concat("/api");
57
+
58
+                                    AdminBiz adminBiz = (AdminBiz) new NetComClientProxy(AdminBiz.class, apiUrl).getObject();
59
+                                    callbackResult = adminBiz.callback(callbackParamList);
60
+                                    if (callbackResult!=null && ReturnT.SUCCESS_CODE == callbackResult.getCode()) {
61
+                                        callbackResult = ReturnT.SUCCESS;
62
+                                        break;
63
+                                    }
64
+                                }
65
+
46 66
                                 logger.info(">>>>>>>>>>> xxl-job callback, callbackParamList:{}, callbackResult:{}", new Object[]{callbackParamList, callbackResult});
47 67
                             } catch (Exception e) {
48 68
                                 logger.error(">>>>>>>>>>> xxl-job TriggerCallbackThread Exception:", e);

+ 0 - 127
xxl-job-core/src/main/java/com/xxl/job/core/util/AdminApiUtil.java View File

@@ -1,127 +0,0 @@
1
-package com.xxl.job.core.util;
2
-
3
-import com.xxl.job.core.biz.model.ReturnT;
4
-import org.apache.http.HttpEntity;
5
-import org.apache.http.HttpResponse;
6
-import org.apache.http.client.config.RequestConfig;
7
-import org.apache.http.client.methods.HttpPost;
8
-import org.apache.http.entity.StringEntity;
9
-import org.apache.http.impl.client.CloseableHttpClient;
10
-import org.apache.http.impl.client.HttpClients;
11
-import org.apache.http.util.EntityUtils;
12
-import org.slf4j.Logger;
13
-import org.slf4j.LoggerFactory;
14
-
15
-import java.io.IOException;
16
-import java.util.ArrayList;
17
-import java.util.HashSet;
18
-import java.util.List;
19
-import java.util.Set;
20
-
21
-/**
22
- * @author xuxueli 2017-05-10 21:28:15
23
- */
24
-public class AdminApiUtil {
25
-	private static Logger logger = LoggerFactory.getLogger(AdminApiUtil.class);
26
-
27
-	public static final String CALLBACK = "/api/callback";
28
-	public static final String REGISTRY = "/api/registry";
29
-
30
-	private static List<String> adminAddressList = null;
31
-	public static void init(String adminAddresses){
32
-		// admin assress list
33
-		if (adminAddresses != null) {
34
-			Set<String> adminAddressSet = new HashSet<String>();
35
-			for (String adminAddressItem: adminAddresses.split(",")) {
36
-				if (adminAddressItem.trim().length()>0) {
37
-					adminAddressSet.add(adminAddressItem);
38
-				}
39
-			}
40
-            adminAddressList = new ArrayList<String>(adminAddressSet);
41
-		}
42
-	}
43
-	public static boolean allowCallApi(){
44
-        boolean allowCallApi = (adminAddressList!=null && adminAddressList.size()>0);
45
-        return allowCallApi;
46
-    }
47
-
48
-	public static ReturnT<String> callApiFailover(String subUrl, Object requestObj) throws Exception {
49
-
50
-		if (!allowCallApi()) {
51
-			return new ReturnT<String>(ReturnT.FAIL_CODE, "allowCallApi fail.");
52
-		}
53
-
54
-		for (String adminAddress: adminAddressList) {
55
-			ReturnT<String> registryResult = null;
56
-			try {
57
-				String apiUrl = adminAddress.concat(subUrl);
58
-				registryResult = callApi(apiUrl, requestObj);
59
-			} catch (Exception e) {
60
-				logger.error(e.getMessage(), e);
61
-			}
62
-			if (registryResult!=null && registryResult.getCode()==ReturnT.SUCCESS_CODE) {
63
-				return ReturnT.SUCCESS;
64
-			}
65
-		}
66
-		return ReturnT.FAIL;
67
-	}
68
-
69
-	private static ReturnT<String> callApi(String finalUrl, Object requestObj) throws Exception {
70
-		HttpPost httpPost = new HttpPost(finalUrl);
71
-		CloseableHttpClient httpClient = HttpClients.createDefault();
72
-		try {
73
-
74
-			// timeout
75
-			RequestConfig requestConfig = RequestConfig.custom()
76
-					.setConnectionRequestTimeout(10000)
77
-					.setSocketTimeout(10000)
78
-					.setConnectTimeout(10000)
79
-					.build();
80
-
81
-			httpPost.setConfig(requestConfig);
82
-
83
-			// data
84
-			if (requestObj != null) {
85
-				String json = JacksonUtil.writeValueAsString(requestObj);
86
-
87
-				StringEntity entity = new StringEntity(json, "utf-8");
88
-				entity.setContentEncoding("UTF-8");
89
-				entity.setContentType("application/json");
90
-
91
-				httpPost.setEntity(entity);
92
-			}
93
-
94
-			// do post
95
-			HttpResponse response = httpClient.execute(httpPost);
96
-			HttpEntity entity = response.getEntity();
97
-			if (null != entity) {
98
-				String responseMsg = EntityUtils.toString(entity, "UTF-8");
99
-				if (response.getStatusLine().getStatusCode() != 200) {
100
-					EntityUtils.consume(entity);
101
-					return new ReturnT<String>(response.getStatusLine().getStatusCode(),
102
-							"StatusCode(+"+ response.getStatusLine().getStatusCode() +") Error,response:" + responseMsg);
103
-				}
104
-
105
-				EntityUtils.consume(entity);
106
-				if (responseMsg!=null && responseMsg.startsWith("{")) {
107
-					ReturnT<String> result = JacksonUtil.readValue(responseMsg, ReturnT.class);
108
-					return result;
109
-				}
110
-			}
111
-			return ReturnT.FAIL;
112
-		} catch (Exception e) {
113
-			logger.error("", e);
114
-			return new ReturnT<String>(ReturnT.FAIL_CODE, e.getMessage());
115
-		} finally {
116
-			if (httpPost!=null) {
117
-				httpPost.releaseConnection();
118
-			}
119
-			try {
120
-				httpClient.close();
121
-			} catch (IOException e) {
122
-				e.printStackTrace();
123
-			}
124
-		}
125
-	}
126
-	
127
-}

+ 0 - 128
xxl-job-core/src/main/java/com/xxl/job/core/util/DBUtil.java View File

@@ -1,128 +0,0 @@
1
-package com.xxl.job.core.util;
2
-
3
-import javax.sql.DataSource;
4
-import java.sql.*;
5
-import java.util.*;
6
-
7
-/**
8
- * Created by xuxueli on 16/9/30.
9
- */
10
-public class DBUtil {
11
-
12
-    private static Connection getConn(DataSource dataSource) {
13
-        try {
14
-            return dataSource.getConnection();
15
-        } catch (SQLException e) {
16
-            e.printStackTrace();
17
-        }
18
-        return null;
19
-    }
20
-
21
-    /**
22
-     * update
23
-     *
24
-     * @param dataSource
25
-     * @param sql
26
-     * @param params
27
-     */
28
-    public static int update(DataSource dataSource, String sql, Object params[]) {
29
-        Connection connection = getConn(dataSource);
30
-        PreparedStatement preparedStatement = null;
31
-        int ret = 0;
32
-        try {
33
-            preparedStatement = connection.prepareStatement(sql);
34
-            if (params != null) {
35
-                for (int i = 0; i < params.length; i++) {
36
-                    preparedStatement.setObject(i + 1, params[i]);
37
-                }
38
-            }
39
-            ret = preparedStatement.executeUpdate();
40
-        } catch (SQLException e) {
41
-            e.printStackTrace();
42
-        } finally {
43
-            release(connection, preparedStatement, null);
44
-        }
45
-        return ret;
46
-    }
47
-
48
-    /**
49
-     * query
50
-     *
51
-     * @param dataSource
52
-     * @param sql
53
-     * @param params
54
-     * @return
55
-     */
56
-    public static List<Map<String, Object>> query(DataSource dataSource, String sql, Object[] params) {
57
-        Connection connection = getConn(dataSource);
58
-        PreparedStatement preparedStatement = null;
59
-        ResultSet resultSet = null;
60
-        try {
61
-            preparedStatement = connection.prepareStatement(sql);
62
-            if (params != null) {
63
-                for (int i = 0; i < params.length; i++) {
64
-                    preparedStatement.setObject(i + 1, params[i]);
65
-                }
66
-            }
67
-            resultSet = preparedStatement.executeQuery();
68
-
69
-            List<Map<String, Object>> ret = resultSetToList(resultSet);
70
-            return ret;
71
-        } catch (SQLException e) {
72
-            e.printStackTrace();
73
-        } finally {
74
-            release(connection, preparedStatement, resultSet);
75
-        }
76
-        return null;
77
-    }
78
-
79
-    private static List<Map<String, Object>> resultSetToList(ResultSet resultSet) throws SQLException {
80
-        if (resultSet == null) {
81
-            return new ArrayList<Map<String, Object>>();
82
-        }
83
-
84
-        ResultSetMetaData resultSetMetaData = resultSet.getMetaData(); // 得到结果集(rs)的结构信息,比如字段数、字段名等
85
-        int columnCount = resultSetMetaData.getColumnCount(); // 返回此 ResultSet 对象中的列数
86
-
87
-        List<Map<String, Object>> list = new ArrayList<Map<String, Object>>();
88
-        while (resultSet.next()) {
89
-            Map<String, Object> rowData = new HashMap<String, Object>(columnCount);
90
-            for (int i = 1; i <= columnCount; i++) {
91
-                rowData.put(resultSetMetaData.getColumnName(i), resultSet.getObject(i));
92
-            }
93
-            list.add(rowData);
94
-        }
95
-        return list;
96
-    }
97
-
98
-    /**
99
-     * release
100
-     * @param connection
101
-     * @param preparedStatement
102
-     * @param resultSet
103
-     */
104
-    public static void release(Connection connection, PreparedStatement preparedStatement, ResultSet resultSet) {
105
-        if (resultSet != null) {
106
-            try {
107
-                resultSet.close();
108
-            } catch (SQLException e) {
109
-                e.printStackTrace();
110
-            }
111
-        }
112
-        if (preparedStatement != null) {
113
-            try {
114
-                preparedStatement.close();
115
-            } catch (SQLException e) {
116
-                e.printStackTrace();
117
-            }
118
-        }
119
-        if (connection != null) {
120
-            try {
121
-                connection.close();
122
-            } catch (SQLException e) {
123
-                e.printStackTrace();
124
-            }
125
-        }
126
-    }
127
-
128
-}