Pārlūkot izejas kodu

阻塞处理策略

xueli.xue 8 gadus atpakaļ
vecāks
revīzija
0888b2d5a4

+ 1 - 0
xxl-job-admin/src/main/java/com/xxl/job/admin/core/jobbean/RemoteHttpJobBean.java Parādīt failu

@@ -61,6 +61,7 @@ public class RemoteHttpJobBean extends QuartzJobBean {
61 61
 		triggerParam.setJobId(jobInfo.getId());
62 62
 		triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());
63 63
 		triggerParam.setExecutorParams(jobInfo.getExecutorParam());
64
+        triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());
64 65
 		triggerParam.setGlueType(jobInfo.getGlueType());
65 66
 		triggerParam.setGlueSource(jobInfo.getGlueSource());
66 67
 		triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime());

+ 4 - 2
xxl-job-core/src/main/java/com/xxl/job/core/biz/impl/ExecutorBizImpl.java Parādīt failu

@@ -4,6 +4,7 @@ import com.xxl.job.core.biz.ExecutorBiz;
4 4
 import com.xxl.job.core.biz.model.LogResult;
5 5
 import com.xxl.job.core.biz.model.ReturnT;
6 6
 import com.xxl.job.core.biz.model.TriggerParam;
7
+import com.xxl.job.core.enums.ExecutorBlockStrategyEnum;
7 8
 import com.xxl.job.core.executor.XxlJobExecutor;
8 9
 import com.xxl.job.core.glue.GlueFactory;
9 10
 import com.xxl.job.core.glue.GlueTypeEnum;
@@ -128,8 +129,9 @@ public class ExecutorBizImpl implements ExecutorBiz {
128 129
         }
129 130
 
130 131
         // push data to queue
131
-        jobThread.pushTriggerQueue(triggerParam);
132
-        return ReturnT.SUCCESS;
132
+        ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);
133
+        ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam, blockStrategy);
134
+        return pushResult;
133 135
     }
134 136
 
135 137
 }

+ 9 - 0
xxl-job-core/src/main/java/com/xxl/job/core/biz/model/TriggerParam.java Parādīt failu

@@ -13,6 +13,7 @@ public class TriggerParam implements Serializable{
13 13
 
14 14
     private String executorHandler;
15 15
     private String executorParams;
16
+    private String executorBlockStrategy;
16 17
 
17 18
     private String glueType;
18 19
     private String glueSource;
@@ -47,6 +48,14 @@ public class TriggerParam implements Serializable{
47 48
         this.executorParams = executorParams;
48 49
     }
49 50
 
51
+    public String getExecutorBlockStrategy() {
52
+        return executorBlockStrategy;
53
+    }
54
+
55
+    public void setExecutorBlockStrategy(String executorBlockStrategy) {
56
+        this.executorBlockStrategy = executorBlockStrategy;
57
+    }
58
+
50 59
     public String getGlueType() {
51 60
         return glueType;
52 61
     }

+ 29 - 4
xxl-job-core/src/main/java/com/xxl/job/core/thread/JobThread.java Parādīt failu

@@ -3,6 +3,7 @@ package com.xxl.job.core.thread;
3 3
 import com.xxl.job.core.biz.model.HandleCallbackParam;
4 4
 import com.xxl.job.core.biz.model.ReturnT;
5 5
 import com.xxl.job.core.biz.model.TriggerParam;
6
+import com.xxl.job.core.enums.ExecutorBlockStrategyEnum;
6 7
 import com.xxl.job.core.handler.IJobHandler;
7 8
 import com.xxl.job.core.log.XxlJobFileAppender;
8 9
 import com.xxl.job.core.log.XxlJobLogger;
@@ -31,6 +32,9 @@ public class JobThread extends Thread{
31 32
 	private boolean toStop = false;
32 33
 	private String stopReason;
33 34
 
35
+    private boolean running = false;
36
+
37
+
34 38
 	public JobThread(IJobHandler handler) {
35 39
 		this.handler = handler;
36 40
 		triggerQueue = new LinkedBlockingQueue<TriggerParam>();
@@ -40,14 +44,33 @@ public class JobThread extends Thread{
40 44
 		return handler;
41 45
 	}
42 46
 
43
-	public void pushTriggerQueue(TriggerParam triggerParam) {
47
+	public ReturnT<String> pushTriggerQueue(TriggerParam triggerParam, ExecutorBlockStrategyEnum blockStrategy) {
48
+		// avoid repeat
44 49
 		if (triggerLogIdSet.contains(triggerParam.getLogId())) {
45 50
 			logger.debug("repeate trigger job, logId:{}", triggerParam.getLogId());
46
-			return;
51
+			return new ReturnT<String>(ReturnT.FAIL_CODE, "repeate trigger job, logId:" + triggerParam.getLogId());
52
+		}
53
+
54
+		// block strategy
55
+		if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {
56
+            // discard when running
57
+            if (running) {
58
+                return new ReturnT<String>(ReturnT.FAIL_CODE, "任务阻塞:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
59
+            }
60
+		} else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) {
61
+            // kill running old and clear queue
62
+            if (running) {
63
+                this.interrupt();
64
+            }
65
+            triggerQueue.clear();
66
+            triggerLogIdSet.clear();
67
+		} else {
68
+            // just add to queue
47 69
 		}
48 70
 
49 71
 		triggerLogIdSet.add(triggerParam.getLogId());
50 72
 		triggerQueue.add(triggerParam);
73
+        return ReturnT.SUCCESS;
51 74
 	}
52 75
 
53 76
 	public void toStop(String stopReason) {
@@ -59,15 +82,17 @@ public class JobThread extends Thread{
59 82
 		this.toStop = true;
60 83
 		this.stopReason = stopReason;
61 84
 	}
62
-	
63
-	int i = 1;
85
+
86
+
64 87
 	@Override
65 88
 	public void run() {
66 89
 		while(!toStop){
90
+			running = false;
67 91
 			try {
68 92
 				// to check toStop signal, we need cycle, so wo cannot use queue.take(), instand of poll(timeout)
69 93
 				TriggerParam triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);
70 94
 				if (triggerParam!=null) {
95
+					running = true;
71 96
 					triggerLogIdSet.remove(triggerParam.getLogId());
72 97
 					
73 98
 					// parse param