通过自定义阻塞队列触发ThreadPoolExecutor创建非核心线程执行任务
Posted Elon.Yang
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了通过自定义阻塞队列触发ThreadPoolExecutor创建非核心线程执行任务相关的知识,希望对你有一定的参考价值。
Java项目常使用ThreadPoolExecutor创建线程池,核心参数包含corePoolSize,maximumPoolSize,workQueue。我们希望构建的线程池能满足如下条件:
- 线程数量可控。需要设置一个最大线程数量maximumPoolSize,防止线程无限制创建,耗尽系统资源。
- 放到线程池中的任务不会被拒绝丢弃(任务被丢弃,将导致严重的业务BUG)。所以一般定义一个无界阻塞队列(不指定大小,容量最大值是Integer.MAX_VALUE),用于缓存待执行的任务。
无界队列导致maximumPoolSize无效
ThreadPoolExecutor的execute() 方法逻辑如下:新增一个任务先判断核心线程是否有空闲,核心线程有空闲则交给核心线程执行;没有空闲的核心线程则将任务放入阻塞队列。
1. 任务执行三步骤
public void execute(Runnable command)
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 1. 有空闲核心线程,则交给核心线程执行
if (workerCountOf(c) < corePoolSize)
if (addWorker(command, true))
return;
c = ctl.get();
// 2. 将任务加到队列,等待空闲线程消费
if (isRunning(c) && workQueue.offer(command))
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
// 3. 尝试创建非核心线程执行任务
else if (!addWorker(command, false))
reject(command);
2. 无界队列workQueue.offer(command)永远返回true
以LinkedBlockingDeque来看一下其offer的执行过程:
- ThreadPoolExecutor.execute()调用LinkedBlockingDeque.offer()。
- LinkedBlockingDeque.offer()调用LinkedBlockingDeque.offerLast()。
- LinkedBlockingDeque.offerLast()调用LinkedBlockingDeque.linkLast()。
private boolean linkLast(Node<E> node)
// 超出队列容量时,添加失败。返回false。
if (count >= capacity)
return false;
Node<E> l = last;
node.prev = l;
last = node;
if (first == null)
first = node;
else
l.next = node;
++count;
notEmpty.signal();
return true;
对于无界队列需要满足count >= Integer.MAX_VALUE才会返回false,触发创建非核心线程。这个条件基本上无法达到,所以线程池表现出来的就是只有核心线程在工作。
自定义阻塞队列重写LinkedBlockingDeque的方法触发创建非核心线程
考虑:从调用链来看,我们可以重写linkLast方法,修改if (count >= capacity)判断条件。但linkLast()是私有方法,子类无法重写;更上层的offerLast()是public方法,但依赖内部类Node。再往上看offer(), offer()直接调用了offerLast()方法,没有依赖LinkedBlockingDequene内部定义的私有模型,符合改写条件。
将自定义的ThreadTaskLinkedBlockingQueue作为线程池工具的私有内部类定义,对外隐藏实现细节。
package elon.threadpool.util;
import lombok.Setter;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* 线程池工具类。
*
* @author elon
* @since 2021/11/6
*/
public class ElonThreadPoolUtils
private static final Logger LOGGER = LogManager.getLogger(ElonThreadPoolUtils.class);
private static int corePoolSize = 10;
private static int maximumPoolSize = 100;
private static ThreadPoolExecutor poolExecutor = null;
private static ThreadTaskLinkedBlockingQueue<Runnable> queue = new ThreadTaskLinkedBlockingQueue<>();
public static void initThreadPool(int corePoolSize, int maximumPoolSize)
ElonThreadPoolUtils.corePoolSize = corePoolSize;
ElonThreadPoolUtils.maximumPoolSize = maximumPoolSize;
poolExecutor = new ThreadPoolExecutor(ElonThreadPoolUtils.corePoolSize, ElonThreadPoolUtils.maximumPoolSize, 10,
TimeUnit.SECONDS, queue);
LOGGER.info("[ElonThreadPoolUtils]Init thread pool success. corePoolSize:|maximumPoolSize:", corePoolSize,
maximumPoolSize);
synchronized public static void executeTask(Runnable task)
int activeThreadNum = poolExecutor.getActiveCount();
LOGGER.info("[ElonThreadPoolUtils]Number of active threads:", activeThreadNum);
LOGGER.info("[ElonThreadPoolUtils]The number of tasks waiting to be processed in the queue:", queue.size());
poolExecutor.execute(task);
/**
* 自定义线程任务阻塞队列. 在活跃线程数小于最大支持线程数的情况下,新任务不放到队列从而激发线程池创建新线程及时处理.
* 解决使用LinkedBlockingDeque无限队列,线程池只有核心线程在处理。maximumPoolSize未启作用的问题。
*
* @author elon
* @since 2021/11/6
*/
@Setter
private static class ThreadTaskLinkedBlockingQueue<E> extends LinkedBlockingDeque<E>
@Override
public boolean offer(E e)
int activeThreadNum = poolExecutor.getActiveCount();
if (activeThreadNum < maximumPoolSize)
return false;
return offerLast(e);
关键逻辑是改写了offer()的实现,在调用offerLast()前增加了判断:如果线程池活跃线程数量小于最大线程数,新增任务直接返回false;不放到队列中。从而触发线程池创建非核心线程执行任务。
如果活跃线程数等于最大线程数,任务将放到队列中等待空闲线程来消费。
测试代码
- 定义测试用的线程任务类
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* 线程任务
*
* @author elon
* @since 2021/11/6
*/
public class ThreadTask implements Runnable
private static final Logger LOGGER = LogManager.getLogger(ThreadTask.class);
private final String threadName;
public ThreadTask(String threadName)
this.threadName = threadName;
@Override
public void run()
LOGGER.info("threadName:", threadName);
try
Thread.sleep(10 * 1000);
catch (InterruptedException e)
e.printStackTrace();
- 创建线程池,执行1000个任务
import elon.threadpool.service.ThreadTask;
import elon.threadpool.util.ElonThreadPoolUtils;
/**
* 应用启动类
*
* @author elon
* @since 2021/11/6
*/
public class StartupThreadPool
public static void main(String[] args)
ElonThreadPoolUtils.initThreadPool(10, 100);
for (int i = 1; i <= 1000; ++i)
ElonThreadPoolUtils.executeTask(new ThreadTask(String.valueOf(i)));
以上是关于通过自定义阻塞队列触发ThreadPoolExecutor创建非核心线程执行任务的主要内容,如果未能解决你的问题,请参考以下文章
通过自定义阻塞队列触发ThreadPoolExecutor创建非核心线程执行任务
Java 并发编程线程池机制 ( 线程池阻塞队列 | 线程池拒绝策略 | 使用 ThreadPoolExecutor 自定义线程池参数 )
JUC并发编程 共享模式之工具 线程池 -- 自定义线程池(阻塞队列)