Quellcode durchsuchen

任务回调队列 + 任务执行队列,poll改为take;修复一处可能导致cpu占满的问题

xueli.xue vor 9 Jahren
Ursprung
Commit
5c2bcb9ffd

+ 2 - 2
xxl-job-core/src/main/java/com/xxl/job/core/handler/HandlerRepository.java Datei anzeigen

192
 			public void run() {
192
 			public void run() {
193
 				while(true){
193
 				while(true){
194
 					try {
194
 					try {
195
-						HashMap<String, String> item = callBackQueue.poll();
195
+						HashMap<String, String> item = callBackQueue.take();
196
 						if (item != null) {
196
 						if (item != null) {
197
 							RemoteCallBack callback = null;
197
 							RemoteCallBack callback = null;
198
 							try {
198
 							try {
201
 								logger.info("HandlerThread Exception:", e);
201
 								logger.info("HandlerThread Exception:", e);
202
 							}
202
 							}
203
 							logger.info(">>>>>>>>>>> xxl-job callback , params:{}, result:{}", new Object[]{item, callback});
203
 							logger.info(">>>>>>>>>>> xxl-job callback , params:{}, result:{}", new Object[]{item, callback});
204
-						} 
204
+						}
205
 					} catch (Exception e) {
205
 					} catch (Exception e) {
206
 						e.printStackTrace();
206
 						e.printStackTrace();
207
 					}
207
 					}

+ 9 - 22
xxl-job-core/src/main/java/com/xxl/job/core/handler/HandlerThread.java Datei anzeigen

1
 package com.xxl.job.core.handler;
1
 package com.xxl.job.core.handler;
2
 
2
 
3
+import com.xxl.job.core.handler.HandlerRepository.HandlerParamEnum;
4
+import com.xxl.job.core.handler.IJobHandler.JobHandleStatus;
5
+import com.xxl.job.core.log.XxlJobFileAppender;
6
+import com.xxl.job.core.util.HttpUtil;
7
+import org.eclipse.jetty.util.ConcurrentHashSet;
8
+import org.slf4j.Logger;
9
+import org.slf4j.LoggerFactory;
10
+
3
 import java.io.PrintWriter;
11
 import java.io.PrintWriter;
4
 import java.io.StringWriter;
12
 import java.io.StringWriter;
5
 import java.util.HashMap;
13
 import java.util.HashMap;
6
 import java.util.Map;
14
 import java.util.Map;
7
 import java.util.concurrent.LinkedBlockingQueue;
15
 import java.util.concurrent.LinkedBlockingQueue;
8
-import java.util.concurrent.TimeUnit;
9
-
10
-import org.eclipse.jetty.util.ConcurrentHashSet;
11
-import org.slf4j.Logger;
12
-import org.slf4j.LoggerFactory;
13
-
14
-import com.xxl.job.core.handler.HandlerRepository.HandlerParamEnum;
15
-import com.xxl.job.core.handler.IJobHandler.JobHandleStatus;
16
-import com.xxl.job.core.log.XxlJobFileAppender;
17
-import com.xxl.job.core.util.HttpUtil;
18
 
16
 
19
 /**
17
 /**
20
  * handler thread
18
  * handler thread
57
 	public void run() {
55
 	public void run() {
58
 		while(!toStop){
56
 		while(!toStop){
59
 			try {
57
 			try {
60
-				Map<String, String> handlerData = handlerDataQueue.poll();
58
+				Map<String, String> handlerData = handlerDataQueue.take();
61
 				if (handlerData!=null) {
59
 				if (handlerData!=null) {
62
 					i= 0;
60
 					i= 0;
63
 					String log_address = handlerData.get(HandlerParamEnum.LOG_ADDRESS.name());
61
 					String log_address = handlerData.get(HandlerParamEnum.LOG_ADDRESS.name());
103
 						params.put("msg", "人工手动终止[业务运行中,被强制终止]");
101
 						params.put("msg", "人工手动终止[业务运行中,被强制终止]");
104
 						HandlerRepository.pushCallBack(HttpUtil.addressToUrl(log_address), params);
102
 						HandlerRepository.pushCallBack(HttpUtil.addressToUrl(log_address), params);
105
 					}
103
 					}
106
-				} else {
107
-					i++;
108
-					logIdSet.clear();
109
-					try {
110
-						TimeUnit.MILLISECONDS.sleep(i * 100);
111
-					} catch (InterruptedException e) {
112
-						e.printStackTrace();
113
-					}
114
-					if (i>5) {
115
-						i= 0;
116
-					}
117
 				}
104
 				}
118
 			} catch (Exception e) {
105
 			} catch (Exception e) {
119
 				logger.info("HandlerThread Exception:", e);
106
 				logger.info("HandlerThread Exception:", e);