线程池的探索(上)

Posted 卢松说

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了线程池的探索(上)相关的知识,希望对你有一定的参考价值。

第二章  JDK源码剖析-并发篇

第 32 节


ThreadPool(上)


(Hello,各位好久不见了,这周会恢复更新了,预计JDK系列会进行基本的完结,JDK第三章实践篇会在后面分析中间件ZK、Kafka、HDFS等原理和源码时作为番外来写。)


在第二章的并发篇中经过了并发基础知识、并发组件、并发集合的学习后,最后一点知识就是线程池了。这一节先带大家简单回顾下线程的基本知识,下一节具体带大家研究下它的源码。

 

在这一节首先会讲一下线程池的预备知识,比如Executor、ExecutorService、Callable、Future、SynchronousQueue、DelayQueue等这些是什么,主要作用是什么。接着会介绍下线程池定义的7大参数。最后会介绍下线程池的各种类型。

 

让我们先来看下第一点,线程池的一些预备知识。

 


线程池预备知识


线程池用大白话理解就是存放线程的池子,你可以理解是线程的缓冲、缓存都可以。除了这一点,线程池还有很重要一点就是线程怎么执行,这个逻辑也是可以通过线程池做的。

简单的讲,线程池就是存放线程或者按照一定规则执行线程。目的是为了减少频繁的线程资源创建和销毁,从而增加线程并行处理的性能。

 

  1. 线程存放涉及的概念和预备知识

 

先来看下线程存放涉及的概念和预备知识。

 

存放线程该怎么存?肯定是用一些集合了,比如用一个数组、链表、队列等等都可以。JDK默认提供的一些线程池都是一些Queue和Set集合来存放线程任务的。如下图所示:

    


Queue有很多种,具体使用哪一种,不同的线程池可能不一样。

 

有的使用LinkedBlockingQueue,有的使用PriorityBlockingQueue,有的使用DelayQueue,有的使用SynchronousQueue、TransferQueue等。

 

除了LinkedBlockingQueue,后面的几种队列,之前没有讲过具体的源码原理,这里给大家简单介绍下:

 

  • PriorityBlockingQueue是优先级队列,底层排好的一颗有序二叉树,Blocking就说明是具备阻塞特性的队列,出队顺序和优先级有关系。

  • DelayQueue 是延迟队列,按照等待时间进行排序,可自定义等待时间,按时间进行任务调度,每个队列元素需要具备时间元素,内部其实也是PriorityQueue,优先级队列。

  • SynchronousQueue 点对点的同步阻塞队列,每次入队一个元素后,只有等待消费者take()出队后,才会继续put入队。而且是单线程对单线程的,一般可以用于两个线程交换数据。

  • TransferQueue 点对多的队列,也是阻塞性的队列,需要等待某一个消费者消费后才可以继续入队元素。

 

具体使用你可以自己网上百度或google下,就能了解具体的用法了。这里大家只要知道,上面这些队列就是可能会用来存放队列用的就可以了。

 

  1. 线程执行涉及的预备知识

 

接着再来看下线程执行涉及的预备知识:

Executor、ExecutorService、ThreadPoolExecutor、Executors 、Callable、Future这一大堆类是什么关系?干什么用的?

 

这里你一定要理解到本质,都是为了规定线程的执行设计的类,其实上面的类或接口都是为线程执行设计了一些方法而已。这个才是你要悟到的东西,java中很多接口都是抽象了物质的行为而已。

 

知道了这个思想,就很简单了,java中通过接口,可以为执行线程定义一些行为,也就是方法,比如如下的两个类:

Executor、ExecutorService主要定义了execute、submit、shutdown等方法。

 

这里可以给大家一个Demo体会一下:


public class HelloExecutor implements Executor {

    public static void main(String[] args) {

        HelloExecutor helloExecutor = new HelloExecutor();

        helloExecutor.execute(()-> System.out.println("Hello Executor"));

    }

    @Override

    public void execute(Runnable command) {

        command.run();

    }

}

 

而JDK中,为线程执行的默认实现这些接口的行为,主要通过如下2个类做到的:

AbstractExecutorService、ThreadPoolExecutor 实现了上述接口的方法,并且定义了存储线程的结构为队列和HashSet。(后面分析ThreadPoolExecutor会看到)。

 

上面四者的类图关系如下:

 


 

(这里补充一点,Executors 是一个工具类,只是封装了ThreadPoolExecutor的几种创建方式而已。)

 


  • Callable、Future是做什么的?—获取线程结果用的!

 

除了上面的线程执行的行为定义和实现外。有的线程需要获取执行结果,所以线程执行体的定义,除了Runnable,其实还设计了Callable接口,表示了带返回值的线程任务定义。还设计了Future存储执行结果,是线程同步执行的接口定义,甚至还有很多高级的线程同步执行的接口定义,比如CompletableFuture等等。

 


 线程池类型


之前提到,线程池定义很多接口行为,默认JDK提供了两类线程池的实现模型,当然也可以自己定义线程模型。每个模型下的线程池功能不同,比如有固定容量的线程池,有定时调度的线程池等等。我简单总结如下:

 

1、基于ThreadpoolExecutor模型的线程池:fixed 、cached 、single、 scheduled的pool。

2、基于Forkjoin模型的线程池:workstealingpool、ForkjoinPool。(streamAPI中的parllStream本质也是ForkJoin模型)

3、自定义模型的线程池:如Netty中的SingleThreadEventExecutor、或者你自己定义的MyExecutor等。

 

相信上面的集中线程池大家应该都不陌生了,这里就不举具体的使用例子了。这里重点提一下第一类中的不同线程池的使用场景:

1)SingleThreadPool创建一个线程的线程池,适合管理线程,线程可以存着在队列LinkedBlockingQueue中等待,适合创建某一个守护线程或者功能单一的线程任务。

2)CacheThreadPool动态创建线程,线程波动不稳定时使用,任务队列为SynchronousQueue,实际不能缓冲线程等待。线程任务比较短最合适,不会长时间占用大量的线程资源。

3)FixThreadPool固定线程数的线程池,线程波动稳定时使用,或者线程任务执行较长

4)SchdualThreadPool    定时任务线程池,底层是DelayQueue,适合周期性的执行线程,比如心跳线程等。

 


Hello线程池—线程池常见的七大参数


最后我们来线程池创建的七个参数:

1)corePoolSize 表示线程核心个数(通过变量和HashSet集合实现控制)

2)maximumPoolSize表示线程池最大个数(通过变量和HashSet集合实现控制)

3)keepAliveTime 线程空闲时多长时间被杀死

4)unit keepAliveTime 时间单位

5workQueue 线程等待队列(通过某种Queue实现控制)

6ThreadFactory定义线程的如何创建

7RejectedExecutionHandler 当队列和PoolSize无法放入线程,会进入拒绝策略的判断。

 

你可以使用Executors工具类的可以创建,但是通常不建议这么做,因为这样不好控制队列的大小和拒绝策略等设置,可能使得队列过大造成线程挤压,CPU 100%等问题。所以实际生产环境都是自定义线程池的创建,一般会如下所示创建线程池:

 

public class HelloThreadPool {

    public static void main(String[] args) {

        ExecutorService pool = new ThreadPoolExecutor(

                5,  //corePoolSize

                200//maximumPoolSize

                0L,  //keepAliveTime 线程空闲时多长时间被杀死D

                TimeUnit.MILLISECONDS//unit

                new LinkedBlockingQueue<Runnable>(1024), //workQueue等待队列

                new ThreadFactory() {

                    @Override

                    public Thread newThread(Runnable runnable) {

                        Thread thread = new Thread(runnable);

                        thread.setName("Hello-ThreadPool");

                        thread.setDaemon(true);

                        return thread;

                    }

                },  //threadFactory 自定义线程工厂,如何创建工厂

                new ThreadPoolExecutor.AbortPolicy()); //线程池策略

//              new MyHandler()); //自定义线程池策略

 

        pool.execute(()-> System.out.println(Thread.currentThread().getName()));

 

        pool.shutdown();//gracefully shutdown   

    }

 

    static class MyHandler implements RejectedExecutionHandler {

        @Override

        public void rejectedExecution(Runnable rThreadPoolExecutor executor) {

            //打印日志

            System.out.println(r + "is rejected!");

            if(executor.getQueue().size() < 10000) {

                //do something,eg: save it to db or send msg to mq or retry 3 times

               //做一点事情,比如保存到数据库,发送消息到mq或者重新执行3

            }

        }

    }

}

 

 

 

这一节我们初步的介绍了线程池的基本知识,下一节会具体研究下JDK提供给的ThreadPoolExecutor的源码原理。让你可以更好的把控线程池的使用。


以上是关于线程池的探索(上)的主要内容,如果未能解决你的问题,请参考以下文章

线程池探索之基础篇

线程池的探索

线程池的探索(下)

android线程与线程池-----线程池《android开发艺术与探索》

《Android 开发艺术探索》 第11章 --- android 线程和线程池

《Android开发艺术探索》第11章 Android的线程和线程池