并发编程补充知识之标准线程池

Posted 编程杂货

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了并发编程补充知识之标准线程池相关的知识,希望对你有一定的参考价值。

大家好,我是浮生君。夯实基础,长足进步,和浮生君一起复习不得不说的并发编程补充知识~


今天的内容是StandardThreadExecutorWithNameAndTime,借鉴了tomcat源码的线程池设计,对ThreadFactory、Queue、ThreadPoolExecutor进行了扩展,使得调用层尽量屏蔽底层的代码实现。


一、功能

1、定义线程池名称

2、指定阻塞队列长度

3、限制并发数量

4、参数使用逻辑

  • ThreadPoolExecutor的判断逻辑前面的内容提到过,如图

  • StandardThreadExecutorWithNameAndTime:如果线程数超过 corePoolSize,则会增加thread直到 maxThreads,然后才放入阻塞队列中。


二、源码

  • 构造函数

public StandardThreadExecutorWithNameAndTime(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,                                                int taskQueueCapacity, RejectedExecutionHandler handler, String name,Integer semaphoreNumber) {        // 定义信号量,最大为corePoolSize,控制并发量 if(semaphoreNumber != null){ if(semaphoreNumber < corePoolSize) { semaphoreNumber = corePoolSize; } semaphore = new Semaphore(semaphoreNumber);        }        // 定义阻塞队列,底层使用LinkebBlockingQueue TaskQueue taskqueue = new TaskQueue(taskQueueCapacity==0?maxQueueSize:taskQueueCapacity);        // 定义线程池的名称 TaskThreadFactory tf = new TaskThreadFactory(StringUtils.isBlank(name)?namePrefix:name); executor = new ThreadExecutorWithNameAndTime(corePoolSize,maximumPoolSize, keepAliveTime, unit,taskqueue, tf,handler) { @Override protected void afterExecute(Runnable r, Throwable t) { try {                    // todo 执行任务结束后相关操作,比如打印日志等 }catch (Exception e){ throw new RuntimeException("ThreadExecutorWithNameAndTime afterExecute error",e); }finally { //释放锁 if(semaphore != null){ semaphore.release(); } } } };        // 定义阻塞队列的parent为当前线程池,当前线程池关闭后,阻塞队列不再接收数据 taskqueue.setParent( (ThreadPoolExecutor) executor);    }
  • 阻塞队列

// 阻塞队列 继承LinkedBlockingQueueclass TaskQueue extends LinkedBlockingQueue<Runnable> { /**阻塞队列默认长度设置为Integer.MAX_VALUE; */ private static final long serialVersionUID = 8395648427240834739L; ThreadPoolExecutor parent = null; public TaskQueue() { super(); } public TaskQueue(int initialCapacity) { super(initialCapacity); } public TaskQueue(Collection<? extends Runnable> c) { super(c); } public void setParent(ThreadPoolExecutor tp) { parent = tp; } public boolean force(Runnable o) { // 线程池关闭之后,拒绝数据再次存入 if ( parent.isShutdown() ) { throw new RejectedExecutionException("Executor not running, can't force a command into the queue"); } return super.offer(o); // forces the item onto the queue, to be used if the task is rejected } //这部分的实现逻辑是,当线程池的线程数小于最大线程数时,不往阻塞队列放数据 @Override public boolean offer(Runnable o) { // we can't do any checks if (parent==null) { return super.offer(o); } // 线程池的线程数已经等于最大线程数,往消息队列中存放数据 int poolSize = parent.getPoolSize(); // we are maxed out on threads, simply queue the object if (poolSize == parent.getMaximumPoolSize()){ return super.offer(o); } // we have idle threads, just add it to the queue // note that we don't use getActiveCount(), see BZ 49730 AtomicInteger submittedTasksCount = StandardThreadExecutorWithNameAndTime.this.submittedTasksCount; if(submittedTasksCount!=null) { if (submittedTasksCount.get()<=poolSize) { return super.offer(o); } }  // 线程池的线程数小于最大线程数,返回false // if we have less threads than maximum force creation of a new thread if (poolSize<parent.getMaximumPoolSize()){ return false; } // if we reached here, we need to add it to the queue return super.offer(o); } }
  • 线程工厂

class TaskThreadFactory implements ThreadFactory { final ThreadGroup group; final AtomicInteger threadNumber = new AtomicInteger(1); final String namePrefix;
TaskThreadFactory(String namePrefix) { SecurityManager s = System.getSecurityManager(); group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); this.namePrefix = namePrefix; }
@Override public Thread newThread(Runnable r) {            //  定义线程池中线程的名称 Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement()); t.setDaemon(daemon); t.setPriority(getThreadPriority()); return t; } }


三、实例

1、线程池结合CountDownLatch,等待所有数据处理完成

private void addData(List<String> data) throws InterruptedException { // 定义线程池 final StandardThreadExecutorWithNameAndTime taskPool = new StandardThreadExecutorWithNameAndTime (2, 5, 60L, TimeUnit.SECONDS, 100000, new ThreadPoolExecutor.DiscardOldestPolicy(), "addData"); // 定义倒数计数器 CountDownLatch cdl = new CountDownLatch(data.size()); for (int i = 0; i < data.size(); i++) { final int count = i; taskPool.submit(() -> { String print = data.get(count); try { // 子线程处理的业务逻辑 System.out.println(print); } catch (Exception e) { System.out.println("error:" + e); } finally { cdl.countDown(); } }); } // 所有的数据处理完成后关闭线程池 cdl.await(); taskPool.shutdown(); }

并发编程所有的内容就到这里啦~

“浮生若梦,为欢几何。” May the force be with you~

以上是关于并发编程补充知识之标准线程池的主要内容,如果未能解决你的问题,请参考以下文章

Python并发编程之线程池/进程池--concurrent.futures模块

并发编程系列之如何正确使用线程池?

python并发编程之进程池,线程池

2020Python修炼记python并发编程补充—进程池和线程池

Java面试系列之并发编程专题-Java线程池灵魂拷问

Java面试系列之并发编程专题-Java线程池灵魂拷问