并发7-线程池

Posted gnwzj

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了并发7-线程池相关的知识,希望对你有一定的参考价值。

用优雅的方式理解和使用线程池

  1. 线程池的目的
    (1)减少系统维护线程的开销
    (2)解耦,运行和创建分开
    (3)线程可以复用

  2. 线程池的使用
    (1)接口Executor  提供方法execute(Runnable)执行线程命令
    (2)接口ExecutorService 提供方法shutdown()  启动一次顺序关闭,执行以前提交的任务,但不接受新任务
    技术图片

     


    Future(框架):交给我一个任务,我给你一个发票,到时候用发票取结果。
    Executors

    此包中所定义的 ExecutorExecutorServiceScheduledExecutorServiceThreadFactory 和 Callable 类的工厂和实用方法。此类支持以下各种方法:

    • 创建并返回设置有常用配置字符串的 ExecutorService 的方法。
    • 创建并返回设置有常用配置字符串的 ScheduledExecutorService 的方法。
    • 创建并返回“包装的”ExecutorService 方法,它通过使特定于实现的方法不可访问来禁用重新配置。
    • 创建并返回 ThreadFactory 的方法,它可将新创建的线程设置为已知的状态。
    • 创建并返回非闭包形式的 Callable 的方法,这样可将其用于需要 Callable 的执行方法中。

    (3)线程池的分类
    newFixedThreadPool(int nThreads) 
              创建一个可重用固定线程数的线程池,以共享的无界队列方式来运行这些线程。
    技术图片

     


    newCachedThreadPool() 
              创建一个可根据需要创建新线程的线程池,但是在以前构造的线程可用时将重用它们。(可变大小的线程池)
    技术图片

     


    newSingleThreadExecutor() 
              创建一个使用单个 worker 线程的 Executor,以无界队列方式来运行该线程。
    技术图片

     


    newScheduledThreadPool(int corePoolSize) 
              创建一个线程池,它可安排在给定延迟后运行命令或者定期地执行。
    技术图片

     



    (4)定义

    (5)使用

  3. 线程池的原理

    public static ExecutorService newCachedThreadPool() {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>());
        }
    

      

    public static ExecutorService newSingleThreadExecutor() {
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>()));
        }
    

      

    public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
        }
    

      

    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
            return new ScheduledThreadPoolExecutor(corePoolSize);
        }
    

      链接

    public ScheduledThreadPoolExecutor(int corePoolSize) {
            super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
                  new DelayedWorkQueue());
        }
    

      链接

    public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue) {
            this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                 Executors.defaultThreadFactory(), defaultHandler);
        }
    

      

    核心参数
    int corePoolSize 核心线程池的大小
    int maximumPoolSize 最大线程数量
    long keepAliveTime 线程保持活动的时间 为什么这里要有保持活动时间?因为存在核心线程池,与线程池同生命周期(同生共死),非核心也就是最大线程数量超过核心线程数量的线程,会存在生命周期。
    TimeUnit unit 线程保持活动的时间单位
    BlockingQueue<Runnable> workQueue 任务队列(BlockingQueue:阻塞队列)
    defaultHandler 拒绝策略

    线程池状态:
    private static final int COUNT_BITS = Integer.SIZE - 3;  线程数量
    private static final int CAPACITY = (1 << COUNT_BITS) - 1; 最多容量
    private static final int RUNNING = -1 << COUNT_BITS;
    private static final int SHUTDOWN = 0 << COUNT_BITS;
    private static final int STOP = 1 << COUNT_BITS;
    private static final int TIDYING = 2 << COUNT_BITS;
    private static final int TERMINATED = 3 << COUNT_BITS;

    状态变化
    技术图片
    Execute方法
    int c = ctl.get();//获取当前线程池的状态
            if (workerCountOf(c) < corePoolSize) {//当前线程数量小于 coreSize 时创建一个新的线程运行
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
            if (isRunning(c) && workQueue.offer(command)) {//如果当前线程处于运行状态,并且写入阻塞队列成功
                int recheck = ctl.get();
                if (! isRunning(recheck) && remove(command))  //双重检查,再次获取线程状态;如果线程状态变了(非运行状态)就需要从阻塞队列移除任务,并尝试判断线程是否全部执行完毕。同时执行拒绝策略。
                    reject(command);
                else if (workerCountOf(recheck) == 0)  //如果当前线程池为空就新创建一个线程并执行。
                    addWorker(null, false);
            }
            else if (!addWorker(command, false)) //如果在第三步的判断为非运行状态,尝试新建线程,如果失败则执行拒绝策略
                reject(command);
    }
    

      


    线程池的关闭
    pool.shutdown();
    while(!pool.isTerminated()){

    }


以上是关于并发7-线程池的主要内容,如果未能解决你的问题,请参考以下文章

4种线程池和7种并发队列

newCacheThreadPool()newFixedThreadPool()newScheduledThreadPool()newSingleThreadExecutor()自定义线程池(代码片段

Python并发编程05/ 死锁/递归锁/信号量/GIL锁/进程池/线程池

JUC并发编程 共享模式之工具 线程池 -- Fork / Join 框架(JDK1.7 新加入的线程池实现)

Java并发:线程池的使用

高并发学习 —— 集合线程安全线程池