Преглед изворни кода

Merge branch 'master' of https://github.com/xuxueli/xxl-job.git

xueli.xue пре 9 година
родитељ
комит
dbbb1f069a

+ 50 - 0
xxl-job-admin/src/main/java/com/xxl/job/service/job/LocalJobBean.java Прегледај датотеку

@@ -0,0 +1,50 @@
1
+package com.xxl.job.service.job;
2
+
3
+import java.util.HashMap;
4
+import java.util.Map;
5
+import java.util.Map.Entry;
6
+import java.util.concurrent.TimeUnit;
7
+
8
+import org.quartz.DisallowConcurrentExecution;
9
+import org.quartz.JobExecutionContext;
10
+import org.quartz.JobExecutionException;
11
+import org.slf4j.Logger;
12
+import org.slf4j.LoggerFactory;
13
+import org.springframework.scheduling.quartz.QuartzJobBean;
14
+
15
+/**
16
+ * http job bean
17
+ * @author xuxueli 2015-12-17 18:20:34
18
+ */
19
+
20
+@DisallowConcurrentExecution	// 串行;线程数要多配置几个,否则不生效;
21
+public class LocalJobBean extends QuartzJobBean {
22
+	private static Logger logger = LoggerFactory.getLogger(LocalJobBean.class);
23
+
24
+	@Override
25
+	protected void executeInternal(JobExecutionContext context)
26
+			throws JobExecutionException {
27
+		
28
+		String triggerKey = context.getTrigger().getKey().getName();
29
+		String triggerGroup = context.getTrigger().getKey().getGroup();
30
+		Map<String, Object> jobDataMap = context.getMergedJobDataMap().getWrappedMap();
31
+		
32
+		// jobDataMap 2 params
33
+		Map<String, String> params = new HashMap<String, String>();
34
+		if (jobDataMap!=null && jobDataMap.size()>0) {
35
+			for (Entry<String, Object> item : jobDataMap.entrySet()) {
36
+				params.put(item.getKey(), String.valueOf(item.getValue()));
37
+			}
38
+		}
39
+		
40
+		try {
41
+			TimeUnit.SECONDS.sleep(5);
42
+		} catch (InterruptedException e) {
43
+			e.printStackTrace();
44
+		}
45
+		
46
+		logger.info(">>>>>>>>>>> xxl-job run :jobId:{}, group:{}, jobDataMap:{}", 
47
+				new Object[]{triggerKey, triggerGroup, jobDataMap});
48
+    }
49
+	
50
+}

+ 48 - 0
xxl-job-admin/src/main/java/com/xxl/job/service/job/LocalJobBeanB.java Прегледај датотеку

@@ -0,0 +1,48 @@
1
+package com.xxl.job.service.job;
2
+
3
+import java.util.HashMap;
4
+import java.util.Map;
5
+import java.util.Map.Entry;
6
+import java.util.concurrent.TimeUnit;
7
+
8
+import org.quartz.JobExecutionContext;
9
+import org.quartz.JobExecutionException;
10
+import org.slf4j.Logger;
11
+import org.slf4j.LoggerFactory;
12
+import org.springframework.scheduling.quartz.QuartzJobBean;
13
+
14
+/**
15
+ * http job bean
16
+ * @author xuxueli 2015-12-17 18:20:34
17
+ */
18
+
19
+public class LocalJobBeanB extends QuartzJobBean {
20
+	private static Logger logger = LoggerFactory.getLogger(LocalJobBeanB.class);
21
+
22
+	@Override
23
+	protected void executeInternal(JobExecutionContext context)
24
+			throws JobExecutionException {
25
+		
26
+		String triggerKey = context.getTrigger().getKey().getName();
27
+		String triggerGroup = context.getTrigger().getKey().getGroup();
28
+		Map<String, Object> jobDataMap = context.getMergedJobDataMap().getWrappedMap();
29
+		
30
+		// jobDataMap 2 params
31
+		Map<String, String> params = new HashMap<String, String>();
32
+		if (jobDataMap!=null && jobDataMap.size()>0) {
33
+			for (Entry<String, Object> item : jobDataMap.entrySet()) {
34
+				params.put(item.getKey(), String.valueOf(item.getValue()));
35
+			}
36
+		}
37
+		
38
+		try {
39
+			TimeUnit.SECONDS.sleep(5);
40
+		} catch (InterruptedException e) {
41
+			e.printStackTrace();
42
+		}
43
+		
44
+		logger.info(">>>>>>>>>>> xxl-job run :jobId:{}, group:{}, jobDataMap:{}", 
45
+				new Object[]{triggerKey, triggerGroup, jobDataMap});
46
+    }
47
+	
48
+}

+ 94 - 0
xxl-job-client/src/main/java/com/xxl/job/client/handler/HandlerThread.java Прегледај датотеку

@@ -0,0 +1,94 @@
1
+package com.xxl.job.client.handler;
2
+
3
+import java.io.PrintWriter;
4
+import java.io.StringWriter;
5
+import java.util.HashMap;
6
+import java.util.Map;
7
+import java.util.concurrent.LinkedBlockingQueue;
8
+import java.util.concurrent.TimeUnit;
9
+
10
+import org.slf4j.Logger;
11
+import org.slf4j.LoggerFactory;
12
+
13
+import com.xxl.job.client.handler.IJobHandler.JobHandleStatus;
14
+import com.xxl.job.client.util.HttpUtil;
15
+
16
+/**
17
+ * handler thread
18
+ * @author xuxueli 2016-1-16 19:52:47
19
+ */
20
+public class HandlerThread extends Thread{
21
+	private static Logger logger = LoggerFactory.getLogger(HandlerThread.class);
22
+	
23
+	private IJobHandler handler;
24
+	private LinkedBlockingQueue<Map<String, String>> handlerDataQueue;
25
+	
26
+	public HandlerThread(IJobHandler handler) {
27
+		this.handler = handler;
28
+		handlerDataQueue = new LinkedBlockingQueue<Map<String,String>>();
29
+	}
30
+	
31
+	public void pushData(Map<String, String> param) {
32
+		handlerDataQueue.offer(param);
33
+	}
34
+	
35
+	int i = 1;
36
+	@Override
37
+	public void run() {
38
+		try {
39
+			i++;
40
+			Map<String, String> handlerData = handlerDataQueue.poll();
41
+			if (handlerData!=null) {
42
+				String trigger_log_url = handlerData.get(HandlerRepository.TRIGGER_LOG_URL);
43
+				String trigger_log_id = handlerData.get(HandlerRepository.TRIGGER_LOG_ID);
44
+				String handler_params = handlerData.get(HandlerRepository.HANDLER_PARAMS);
45
+				
46
+				// parse param
47
+				String[] handlerParams = null; 
48
+				if (handler_params!=null && handler_params.trim().length()>0) {
49
+					handlerParams = handler_params.split(",");
50
+				} else {
51
+					handlerParams = new String[0];
52
+				}
53
+				
54
+				// handle job
55
+				JobHandleStatus _status = JobHandleStatus.FAIL;
56
+				String _msg = null;
57
+				try {
58
+					_status = handler.handle(handlerParams);
59
+				} catch (Exception e) {
60
+					logger.info("HandlerThread Exception:", e);
61
+					StringWriter out = new StringWriter();
62
+					e.printStackTrace(new PrintWriter(out));
63
+					_msg = out.toString();
64
+				}
65
+
66
+				// callback handler info
67
+				String callback_response[] = null;
68
+				try {
69
+					
70
+					HashMap<String, String> params = new HashMap<String, String>();
71
+					params.put(HandlerRepository.TRIGGER_LOG_ID, trigger_log_id);
72
+					params.put(HttpUtil.status, _status.name());
73
+					params.put(HttpUtil.msg, _msg);
74
+					callback_response = HttpUtil.post(trigger_log_url, params);
75
+				} catch (Exception e) {
76
+					logger.info("HandlerThread Exception:", e);
77
+				}
78
+				logger.info("<<<<<<<<<<< xxl-job thread handle, handlerData:{}, callback_status:{}, callback_msg:{}, callback_response:{}, thread:{}", 
79
+						new Object[]{handlerData, _status, _msg, callback_response, this});
80
+			} else {
81
+				try {
82
+					TimeUnit.MILLISECONDS.sleep(i * 100);
83
+				} catch (InterruptedException e) {
84
+					e.printStackTrace();
85
+				}
86
+				if (i>5) {
87
+					i= 0;
88
+				}
89
+			}
90
+		} catch (Exception e) {
91
+			logger.info("HandlerThread Exception:", e);
92
+		}
93
+	}
94
+}