如何使用Spring开发和监控线程池服务
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何使用Spring开发和监控线程池服务相关的知识,希望对你有一定的参考价值。
参考技术A
第1步:创建Maven工程
下面是一个Maven工程(可以使用Maven或IDE的插件创建)。
第2步:添加依赖库
将Spring的依赖添加到Maven的pom.xml文件中。
1
2
3
4
5
6
7
8
9
10
11
<!-- Spring 3 dependencies -->
<!-- Both artifacts share one version through the Maven property reference ${spring.version};
     NOTE(review): the property itself is not defined in this snippet and must be declared
     in the pom's <properties> section. -->
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-core</artifactId>
    <version>${spring.version}</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-context</artifactId>
    <version>${spring.version}</version>
</dependency>
使用下面的插件创建可执行jar包。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
<!-- Builds an executable jar during the "package" phase with the shade plugin. -->
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>1.3.1</version>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <transformers>
                    <!-- Records the entry-point class in the jar manifest. -->
                    <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                        <mainClass>com.otv.exe.Application</mainClass>
                    </transformer>
                    <!-- Concatenates the META-INF/spring.handlers files found in the merged jars. -->
                    <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                        <resource>META-INF/spring.handlers</resource>
                    </transformer>
                    <!-- Concatenates the META-INF/spring.schemas files found in the merged jars. -->
                    <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                        <resource>META-INF/spring.schemas</resource>
                    </transformer>
                </transformers>
            </configuration>
        </execution>
    </executions>
</plugin>
第3步:创建任务类
创建一个实现Runnable接口的新TestTask类。这个类表示要执行的任务。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package com.otv.task;
import org.apache.log4j.Logger;
/**
* @author onlinetechvision.com
* @since 17 Oct 2011
* @version 1.0.0
*
*/
public class TestTask implements Runnable
private static Logger log = Logger.getLogger(TestTask.class);
String taskName;
public TestTask()
public TestTask(String taskName)
this.taskName = taskName;
public void run()
try
log.debug(this.taskName + " : is started.");
Thread.sleep(10000);
log.debug(this.taskName + " : is completed.");
catch (InterruptedException e)
log.error(this.taskName + " : is not completed!");
e.printStackTrace();
@Override
public String toString()
return (getTaskName());
public String getTaskName()
return taskName;
public void setTaskName(String taskName)
this.taskName = taskName;
第4步:创建TestRejectedExecutionHandler类
TestRejectedExecutionHandler类实现了RejectedExecutionHandler接口。如果没有空闲线程并且队列超出限制,任务会被拒绝。这个类处理被拒绝的任务。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package com.otv.handler;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.log4j.Logger;
/**
* @author onlinetechvision.com
* @since 17 Oct 2011
* @version 1.0.0
*
*/
public class TestRejectedExecutionHandler implements RejectedExecutionHandler
private static Logger log = Logger.getLogger(TestRejectedExecutionHandler.class);
public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor)
log.debug(runnable.toString() + " : has been rejected");
第5步:创建ITestThreadPoolExecutorService接口
创建ITestThreadPoolExecutorService接口。(译者注:这个接口的主要功能是通过设置的参数创建一个线程池)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package com.otv.srv;
import java.util.concurrent.ThreadPoolExecutor;
import com.otv.handler.TestRejectedExecutionHandler;
/**
* @author onlinetechvision.com
* @since 17 Oct 2011
* @version 1.0.0
*
*/
public interface ITestThreadPoolExecutorService
public ThreadPoolExecutor createNewThreadPool();
public int getCorePoolSize();
public void setCorePoolSize(int corePoolSize);
public int getMaxPoolSize();
public void setMaxPoolSize(int maximumPoolSize);
public long getKeepAliveTime();
public void setKeepAliveTime(long keepAliveTime);
public int getQueueCapacity();
public void setQueueCapacity(int queueCapacity);
public TestRejectedExecutionHandler getTestRejectedExecutionHandler();
public void setTestRejectedExecutionHandler(TestRejectedExecutionHandler testRejectedExecutionHandler);
第6步:创建TestThreadPoolExecutorService类
TestThreadPoolExecutorService类实现了ITestThreadPoolExecutorService接口(上一步创建的接口)。这个类可以创建一个新的线程池。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
package com.otv.srv;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import com.otv.handler.TestRejectedExecutionHandler;
/**
* @author onlinetechvision.com
* @since 17 Oct 2011
* @version 1.0.0
*
*/
public class TestThreadPoolExecutorService implements ITestThreadPoolExecutorService
private int corePoolSize;
private int maxPoolSize;
private long keepAliveTime;
private int queueCapacity;
TestRejectedExecutionHandler testRejectedExecutionHandler;
public ThreadPoolExecutor createNewThreadPool()
ThreadPoolExecutor executor = new ThreadPoolExecutor(getCorePoolSize(),
getMaxPoolSize(),
getKeepAliveTime(),
TimeUnit.SECONDS,
new ArrayBlockingQueue(getQueueCapacity()),
getTestRejectedExecutionHandler());
return executor;
public int getCorePoolSize()
return corePoolSize;
public void setCorePoolSize(int corePoolSize)
this.corePoolSize = corePoolSize;
public int getMaxPoolSize()
return maxPoolSize;
public void setMaxPoolSize(int maxPoolSize)
this.maxPoolSize = maxPoolSize;
public long getKeepAliveTime()
return keepAliveTime;
public void setKeepAliveTime(long keepAliveTime)
this.keepAliveTime = keepAliveTime;
public int getQueueCapacity()
return queueCapacity;
public void setQueueCapacity(int queueCapacity)
this.queueCapacity = queueCapacity;
public TestRejectedExecutionHandler getTestRejectedExecutionHandler()
return testRejectedExecutionHandler;
public void setTestRejectedExecutionHandler(TestRejectedExecutionHandler testRejectedExecutionHandler)
this.testRejectedExecutionHandler = testRejectedExecutionHandler;
第7步: 创建IThreadPoolMonitorService接口
创建IThreadPoolMonitorService接口
1
2
3
4
5
6
7
8
9
10
11
12
package com.otv.monitor.srv;
import java.util.concurrent.ThreadPoolExecutor;
/**
 * Contract for a monitor that can be run on its own thread and reports on a
 * {@link ThreadPoolExecutor}.
 */
public interface IThreadPoolMonitorService extends Runnable {

    /** Reports the current state of the monitored pool. */
    void monitorThreadPool();

    /** @return the monitored executor */
    ThreadPoolExecutor getExecutor();

    /** @param executor the executor to monitor */
    void setExecutor(ThreadPoolExecutor executor);
}
第8步:创建ThreadPoolMonitorService类
ThreadPoolMonitorService类实现了IThreadPoolMonitorService接口。这个类用来监控已创建的线程池。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package com.otv.monitor.srv;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.log4j.Logger;
/**
* @author onlinetechvision.com
* @since 17 Oct 2011
* @version 1.0.0
*
*/
public class ThreadPoolMonitorService implements IThreadPoolMonitorService
private static Logger log = Logger.getLogger(ThreadPoolMonitorService.class);
ThreadPoolExecutor executor;
private long monitoringPeriod;
public void run()
try
while (true)
monitorThreadPool();
Thread.sleep(monitoringPeriod*1000);
catch (Exception e)
log.error(e.getMessage());
public void monitorThreadPool()
StringBuffer strBuff = new StringBuffer();
strBuff.append("CurrentPoolSize : ").append(executor.getPoolSize());
strBuff.append(" - CorePoolSize : ").append(executor.getCorePoolSize());
strBuff.append(" - MaximumPoolSize : ").append(executor.getMaximumPoolSize());
strBuff.append(" - ActiveTaskCount : ").append(executor.getActiveCount());
strBuff.append(" - CompletedTaskCount : ").append(executor.getCompletedTaskCount());
strBuff.append(" - TotalTaskCount : ").append(executor.getTaskCount());
strBuff.append(" - isTerminated : ").append(executor.isTerminated());
log.debug(strBuff.toString());
public ThreadPoolExecutor getExecutor()
return executor;
public void setExecutor(ThreadPoolExecutor executor)
this.executor = executor;
public long getMonitoringPeriod()
return monitoringPeriod;
public void setMonitoringPeriod(long monitoringPeriod)
this.monitoringPeriod = monitoringPeriod;
第9步:创建Starter类
(译者注:这个类内部维护了一个线程池服务(testThreadPoolExecutorService)和一个监控服务(threadPoolMonitorService),然后创建线程池、启动一个单独的线程执行监控服务,并通过线程池执行任务。)
本回答被提问者采纳
如何使用线程池?
需求:
我们之前实现了一个多线程web视频监控服务器,由于我们服务器资源有限(CPU、内存、带宽),需要对请求连接数(线程数)做限制,避免因资源耗尽而瘫痪
可以使用线程池代替原来的每次请求创建线程
思路:
使用标准库concurrent.futures中的ThreadPoolExecutor,其对象的submit和map方法可以把任务提交给线程池中的线程执行
代码:
import os, cv2, time, struct, threading
from http.server import HTTPServer, BaseHTTPRequestHandler
from socketserver import TCPServer, ThreadingTCPServer
from threading import Thread, RLock
from select import select
class JpegStreamer(Thread):
    """Capture thread that JPEG-encodes camera frames and fans them out over pipes.

    Each consumer calls register() to obtain the read end of a fresh pipe;
    every encoded frame is written to each writable pipe as a native ``long``
    length header followed by the JPEG bytes.
    """

    def __init__(self, camera):
        """Open the capture device identified by ``camera`` (passed to cv2.VideoCapture)."""
        super().__init__()
        self.cap = cv2.VideoCapture(camera)
        self.lock = RLock()
        # Maps read-end fd -> write-end fd for every registered consumer.
        self.pipes = {}

    def register(self):
        """Create a pipe for a new consumer and return its read-end file descriptor."""
        pr, pw = os.pipe()
        with self.lock:
            self.pipes[pr] = pw
        return pr

    def unregister(self, pr):
        """Forget the consumer identified by read end ``pr`` and close both pipe ends.

        Raises KeyError if ``pr`` was never registered.
        """
        # `with` releases the lock even when pop() raises.
        with self.lock:
            pw = self.pipes.pop(pr)
        os.close(pr)
        os.close(pw)

    def capture(self):
        """Yield JPEG-encoded frames (bytes) for as long as the capture device is open."""
        cap = self.cap
        while cap.isOpened():
            ret, frame = cap.read()
            if ret:
                ret, data = cv2.imencode('.jpg', frame, (cv2.IMWRITE_JPEG_QUALITY, 40))
                # tobytes() replaces the deprecated ndarray.tostring().
                yield data.tobytes()

    def send_frame(self, frame):
        """Write ``frame`` (bytes), prefixed by its length, to every pipe that is writable."""
        n = struct.pack('l', len(frame))
        # `with` guarantees the lock is released if a write fails.
        with self.lock:
            if len(self.pipes):
                # Wait at most one second; pipes that are not writable miss this frame.
                _, pipes, _ = select([], list(self.pipes.values()), [], 1)
                for pipe in pipes:
                    os.write(pipe, n)
                    os.write(pipe, frame)

    def run(self):
        """Thread body: forward every captured frame to the registered consumers."""
        for frame in self.capture():
            self.send_frame(frame)
class JpegRetriever:
    """Per-thread context manager that yields the frames published by a streamer.

    Entering registers a pipe with the streamer for the calling thread and
    returns a generator of frames; leaving unregisters that pipe again.
    """

    def __init__(self, streamer):
        """Remember ``streamer``; it must provide register() and unregister(fd)."""
        self.streamer = streamer
        # Thread-local, so each handler thread gets its own pipe.
        self.local = threading.local()

    def _read_exact(self, size):
        """Read exactly ``size`` bytes from this thread's pipe; return None on end of file."""
        chunks = []
        remaining = size
        while remaining > 0:
            # os.read may return fewer bytes than requested, so keep reading.
            chunk = os.read(self.local.pipe, remaining)
            if not chunk:
                return None
            chunks.append(chunk)
            remaining -= len(chunk)
        return b''.join(chunks)

    def retrieve(self):
        """Yield frames (bytes) read from this thread's pipe until the pipe reaches end of file."""
        # Header width follows the 'l' format used for the length prefix
        # instead of assuming 8 bytes.
        header_size = struct.calcsize('l')
        while True:
            ns = self._read_exact(header_size)
            if ns is None:
                return
            n = struct.unpack('l', ns)[0]
            data = self._read_exact(n)
            if data is None:
                return
            yield data

    def __enter__(self):
        """Register a pipe for the calling thread and return the frame generator.

        Raises RuntimeError if this thread already has an active pipe.
        """
        if hasattr(self.local, 'pipe'):
            raise RuntimeError()
        # Use the streamer handed to the constructor, not a module-level global.
        self.local.pipe = self.streamer.register()
        return self.retrieve()

    def __exit__(self, *args):
        """Unregister this thread's pipe; returning True suppresses any exception
        raised inside the ``with`` body."""
        self.streamer.unregister(self.local.pipe)
        del self.local.pipe
        return True
class WebHandler(BaseHTTPRequestHandler):
    """HTTP handler that serves the frames as a multipart/x-mixed-replace stream on '/'."""

    # Shared frame source for all handler instances; set via set_retriever().
    retriever = None

    @staticmethod
    def set_retriever(retriever):
        """Install the frame source used by every request handler."""
        WebHandler.retriever = retriever

    def do_GET(self):
        """Stream frames to the client; any path other than '/' gets a 404.

        Raises RuntimeError if no retriever has been installed.
        """
        if self.retriever is None:
            raise RuntimeError('no retriver')
        if self.path != '/':
            # Answer explicitly instead of returning without any HTTP response.
            self.send_error(404)
            return
        self.send_response(200)
        self.send_header('Content-type', 'multipart/x-mixed-replace;boundary=jpeg_frame')
        self.end_headers()
        with self.retriever as frames:
            for frame in frames:
                self.send_frame(frame)

    def send_frame(self, frame):
        """Write one multipart part: boundary line, part headers, blank line, JPEG bytes."""
        sh = b'--jpeg_frame\r\n'
        sh += b'Content-Type: image/jpeg\r\n'
        # The blank line (second CRLF) ends the part headers.
        sh += b'Content-Length: %d\r\n\r\n' % len(frame)
        self.wfile.write(sh)
        self.wfile.write(frame)
from concurrent.futures import ThreadPoolExecutor
class ThreadingPoolTCPServer(ThreadingTCPServer):
    """TCP server that handles requests on a bounded thread pool instead of
    starting one thread per request."""

    def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True, thread_n=100):
        """Create the server and a pool of at most ``thread_n`` worker threads."""
        # Forward the caller's bind_and_activate instead of hard-coding True.
        super().__init__(server_address, RequestHandlerClass, bind_and_activate=bind_and_activate)
        self.executor = ThreadPoolExecutor(thread_n)

    def process_request(self, request, client_address):
        """Queue the request on the pool; it waits there while all workers are busy."""
        self.executor.submit(self.process_request_thread, request, client_address)

    def server_close(self):
        """Close the listening socket and stop accepting new work on the pool."""
        super().server_close()
        # wait=False: do not block on long-running streaming handlers.
        self.executor.shutdown(wait=False)
if __name__ == '__main__':
    # Create the streamer and start capturing from the camera.
    streamer = JpegStreamer(0)
    streamer.start()
    # Create the retriever used by the HTTP handlers.
    retriever = JpegRetriever(streamer)
    WebHandler.set_retriever(retriever)
    # Start the HTTP server; at most three requests are served concurrently.
    HOST = 'localhost'
    PORT = 9000
    print('Start server... (http://%s:%s)' % (HOST, PORT))
    httpd = ThreadingPoolTCPServer((HOST, PORT), WebHandler, thread_n=3)
    #httpd = ThreadingTCPServer((HOST, PORT), WebHandler)
    try:
        httpd.serve_forever()
    finally:
        # Release the listening socket when the server loop ends (e.g. Ctrl-C).
        httpd.server_close()
=====================================================================
>>> import threading,time,random
>>> def f(a,b):
... print(threading.current_thread().name,':',a,b)
... time.sleep(random.randint(5,10))
... return a * b
...
>>> from concurrent.futures import ThreadPoolExecutor
>>> executor = ThreadPoolExecutor(3)
>>> executor.submit(f,2,3)
ThreadPoolExecutor-0_0 : 2 3
<Future at 0x7f17562032b0 state=running>
>>> future = executor.submit(f,2,3)
ThreadPoolExecutor-0_1 : 2 3
>>> future.result()
6
>>> executor.map(f, range(1,6),range(2,7))
ThreadPoolExecutor-0_2 : 1 2
<generator object Executor.map.<locals>.result_iterator at 0x7f17568aa830>
ThreadPoolExecutor-0_0 : 2 3
ThreadPoolExecutor-0_1 : 3 4
ThreadPoolExecutor-0_1 : 4 5
ThreadPoolExecutor-0_2 : 5 6
>>> list(executor.map(f, range(1,6),range(2,7)))
ThreadPoolExecutor-0_2 : 1 2
ThreadPoolExecutor-0_1 : 2 3
ThreadPoolExecutor-0_0 : 3 4
ThreadPoolExecutor-0_2 : 4 5
ThreadPoolExecutor-0_1 : 5 6
[2, 6, 12, 20, 30]
>>>
以上是关于如何使用Spring开发和监控线程池服务的主要内容,如果未能解决你的问题,请参考以下文章