Java 并发编程线程池机制 ( 线程池阻塞队列 | 线程池拒绝策略 | 使用 ThreadPoolExecutor 自定义线程池参数 )

Posted 韩曙亮

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java 并发编程线程池机制 ( 线程池阻塞队列 | 线程池拒绝策略 | 使用 ThreadPoolExecutor 自定义线程池参数 )相关的知识,希望对你有一定的参考价值。





一、线程池阻塞队列



线程池阻塞队列是线程池创建的第 5 5 5 个参数 : BlockingQueue<Runnable> workQueue ;

    public ThreadPoolExecutor(int corePoolSize,					// 核心线程数 , 这些线程基本不会被销毁
                              int maximumPoolSize, 				// 最大线程数 , 线程池能创建的最大线程数量
                              long keepAliveTime, 				// 空闲情况下 , 非核心线程存活时间
                              TimeUnit unit, 					// 空闲时间单位
                              BlockingQueue<Runnable> workQueue,// 任务的阻塞队列 ★
                              ThreadFactory threadFactory, 		// 创建线程的工厂类
                              RejectedExecutionHandler handler) // 拒绝策略

线程池阻塞队列 : 线程池中的阻塞队列 , 同一时刻 , 只能有 1 1 1 个线程访问队列 , 执行任务 入队 / 出队 操作 ; 队列都是 FIFO 先进先出 ;

  • 阻塞队列相关概念 :

    • 大小边界 :
      • 有界 : 阻塞队列 大小有限制 , 不是无限大的 ;
      • 无界 : 阻塞队列 理论上无限大 , 比如设置成 Integer.MAX_VALUE ;
    • 队列已满 : 只能出队 , 不能入队 ; 入队操作需阻塞等待 ;
    • 队列为空 : 只能入队 , 不能出队 ; 出队操作需要等待 ;
  • ArrayBlockingQueue : 有界阻塞队列 , 需要 指定阻塞队列大小 ;

  • LinkedBlockingQueue : 无界阻塞队列 , 基于链表的阻塞队列 ;

    • Executors.newCachedThreadPool()Executors.newFixedThreadPool(10) 方法创建的线程池 , 使用的是该阻塞队列 ;
  • SynchronousQueue : 队列 不存储元素 , 后一个 Runnable 任务入队 , 必须等到前一个任务执行完毕才可以 , 否则会一直阻塞等待 ;

    • Executors.newCachedThreadPool() 方法创建的线程池 , 使用的是该阻塞队列 ;
  • PriorityBlockingQueue : 有优先级的阻塞队列 ;


阻塞队列吞吐量 : SynchronousQueue > LinkedBlockingQueue > ArrayBlockingQueue ;





二、拒绝策略



线程池拒绝策略是线程池创建的第 7 7 7 个参数 : RejectedExecutionHandler handler ;

    public ThreadPoolExecutor(int corePoolSize,					// 核心线程数 , 这些线程基本不会被销毁
                              int maximumPoolSize, 				// 最大线程数 , 线程池能创建的最大线程数量
                              long keepAliveTime, 				// 空闲情况下 , 非核心线程存活时间
                              TimeUnit unit, 					// 空闲时间单位
                              BlockingQueue<Runnable> workQueue,// 任务的阻塞队列 
                              ThreadFactory threadFactory, 		// 创建线程的工厂类
                              RejectedExecutionHandler handler) // 拒绝策略 ★

线程池拒绝策略 : 如果核心线程 , 非核心线程都在执行任务 , 阻塞队列是有界的 , 也满了 , 此时线程池如果再添加任务 , 就会触发如下拒绝策略 ;

  • DiscardPolicy : 丢弃任务 ;
  • DiscardOldestPolicy : 丢弃队头的最旧的任务 ;
  • AbortPolicy : 抛出异常 , 这也是默认方式 ;
  • CallerRunsPolicy : 调用者自行处理 ;

线程池默认的拒绝策略是 抛出异常 方式 ;

    private static final RejectedExecutionHandler defaultHandler =
        new AbortPolicy();




三、使用 ThreadPoolExecutor 自定义线程池参数



创建 1 1 1 个线程池 , 核心线程数是 2 2 2 , 最大线程数是 3 3 3 , 则非核心线程 0 ~ 1 个 , 非核心线程最大空闲存活时间 60 秒 , 阻塞队列最大存放 10 个元素 , 拒绝策略设置为抛出异常方式 , 如果阻塞队列装满 , 再次尝试执行新任务时 , 会抛出异常 ;


代码示例 :

import java.util.concurrent.*;

public class Main {

    public static void main(String[] args) {
        ExecutorService executorService = new ThreadPoolExecutor(
                2,                          // 核心线程数 2
                3,                      // 最大线程数 3, 非核心线程 0 ~ 1 个
                60,                        // 非核心线程最大空闲存活时间 60 秒
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(10),   // 阻塞队列, 最大存放 10 个元素
                Executors.defaultThreadFactory(),       // 线程工厂
                new ThreadPoolExecutor.AbortPolicy()    // 决绝策略, 如果执行任务失败, 抛出异常
        );
        for (int i = 0; i < 20; i ++) {
            executorService.execute(new Task(i));
        }
    }

    static class Task implements Runnable {
        /**
         * 记录线程的索引 0 ~ 99
         */
        private int i = 0;

        public Task(int i) {
            this.i = i;
        }

        @Override
        public void run() {
            System.out.println("线程 ID : " + Thread.currentThread().getName() + " , 线程索引 : " + i);

            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

执行结果 : 这里线程最大执行到了 12 12 12 , 也就是从 0 0 0 开始计数 , 执行了 13 13 13 个任务 , 其中 3 3 3 个线程池各自执行一个任务 , 阻塞队列存放 10 10 10 个任务 , 再次尝试将第 14 14 14 个任务放入阻塞队列时 , 报出 java.util.concurrent.RejectedExecutionException 异常 , 但是队列中的 10 10 10 个任务也正常执行完毕 ;

线程 ID : pool-1-thread-2 , 线程索引 : 1
线程 ID : pool-1-thread-3 , 线程索引 : 12
线程 ID : pool-1-thread-1 , 线程索引 : 0
Exception in thread "main" java.util.concurrent.RejectedExecutionException: 
Task Main$Task@5cad8086 rejected from java.util.concurrent.ThreadPoolExecutor@6e0be858
[Running, pool size = 3, active threads = 3, queued tasks = 10, completed tasks = 0]
	at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
	at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
	at Main.main(Main.java:16)
线程 ID : pool-1-thread-3 , 线程索引 : 2
线程 ID : pool-1-thread-1 , 线程索引 : 4
线程 ID : pool-1-thread-2 , 线程索引 : 3
线程 ID : pool-1-thread-1 , 线程索引 : 5
线程 ID : pool-1-thread-2 , 线程索引 : 7
线程 ID : pool-1-thread-3 , 线程索引 : 6
线程 ID : pool-1-thread-1 , 线程索引 : 9
线程 ID : pool-1-thread-2 , 线程索引 : 8
线程 ID : pool-1-thread-3 , 线程索引 : 10
线程 ID : pool-1-thread-2 , 线程索引 : 11

以上是关于Java 并发编程线程池机制 ( 线程池阻塞队列 | 线程池拒绝策略 | 使用 ThreadPoolExecutor 自定义线程池参数 )的主要内容,如果未能解决你的问题,请参考以下文章

Java 并发编程线程池机制 ( 线程池执行任务细节分析 | 线程池执行 execute 源码分析 | 先创建核心线程 | 再放入阻塞队列 | 最后创建非核心线程 )

并发编程---线程queue---进程池线程池---异部调用(回调机制)

深入浅出Java并发编程指南「源码分析篇」透析ThreadPoolExecutor线程池运作机制和源码体系

JUC并发编程 共享模式之工具 线程池 -- 自定义线程池(阻塞队列)

《Java并发编程实战》----线程池的使用

并发编程 - 线程 - 1.线程queue/2.线程池进程池/3.异步调用与回调机制