线程池基本使用详解

Posted Java小朋友

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了线程池基本使用详解相关的知识,希望对你有一定的参考价值。

对于大数据的发展,越来越多的公司在面试的时候或多或少会问起一些多线程的问题

下面具体分享下自身所学的线程池相关的小知识


  • 我的创建线程池创建工具类

  • 参数解释

  • 四种拒绝策略

  • 任务提交的三种方式

  • 总结


线程池的创建

 线程池的创建方式大体分为两种:

   1、通过Executors工厂方法创建

   2、自定义创建


1.

 通过Executors创建的四种方式

     newCachedThreadPool:创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。

     newFixedThreadPool: 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。

     newScheduledThreadPool: 创建一个定长线程池,支持定时及周期性任务执行。  

     newSingleThreadExecutor: 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。


2.

自定义创建


    我们今天要讲的是自定义创建,真实项目中使用的线程池大部分都是自定义的,而且上面的4种其实也都是基于自定义创建出来的,有兴趣的小伙伴看完下面讲的自定义创建再去看看Executors里面的这四种方法源码就懂了。


本人项目中工具类构件方式:

   /**     * 最终创建实现     *     * @param corePoolSize    核心线程     * @param keepAliveTime   存活时间     * @param maximumPoolSize 最大线程     * @param queueCapacity   队列长度     * @param poolName        线程名称     */ public static ThreadPoolExecutor myNewThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, int queueCapacity, String poolName) {        ThreadFactory threadFactory = null;        if (keepAliveTime == 0L) {            keepAliveTime = 30L;        }        if (StringUtils.isNotBlank(poolName)) {            //这里自定义线程工厂,可用于日志追溯                        threadFactory = new MyThreadFactory(poolName);        } else {            threadFactory = Executors.defaultThreadFactory();        }
       //创建线程池(核心代码)        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize,                keepAliveTime, TimeUnit.SECONDS,                new LinkedBlockingQueue<Runnable>(queueCapacity), threadFactory, new ThreadPoolExecutor.CallerRunsPolicy());
       //设置核心线程可以销毁        threadPoolExecutor.allowCoreThreadTimeOut(true);
       return threadPoolExecutor;    }


     上面的代码主要核心在于创建线程池这一行,其他只是自己使用时配置的。下面讲讲new ThreadPoolExecutor()各个参数的含义; 


参数解释(重点)

corePoolSize:核心线程数量,当有任务进来时,首先接收任务的是核心线程;


maximumPoolSize:最大线程数量,当核心线程处理不过来的时候,这时候任务会放在缓存队列里面,当队列满了,此时若当前线程数比最大线程数小,那么就会创建线程去接任务,直到线程数达到最大线程数,从而拒绝任务。所以这里如果缓存队列无界的话,那么最大线程数不会被触发;


keepAliveTime:线程存活时间,线程不做任务后多少时间会被销毁,结合后面的时间单位使用;这里需要注意的是,默认只会销毁非核心线程的部分,默认情况下核心线程一旦创建,是不会销毁的,但是我们可以通过threadPoolExecutor.allowCoreThreadTimeOut(true)使核心线程可以被销毁,这样可以把服务器的线程资源让给  其他线程池;


new LinkedBlockingQueue<Runnable>(queueCapacity): 缓存队列,当前使用的这个是链式阻塞队列,指定长度为queueCapacity。下面复制下百度对阻塞队列的介绍:

主要有3种类型的BlockingQueue:


无界队列

      队列大小无限制,常用的为无界的LinkedBlockingQueue,使用该队列做为阻塞队列时要尤其当心,当任务耗时较长时可能会导致大量新任务在队列中堆积最终导致OOM。                  Executors.newFixedThreadPool 采用就是 LinkedBlockingQueue,而楼主踩到的就是这个坑,当QPS很高,发送数据很大,大量的任务被添加到这个无界LinkedBlockingQueue 中,导致cpu和内存飙升服务器挂掉。


有界队列

     常用的有两类,一类是遵循FIFO原则的队列如ArrayBlockingQueue,另一类是优先级队列如PriorityBlockingQueue。PriorityBlockingQueue中的优先级由任务的Comparator决定。 
     使用有界队列时队列大小需和线程池大小互相配合,线程池较小有界队列较大时可减少内存消耗,降低cpu使用率和上下文切换,但是可能会限制系统吞吐量。

      在我们的修复方案中,选择的就是这个类型的队列,虽然会有部分任务被丢失,但是我们线上是排序日志搜集任务,所以对部分对丢失是可以容忍的。


同步移交队列

      如果不希望任务在队列中等待而是希望将任务直接移交给工作线程,可使用SynchronousQueue作为等待队列。SynchronousQueue不是一个真正的队列,而是一种线程之间移交的机制。要将一个元素放入SynchronousQueue中,必须有另一个线程正在等待接收这个元素。只有在使用无界线程池或者有饱和策略时才建议使用该队列。


threadFactory: 生成线程池中工作线程的线程工厂,这里我只知道的是打印日志的时候比较方便,可以显示哪个线程池打印的;

RejectedExecutionHandler:拒绝策略,指定队列和最大线程都满了后的任务拒绝行为,下面结合百度具体介绍下;我使用的是new ThreadPoolExecutor.CallerRunsPolicy();


四种拒绝策略(也是重点)

     JDK主要提供了4种饱和策略供选择。4种策略都做为静态内部类在ThreadPoolExcutor中进行实现。

01

AbortPolicy中止策略(默认)

    线程池默认使用该策略,使用该策略时在饱和时会抛出RejectedExecutionException(继承自RuntimeException),调用者可捕获该异常自行处理。


public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); }


02

DiscardPolicy抛弃策略

不做任何处理直接抛弃任务,这种情况我们无法感知丢失了多少数据,所以这种适用于不关心数据丢失的场景。

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {}


03

DiscardOldestPolicy抛弃旧任务策略

    先将阻塞队列中的头元素出队抛弃,再尝试提交任务。如果此时阻塞队列使用PriorityBlockingQueue优先级队列,将会导致优先级最高的任务被抛弃,因此不建议将该种策略配合优先级队列使用。

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); }}

04

CallerRunsPolicy调用者运行

   既不抛弃任务也不抛出异常,直接运行任务的run方法,换言之将任务回退给调用者来直接运行。使用该策略时线程池饱和后将由调用线程池的主线程自己来执行任务,因此在执行任务的这段时间里主线程无法再提交新任务,从而使线程池中工作线程有时间将正在处理的任务处理完成。对于数据不能丢失的场景,本人更喜欢这种。

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {            if (!e.isShutdown()) {                r.run();            }}


任务提交的三种方式

   相信很多人知道线程池有两种提交方式

execute和submit,但是为什么我这里要说三种呢,这里我提供一种平常自己喜欢使用的方法:直接把任务放到队列(等会贴使用示例)。


1.

第一种和第二种方法

execute与submit

execute与submit的区别:

  • execute只能提交Runnable类型的任务,无返回值。submit既可以提交Runnable类型的任务,也可以提交Callable类型的任务,会有一个类型为Future的返回值,但当任务类型为Runnable时,返回值为null。

  • execute在执行任务时,如果遇到异常会直接抛出,而submit不会直接抛出,只有在使用Future的get方法获取返回值时,才会抛出异常。

使用submit注意点:

   1、通过future的get方法在未获得返回值之前会一直阻塞,我们可以使用future的isDone方法判断任务是否执行完成,然后再决定是否get;

   2、get方法可以设置超时时间,可用于解决等待所有结果返回时,因一个线程导致整个服务阻塞的问题。


2.

第三种方法

直接提交到队列,这里使用的特性是:若核心线程池存在,那么核心线程会去去队列里面的任务来执行。


优点:务对了满了的情况下,可以根据不用条件设置不同的拒绝策使用offer方法可以实现队列有资源时提交任务返回true,资源不足时返回false; 使用put方法可以实现资源满时,主线程阻塞等待队列空闲再把任务放进队列


缺点:需要提前激活核心线程,且最大线程不生效,拒绝策略也不会用到,因为这种方式的提交队列本身就会阻塞主线程的任务提交,不会任务过多。


使用示例:

 public static void main(String[] args) { //阻塞队列 BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(5); ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 4, 10, TimeUnit.SECONDS, queue, Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy()); //提前激活核心线程数 pool.prestartAllCoreThreads(); for (int i = 1; i <= 20; i++) { try { //如果是偶数,队列满了则丢弃 int finalI = i; if (finalI % 2 == 0) { boolean f = queue.offer(() -> { try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("线程" + Thread.currentThread().getName() + ":" + finalI); });
if (!f) { System.out.println(finalI + "被丢弃了"); }
} else { //奇数则等待 queue.put(() -> { try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("线程" + Thread.currentThread().getName() + ":" + finalI); }); } } catch (Exception e) { e.printStackTrace(); } }
}


输出结果:

8被丢弃了线程pool-1-thread-2:2线程pool-1-thread-1:1线程pool-1-thread-1:4线程pool-1-thread-2:312被丢弃了14被丢弃了线程pool-1-thread-1:516被丢弃了线程pool-1-thread-2:618被丢弃了线程pool-1-thread-2:9线程pool-1-thread-1:720被丢弃了线程pool-1-thread-2:10线程pool-1-thread-1:11线程pool-1-thread-2:13线程pool-1-thread-1:15线程pool-1-thread-2:17线程pool-1-thread-1:19


总结


【强制】线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor的方式,这样 的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。 


说明:

 Executors 返回的线程池对象的弊端如下:

1)FixedThreadPool 和SingleThreadPool 

 允许的请求队列长度为 Integer.MAX_VALUE ,可能会堆积大量的请求,从而导致 OOM 。


2) CachedThreadPool 和 ScheduledThreadPool : 

允许的创建线程数量为 Integer.MAX_VALUE ,可能会创建大量的线程,从而导致 OOM 。


使用建议:

 1、根据项目的运行内存指定合适的缓存队列大小,尽量少用无界队列;


 2、对于使用不频繁的线程池设置核心线程可以销毁,因为服务器的总线程是固定的;


 3、对于不允许丢失数据的场景,采用CallerRunsPolicy主线程运行的拒绝策略


1

END

1


下篇预告:Jvm与Tomcat调优

以上是关于线程池基本使用详解的主要内容,如果未能解决你的问题,请参考以下文章

Java 线程池详解

Java线程池详解

线程池基本使用详解

newCacheThreadPool()newFixedThreadPool()newScheduledThreadPool()newSingleThreadExecutor()自定义线程池(代码片段

线程池详解(ThreadPoolExecutor)

Java线程详解