28.线程池

Posted 纵横千里,捭阖四方

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了28.线程池相关的知识,希望对你有一定的参考价值。

1 池化技术介绍

我们可以通过继承Thread类或者实现Runnable接口来创建线程,但是生产环境中最好不要这么做,因为可能会创建大量线程 ,而线程本身也是要消耗大量资源的,因此效率反而降低了。

在工程中,我们经常会使用池化技术来管理某些资源访问,其原理就是先创建好一定量的资源,在使用的时候直接从创建好的资源中拿一个来用,用完再还回去。这种方式特别适用于文件IO、数据库连接、redis、消息队列、网络等需要与外界打交道的场景中。提前创建好,一来可以加快服务调用,二来可以统一管理这些资源。

在JUC中,专门提供了与线程池相关的API,我们可以通过两种方式来创建线程池:

ThreadPoolExecutor:线程池的具体实现类。

Executors,提供了一系列工厂方法,用来创建不同类型的线程池,返回的线程池类型为ExecutorService接口。

Executors工厂类最终创建线程的仍然是ThreadPoolExecutor类,Executors的关键价值是对线层的使用了一定的封装。Executors提供的创建线程池的方法有:

  • newFixedThreadPool(int nThreads),创建一个有固定数量的线程池。

  • newWorkStealingPool(),创建一个Fork/Join的线程池。

  • newSingleThreadExecutor(),创建只有一个线程的线程池,也就是这个线程池只有一个核心线程。

  • newCachedThreadPool(),创建一个可以缓存线程的线程池,特征是不限制线程的数量,根据任务数量产生对应数量的线程,并且将其缓存起来,可以重复使用,知道任务数量降低之后被释放。

  • newScheduledThreadPool(),创建一个有固定线程数量的线程池,并且允许延期执行,以及按照周期反复执行,类似定时调度机制。

我们先以newFixedThreadPool()为例,看一下如何使用线程池。

public class ThreadPoolExample

    public static void main( String[] args )
        ExecutorService executorService= Executors.newFixedThreadPool(4);
        for (int i = 0; i < 10; i++) 
            //把一个实现了Runnable接口的任务给到线程池执行
            executorService.execute(new Task());
        
        executorService.shutdown(); //关闭线程池
    

    static class Task implements Runnable
        @Override
        public void run() 
            System.out.println(Thread.currentThread().getName()+" - 开始执行任务");
            try 
                Thread.sleep(new Random().nextInt(1000));
             catch (InterruptedException e) 
                e.printStackTrace();
            
            System.out.println(Thread.currentThread().getName()+" - 执行完成");
        
    

上述代码创建了一个数量为4的线程池,接着通过execute()方法传递一个Task去执行,这个Task必须实现Runnable接口。在执行时,线程池最终会调用Task类中的run()方法。

2 ThreadPoolExecutor

前面提到的5种线程池,除了newWorkStealingPool()方法,其他都是基于ThreadPoolExecutor类来构建的,其构造方法如下:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) 
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;

其构造方法的几个参数我们不仅要理解,还要记清楚,这也是面试的高频问题。

  • corePoolSize,核心线程数

  • maximumPoolSize,最大线程数

  • keepAliveTime,线程存活时间

  • unit,线程存活的单位

  • workQueue,阻塞队列,用来存储待处理的任务

  • threadFactory,线程工厂,用来创建线程池中的工作线程

  • handler,拒绝策略,当线程池满负荷工作时,无法处理后续进来的任务,便会采用拒绝策略。

这些参数的含义和执行过程中的问题接下来会不断提到,继续看几种线程池是如何使用这些参数的。

2.1 newFixedThreadPool

newFixedThreadPool的创建代码如下:

public static ExecutorService newFixedThreadPool(int nThreads) 
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());

这里其实就是让最大线程数和核心线程数都是nThreads,从而创建固定数量的线程。

2.2 newSingleThreadExecutor

newSingleThreadExecutor方法的构造方法如下:

public static ExecutorService newSingleThreadExecutor() 
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));

这里其实就是创建了核心线程和最大线程数都是1的线程池,这样做的好处是可以保证任务的执行顺序。

2.3 newCachedThreadPool

newCachedThreadPool方法的构造方法是:

public static ExecutorService newCachedThreadPool() 
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());

newCachedThreadPool()方法提供一个可以缓存的线程池,从ThreadPoolExecutor类构造参数可以看出,核心线程数为0,而最大线程数是Integer.MAX_VALUE。并且使用了SynchronousQueue,这是一个没有存储容器的阻塞队列,一个生产者对其进行的插入操作再消费者之前就会被阻塞,反过来也是如此,所以当提交一个任务到线程池时,线程池会分配一个线程来处理这个任务。

当任务比较多时,newCachedThreadPool()方法会创建多个线程来处理,当任务量开始下降时,并不会立马回收这些新创建的线程,而是会缓存起来,60秒之后会回收处于空闲状态的线程,因此这种方式适合处理突发流量。

2.4 newScheduledThreadPool

这种方式的定义也比较特殊:

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) 
    return new ScheduledThreadPoolExecutor(corePoolSize);

public class ScheduledThreadPoolExecutor
        extends ThreadPoolExecutor
        implements ScheduledExecutorService 
     public ScheduledThreadPoolExecutor(int corePoolSize) 
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    

newScheduledThreadPool方法创建了一个定时或者周期性任务,其中corePoolSize表示核心线程数,最大线程数为Integer.MAX_VALUE。它比较适合用来实现定时任务,比如心跳检测、定时轮训等。

我们看一个使用的例子:

 public static void main(String[] args) 
    ScheduledExecutorService scheduledExecutorService= Executors.newScheduledThreadPool(3);
      scheduledExecutorService.schedule(()->
        System.out.println("延迟3s执行的任务");
    ,3, TimeUnit.SECONDS); 
    scheduledExecutorService.scheduleAtFixedRate(()->
        System.out.println("每隔2s执行一次");
    ,1,2,TimeUnit.SECONDS);
 

这个例子中构建了一个延迟3秒执行的任务,相比普通线程池来说,多了一个延期执行的功能。

这里用到了schedule()方法,其定义如下,其中command表示要执行的任务,delay表示延迟执行任务的时间,unit表示延迟执行时间的单位。

public ScheduledFuture<?> schedule(Runnable command,
                                   long delay, TimeUnit unit);

除此之外,还有如下两个常用的封装方法:

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                              long initialDelay,
                                              long period,
                                              TimeUnit unit);

scheduleAtFixedRate表示在initialDelay之后,开始周期执行任务command,间隔周期为period,时间单位为unit,如果任务执行时长大于间隔时长,那么上一个任务结束后立刻执行下一个任务,相当于连续执行。

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                 long initialDelay,
                                                 long delay,
                                                 TimeUnit unit);

表示在initialDelay之后,开始周期执行任务command,间隔周期是delay,时间单位为unit。不管任务执行时间长短,下一个任务必须再上一个任务执行完成之后,等待固定间隔才执行。

3 Executor框架介绍

前面介绍的线程池体系其实都属于Java5之后引入的Executor框架。所谓Executor框架,是并发编程中引入的一些线程启动、调度、管理的API,通过这个框架可以很好地分离线程的工作任务和线程的执行过程 ,以及简化线程的基本操作。

从图中可以看到,ThreadPoolExecutor线程池实现了Executor和ExecutorService接口,下面简单说一下各个接口和类的含义。

Executor接口定义了一个任务执行器,提供了一个抽象方法executor()来接收一个Runnable实现类,很显然它是把线程任务的定义和线程任务的执行分开的核心,也就是由传统的new Thread(new Task()).start的方式变成了executor.execute(new Task())。

public interface Executor 
    void execute(Runnable command);

ExecutorService接口就是扩展了Executor接口的定义,增加了线程池的生命周期管理,提供管理带有返回值的线程调度方式(这里是Future/Callable方式)。有shutdown()关闭线程池的方法。

我们再看一个例子:

public static void main(String[] args) throws InterruptedException 
    ExecutorService executorService= Executors.newSingleThreadExecutor();
    executorService.shutdown();
    //如果等待60s之后,线程池还没有关闭,则强制中断。
    if(!executorService.awaitTermination(60, TimeUnit.SECONDS))
        executorService.shutdownNow();
    

Executor框架的核心思想在于将线程中任务的定义和任务的执行类型进行分离,使得线程的管理更加方便,线程的复用性也更高 ,了解了Executor框架之后,我们会对线程池的理解更加深入,体系也更加清晰。

4 线程池的设计

如果让我们自己构建一个线程池,该如何设计呢?我们知道线程池的核心需求是实现线程的复用。对于线程来说,本身的调度和执行并不由我们控制,而且线程是Thread中的run()方法执行结束后自动销毁完成回收的,所以我们首先想到让run()一直运行不就行了?如下这样:

public class ThreadExample extends Thread 
    @Override
    public void run() 
        while(true)
            //线程执行的任务
        
    

但是这样做,线程会一直运行,而且会占用大量的CPU资源,因此不能这样。

根据我们在前面学习的阻塞原理,如果没工作要干的时候让线程阻塞一下,有任务时就开干不就行了?这其实就是一个生产/消费者模型。

具体来说,所有通过execute(task)方法提交的任务到线程池的线程都是生产者,线程池收到这个任务之后,它应该会分配线程来执行,我们可以假设代码实现如下:

public class WorkThread extends Thread
    @Override
    public void run() 
       while (true)
           Task task=blokingQueued.take();
       
    

blokingQueued.take()的作用是从阻塞队列中获取任务,当队列中没有元素时,当前WorkThread会被阻塞在take()方法中,直到生产者往阻塞队列中添加一个具体的任务才会被唤醒。我们前面介绍ThreadPoolExecutor的构造方法时提到过有个参数就是阻塞队列,这就是线程池要使用阻塞队列的原因。

阻塞队列能够完成创建好的线程有任务时执行,无任务时阻塞等待,但是这里仍然不完备的,如果阻塞队列满了该怎么办呢?例如突然收到大量的任务,此时该如何处理呢?

此时最直接的方法是降低生产者的数量或者提高消费者的数量,但是如果两者都到极限了,需要线程池本身来处理,该怎么办呢?此时可以增加线程池的数量,如果增加到线程池也到顶了,此时应该采取拒绝策略让外部感知到自己已无法处理了,让外部采取其他措施。这类似产品经理给你安排太多的活,实在干不完,那你就应该给领导上报,如果全压在自己手里,最后全是你的责任。

还有一个问题,如果线程池里线程数很高,但是后面开始逐步下降的,我们为了节省资源 ,应该将过多的线程慢慢回收掉。这个该如何实现呢?这个条件不难找,就是当工作线程从阻塞队列中获取任务时,如果等待一段时间之后没有拿到任务,说明线程池处于空闲状态,这时候线程不必再等了,直接返回null就行了。而这恰恰是ThreadPoolExecutor里的超时时间和超时单位keepAliveTime和unit,对应的就是空闲线程的存活时间和单位。

经过这些分析之后,我们也就知道了线程池最终的样子:

 

线程池任务执行流程 1.首先检测线程池运行状态,如果不是RUNNING,则直接拒绝,线程池要保证在RUNNING的状态下执行任务。 2.如果workerCount < corePoolSize,则创建并启动一个线程来执行新提交的任务。 3.如果workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中。 4.如果workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新提交的任务。 如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。

上面的过程非常重要,面试高频问题,务必掌握好!!

5. 拒绝策略

如果线程池中的阻塞队列已经满了,线程池中的线程数量也达到最大了,空闲线程也没有了,如果此时继续有任务提交到线程池,该怎么办呢?此时我们应该采取某种拒绝策略来拒绝任务的执行 ,也就是将问题上报。比如说你工作完全被排满了,产品经理还在给你加任务 ,你必须采取某种策略来保护自己,例如,直接拒绝,置之不理、转交给自己的小弟、找领导等等,而这也对应了线程池的4中拒绝策略。

AbortPolicy

这是ThreadPoolExecutor默认使用的拒绝策略,就是直接抛出一个异常RejectedExecutionException。在很多关键业务上,如果业务不能承载更大的并发量,就应该及时通过异常来发现问题并做出处理。对应的场景就是活太多,你直接找领导。

CallerRunsPolicy

只要线程池没有执行关闭方法,就由提交任务的线程通过执行run()方法来直接通过普通方法完成。这种策略相当于要保证所有任务都必须完成。对应的场景就是产品给的活不算重,你不排期,直接让你的小弟给搞一下就行了。

DiscardPolicy

直接把任务丢弃。如果采取这种策略,系统无法发现具体的问题。对应的场景就是你将产品的需求接了但是不做。以后出了问题再说,一般要完蛋的公司会大量出现这种情况。

DiscardOldestPolicy

顾名思义,就是将等待最久的任务丢掉。对应的场景就是已经提了两年的需求,还做它干嘛。编一个笑话:

产品经理:我的需求做好了吗?
程序员:还没有开始呢,最近忙着的。
产品经理:我都提了三个月了,什么时候才能完成。
程序员:都等了三个月了,那就再等等呗。
产品经理:我下周就离职了,总不能让我无法交接吧。
程序员:既然人都要走了,那还做了干啥。

事实上 ,上述几种策略没有好坏之分,需要根据情况决定采用哪一种。

以上是关于28.线程池的主要内容,如果未能解决你的问题,请参考以下文章

记5.28大促压测的性能优化—线程池相关问题

记5.28大促压测的性能优化—线程池相关问题

记5.28大促压测的性能优化—线程池相关问题

java基础--28.线程池简介以及实际应用

Python3 从零单排28_线程队列&进程池&线程池

Tomcat数据源和连接池配置怎么配~~ 小弟快疯了~~~