通过自定义阻塞队列触发ThreadPoolExecutor创建非核心线程执行任务

Posted Neo Yang

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了通过自定义阻塞队列触发ThreadPoolExecutor创建非核心线程执行任务相关的知识,希望对你有一定的参考价值。

Java项目常使用ThreadPoolExecutor创建线程池,核心参数包含corePoolSize,maximumPoolSize,workQueue。我们希望构建的线程池能满足如下条件:

  1. 线程数量可控。需要设置一个最大线程数量maximumPoolSize,防止线程无限制创建,耗尽系统资源。
  2. 放到线程池中的任务不会被拒绝丢弃(任务被丢弃,将导致严重的业务BUG)。所以一般定义一个无界阻塞队列(不指定大小,容量最大值是Integer.MAX_VALUE),用于缓存待执行的任务。

无界队列导致maximumPoolSize无效

ThreadPoolExecutor的execute() 方法逻辑如下:新增一个任务先判断核心线程是否有空闲,核心线程有空闲则交给核心线程执行;没有空闲的核心线程则将任务放入阻塞队列。

1. 任务执行三步骤

    public void execute(Runnable command) 
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
		
		// 1. 有空闲核心线程,则交给核心线程执行
        if (workerCountOf(c) < corePoolSize) 
            if (addWorker(command, true))
                return;
            c = ctl.get();
        
		
		// 2. 将任务加到队列,等待空闲线程消费
        if (isRunning(c) && workQueue.offer(command)) 
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        
        
        // 3. 尝试创建非核心线程执行任务
        else if (!addWorker(command, false))
            reject(command);
    

2. 无界队列workQueue.offer(command)永远返回true

以LinkedBlockingDeque来看一下其offer的执行过程:

  1. ThreadPoolExecutor.execute()调用LinkedBlockingDeque.offer()。
  2. LinkedBlockingDeque.offer()调用LinkedBlockingDeque.offerLast()。
  3. LinkedBlockingDeque.offerLast()调用LinkedBlockingDeque.linkLast()。
    private boolean linkLast(Node<E> node) 
        // 超出队列容量时,添加失败。返回false。
        if (count >= capacity)
            return false;
        Node<E> l = last;
        node.prev = l;
        last = node;
        if (first == null)
            first = node;
        else
            l.next = node;
        ++count;
        notEmpty.signal();
        return true;
    

对于无界队列需要满足count >= Integer.MAX_VALUE才会返回false,触发创建非核心线程。这个条件基本上无法达到,所以线程池表现出来的就是只有核心线程在工作。

自定义阻塞队列重写LinkedBlockingDeque的方法触发创建非核心线程

考虑:从调用链来看,我们可以重写linkLast方法,修改if (count >= capacity)判断条件。但linkLast()是私有方法,子类无法重写;更上层的offerLast()是public方法,但依赖内部类Node。再往上看offer(), offer()直接调用了offerLast()方法,没有依赖LinkedBlockingDequene内部定义的私有模型,符合改写条件。

将自定义的ThreadTaskLinkedBlockingQueue作为线程池工具的私有内部类定义,对外隐藏实现细节。

package elon.threadpool.util;

import lombok.Setter;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * 线程池工具类。
 *
 * @author elon
 * @since 2021/11/6
 */
public class ElonThreadPoolUtils 
    private static final Logger LOGGER = LogManager.getLogger(ElonThreadPoolUtils.class);

    private static int corePoolSize = 10;

    private static int maximumPoolSize = 100;

    private static ThreadPoolExecutor poolExecutor = null;

    private static ThreadTaskLinkedBlockingDeque<Runnable> queue = new ThreadTaskLinkedBlockingDeque<>();

    public static void initThreadPool(int corePoolSize, int maximumPoolSize)
        ElonThreadPoolUtils.corePoolSize = corePoolSize;
        ElonThreadPoolUtils.maximumPoolSize = maximumPoolSize;

        poolExecutor = new ThreadPoolExecutor(ElonThreadPoolUtils.corePoolSize, ElonThreadPoolUtils.maximumPoolSize, 10,
                TimeUnit.SECONDS, queue);

        LOGGER.info("[ElonThreadPoolUtils]Init thread pool success. corePoolSize:|maximumPoolSize:", corePoolSize,
                maximumPoolSize);
    

    synchronized public static void executeTask(Runnable task)
        int activeThreadNum = poolExecutor.getActiveCount();
        LOGGER.info("[ElonThreadPoolUtils]Number of active threads:", activeThreadNum);
        LOGGER.info("[ElonThreadPoolUtils]The number of tasks waiting to be processed in the queue:", queue.size());

        poolExecutor.execute(task);
    

    /**
     * 自定义线程任务阻塞队列. 在活跃线程数小于最大支持线程数的情况下,新任务不放到队列从而激发线程池创建新线程及时处理.
     * 解决使用LinkedBlockingDeque无限队列,线程池只有核心线程在处理。maximumPoolSize未启作用的问题。
     *
     * @author elon
     * @since 2021/11/6
     */
    @Setter
    private static class ThreadTaskLinkedBlockingDeque<E> extends LinkedBlockingDeque<E> 
        @Override
        public boolean offer(E e) 
            int activeThreadNum = poolExecutor.getActiveCount();
            if (activeThreadNum < maximumPoolSize) 
                return false;
            

            return offerLast(e);
        
    


关键逻辑是改写了offer()的实现,在调用offerLast()前增加了判断:如果线程池活跃线程数量小于最大线程数,新增任务直接返回false;不放到队列中。从而触发线程池创建非核心线程执行任务。

如果活跃线程数等于最大线程数,任务将放到队列中等待空闲线程来消费。

测试代码

  1. 定义测试用的线程任务类
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
 * 线程任务
 *
 * @author elon
 * @since 2021/11/6
 */
public class ThreadTask implements Runnable 
    private static final Logger LOGGER = LogManager.getLogger(ThreadTask.class);

    private final String threadName;

    public ThreadTask(String threadName) 
        this.threadName = threadName;
    

    @Override
    public void run() 
        LOGGER.info("threadName:", threadName);
        try 
            Thread.sleep(10 * 1000);
         catch (InterruptedException e) 
            e.printStackTrace();
        
    

  1. 创建线程池,执行1000个任务
import elon.threadpool.service.ThreadTask;
import elon.threadpool.util.ElonThreadPoolUtils;

/**
 * 应用启动类
 *
 * @author elon
 * @since 2021/11/6
 */
public class StartupThreadPool 
    public static void main(String[] args) 
        ElonThreadPoolUtils.initThreadPool(10, 100);
        for (int i = 1; i <= 1000; ++i) 
            ElonThreadPoolUtils.executeTask(new ThreadTask(String.valueOf(i)));
        
    

从控制台打印的日志可以看到,有100个线程在并发执行。

源码地址:https://github.com/ylforever/elon-threadpool

以上是关于通过自定义阻塞队列触发ThreadPoolExecutor创建非核心线程执行任务的主要内容,如果未能解决你的问题,请参考以下文章

通过自定义阻塞队列触发ThreadPoolExecutor创建非核心线程执行任务

Java 并发编程线程池机制 ( 线程池阻塞队列 | 线程池拒绝策略 | 使用 ThreadPoolExecutor 自定义线程池参数 )

JUC并发编程 共享模式之工具 线程池 -- 自定义线程池(阻塞队列)

生产者消费者模式

如何使用自定义 INameResolver 配置由服务总线触发的 Azure 函数?

第43课 发送自定义事件(上)