并发编程补充知识之标准线程池
Posted 编程杂货
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了并发编程补充知识之标准线程池相关的知识,希望对你有一定的参考价值。
大家好,我是浮生君。夯实基础,长足进步,和浮生君一起复习不得不说的并发编程补充知识~
今天的内容是StandardThreadExecutorWithNameAndTime,借鉴了tomcat源码的线程池设计,对ThreadFactory、Queue、ThreadPoolExecutor进行了扩展,使得调用层尽量屏蔽底层的代码实现。
一、功能
1、定义线程池名称
2、指定阻塞队列长度
3、限制并发数量
4、参数使用逻辑
ThreadPoolExecutor的判断逻辑前面的内容提到过,如图
StandardThreadExecutorWithNameAndTime:如果线程数超过 corePoolSize,则会增加thread直到 maxThreads,然后才放入阻塞队列中。
二、源码
构造函数
public StandardThreadExecutorWithNameAndTime(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
int taskQueueCapacity, RejectedExecutionHandler handler, String name,Integer semaphoreNumber) {
// 定义信号量,最大为corePoolSize,控制并发量
if(semaphoreNumber != null){
if(semaphoreNumber < corePoolSize) {
semaphoreNumber = corePoolSize;
}
semaphore = new Semaphore(semaphoreNumber);
}
// 定义阻塞队列,底层使用LinkebBlockingQueue
TaskQueue taskqueue = new TaskQueue(taskQueueCapacity==0?maxQueueSize:taskQueueCapacity);
// 定义线程池的名称
TaskThreadFactory tf = new TaskThreadFactory(StringUtils.isBlank(name)?namePrefix:name);
executor = new ThreadExecutorWithNameAndTime(corePoolSize,maximumPoolSize, keepAliveTime, unit,taskqueue, tf,handler) {
protected void afterExecute(Runnable r, Throwable t) {
try {
// todo 执行任务结束后相关操作,比如打印日志等
}catch (Exception e){
throw new RuntimeException("ThreadExecutorWithNameAndTime afterExecute error",e);
}finally {
//释放锁
if(semaphore != null){
semaphore.release();
}
}
}
};
// 定义阻塞队列的parent为当前线程池,当前线程池关闭后,阻塞队列不再接收数据
taskqueue.setParent( (ThreadPoolExecutor) executor);
}
阻塞队列
// 阻塞队列 继承LinkedBlockingQueue
class TaskQueue extends LinkedBlockingQueue<Runnable> {
/**阻塞队列默认长度设置为Integer.MAX_VALUE; */
private static final long serialVersionUID = 8395648427240834739L;
ThreadPoolExecutor parent = null;
public TaskQueue() {
super();
}
public TaskQueue(int initialCapacity) {
super(initialCapacity);
}
public TaskQueue(Collection<? extends Runnable> c) {
super(c);
}
public void setParent(ThreadPoolExecutor tp) {
parent = tp;
}
public boolean force(Runnable o) {
// 线程池关闭之后,拒绝数据再次存入
if ( parent.isShutdown() ) {
throw new RejectedExecutionException("Executor not running, can't force a command into the queue");
}
return super.offer(o); // forces the item onto the queue, to be used if the task is rejected
}
//这部分的实现逻辑是,当线程池的线程数小于最大线程数时,不往阻塞队列放数据
public boolean offer(Runnable o) {
// we can't do any checks
if (parent==null) {
return super.offer(o);
}
// 线程池的线程数已经等于最大线程数,往消息队列中存放数据
int poolSize = parent.getPoolSize();
// we are maxed out on threads, simply queue the object
if (poolSize == parent.getMaximumPoolSize()){
return super.offer(o);
}
// we have idle threads, just add it to the queue
// note that we don't use getActiveCount(), see BZ 49730
AtomicInteger submittedTasksCount = StandardThreadExecutorWithNameAndTime.this.submittedTasksCount;
if(submittedTasksCount!=null) {
if (submittedTasksCount.get()<=poolSize) {
return super.offer(o);
}
}
// 线程池的线程数小于最大线程数,返回false
// if we have less threads than maximum force creation of a new thread
if (poolSize<parent.getMaximumPoolSize()){
return false;
}
// if we reached here, we need to add it to the queue
return super.offer(o);
}
}
线程工厂
class TaskThreadFactory implements ThreadFactory {
final ThreadGroup group;
final AtomicInteger threadNumber = new AtomicInteger(1);
final String namePrefix;
TaskThreadFactory(String namePrefix) {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
this.namePrefix = namePrefix;
}
public Thread newThread(Runnable r) {
// 定义线程池中线程的名称
Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement());
t.setDaemon(daemon);
t.setPriority(getThreadPriority());
return t;
}
}
三、实例
1、线程池结合CountDownLatch,等待所有数据处理完成
private void addData(List<String> data) throws InterruptedException {
// 定义线程池
final StandardThreadExecutorWithNameAndTime taskPool = new StandardThreadExecutorWithNameAndTime
(2, 5, 60L,
TimeUnit.SECONDS, 100000,
new ThreadPoolExecutor.DiscardOldestPolicy(), "addData");
// 定义倒数计数器
CountDownLatch cdl = new CountDownLatch(data.size());
for (int i = 0; i < data.size(); i++) {
final int count = i;
taskPool.submit(() -> {
String print = data.get(count);
try {
// 子线程处理的业务逻辑
System.out.println(print);
} catch (Exception e) {
System.out.println("error:" + e);
} finally {
cdl.countDown();
}
});
}
// 所有的数据处理完成后关闭线程池
cdl.await();
taskPool.shutdown();
}
并发编程所有的内容就到这里啦~
“浮生若梦,为欢几何。” May the force be with you~
以上是关于并发编程补充知识之标准线程池的主要内容,如果未能解决你的问题,请参考以下文章
Python并发编程之线程池/进程池--concurrent.futures模块