线程池的探索(上)
Posted 卢松说
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了线程池的探索(上)相关的知识,希望对你有一定的参考价值。
第二章 JDK源码剖析-并发篇
第 32 节
ThreadPool(上)
(Hello,各位好久不见了,这周会恢复更新了,预计JDK系列会进行基本的完结,JDK第三章实践篇会在后面分析中间件ZK、Kafka、HDFS等原理和源码时作为番外来写。)
在第二章的并发篇中经过了并发基础知识、并发组件、并发集合的学习后,最后一点知识就是线程池了。这一节先带大家简单回顾下线程的基本知识,下一节具体带大家研究下它的源码。
在这一节首先会讲一下线程池的预备知识,比如Executor、ExecutorService、Callable、Future、SynchronousQueue、DelayQueue等这些是什么,主要作用是什么。接着会介绍下线程池定义的7大参数。最后会介绍下线程池的各种类型。
让我们先来看下第一点,线程池的一些预备知识。
线程池预备知识
线程池用大白话理解就是存放线程的池子,你可以理解是线程的缓冲、缓存都可以。除了这一点,线程池还有很重要一点就是线程怎么执行,这个逻辑也是可以通过线程池做的。
简单的讲,线程池就是存放线程或者按照一定规则执行线程。目的是为了减少频繁的线程资源创建和销毁,从而增加线程并行处理的性能。
线程存放涉及的概念和预备知识
先来看下线程存放涉及的概念和预备知识。
存放线程该怎么存?肯定是用一些集合了,比如用一个数组、链表、队列等等都可以。JDK默认提供的一些线程池都是一些Queue和Set集合来存放线程任务的。如下图所示:
Queue有很多种,具体使用哪一种,不同的线程池可能不一样。
有的使用LinkedBlockingQueue,有的使用PriorityBlockingQueue,有的使用DelayQueue,有的使用SynchronousQueue、TransferQueue等。
除了LinkedBlockingQueue,后面的几种队列,之前没有讲过具体的源码原理,这里给大家简单介绍下:
PriorityBlockingQueue是优先级队列,底层排好的一颗有序二叉树,Blocking就说明是具备阻塞特性的队列,出队顺序和优先级有关系。
DelayQueue 是延迟队列,按照等待时间进行排序,可自定义等待时间,按时间进行任务调度,每个队列元素需要具备时间元素,内部其实也是PriorityQueue,优先级队列。
SynchronousQueue 点对点的同步阻塞队列,每次入队一个元素后,只有等待消费者take()出队后,才会继续put入队。而且是单线程对单线程的,一般可以用于两个线程交换数据。
TransferQueue 点对多的队列,也是阻塞性的队列,需要等待某一个消费者消费后才可以继续入队元素。
具体使用你可以自己网上百度或google下,就能了解具体的用法了。这里大家只要知道,上面这些队列就是可能会用来存放队列用的就可以了。
线程执行涉及的预备知识
接着再来看下线程执行涉及的预备知识:
Executor、ExecutorService、ThreadPoolExecutor、Executors 、Callable、Future这一大堆类是什么关系?干什么用的?
这里你一定要理解到本质,都是为了规定线程的执行设计的类,其实上面的类或接口都是为线程执行设计了一些方法而已。这个才是你要悟到的东西,java中很多接口都是抽象了物质的行为而已。
知道了这个思想,就很简单了,java中通过接口,可以为执行线程定义一些行为,也就是方法,比如如下的两个类:
Executor、ExecutorService主要定义了execute、submit、shutdown等方法。
这里可以给大家一个Demo体会一下:
public class HelloExecutor implements Executor {
public static void main(String[] args) {
HelloExecutor helloExecutor = new HelloExecutor();
helloExecutor.execute(()-> System.out.println("Hello Executor!"));
}
@Override
public void execute(Runnable command) {
command.run();
}
}
而JDK中,为线程执行的默认实现这些接口的行为,主要通过如下2个类做到的:
AbstractExecutorService、ThreadPoolExecutor 实现了上述接口的方法,并且定义了存储线程的结构为队列和HashSet。(后面分析ThreadPoolExecutor会看到)。
上面四者的类图关系如下:
(这里补充一点,Executors 是一个工具类,只是封装了ThreadPoolExecutor的几种创建方式而已。)
Callable、Future是做什么的?—获取线程结果用的!
除了上面的线程执行的行为定义和实现外。有的线程需要获取执行结果,所以线程执行体的定义,除了Runnable,其实还设计了Callable接口,表示了带返回值的线程任务定义。还设计了Future存储执行结果,是线程同步执行的接口定义,甚至还有很多高级的线程同步执行的接口定义,比如CompletableFuture等等。
线程池类型
之前提到,线程池定义很多接口行为,默认JDK提供了两类线程池的实现模型,当然也可以自己定义线程模型。每个模型下的线程池功能不同,比如有固定容量的线程池,有定时调度的线程池等等。我简单总结如下:
1、基于ThreadpoolExecutor模型的线程池:fixed 、cached 、single、 scheduled的pool。
2、基于Forkjoin模型的线程池:workstealingpool、ForkjoinPool。(streamAPI中的parllStream本质也是ForkJoin模型)
3、自定义模型的线程池:如Netty中的SingleThreadEventExecutor、或者你自己定义的MyExecutor等。
相信上面的集中线程池大家应该都不陌生了,这里就不举具体的使用例子了。这里重点提一下第一类中的不同线程池的使用场景:
1)SingleThreadPool创建一个线程的线程池,适合管理线程,线程可以存着在队列LinkedBlockingQueue中等待,适合创建某一个守护线程或者功能单一的线程任务。
2)CacheThreadPool动态创建线程,线程波动不稳定时使用,任务队列为SynchronousQueue,实际不能缓冲线程等待。线程任务比较短最合适,不会长时间占用大量的线程资源。
3)FixThreadPool固定线程数的线程池,线程波动稳定时使用,或者线程任务执行较长
4)SchdualThreadPool 定时任务线程池,底层是DelayQueue,适合周期性的执行线程,比如心跳线程等。
Hello线程池—线程池常见的七大参数
最后我们来线程池创建的七个参数:
1)corePoolSize 表示线程核心个数(通过变量和HashSet集合实现控制)
2)maximumPoolSize表示线程池最大个数(通过变量和HashSet集合实现控制)
3)keepAliveTime 线程空闲时多长时间被杀死
4)unit keepAliveTime 的时间单位
5)workQueue 线程等待队列(通过某种Queue实现控制)
6)ThreadFactory定义线程的如何创建
7)RejectedExecutionHandler 当队列和PoolSize无法放入线程,会进入拒绝策略的判断。
你可以使用Executors工具类的可以创建,但是通常不建议这么做,因为这样不好控制队列的大小和拒绝策略等设置,可能使得队列过大造成线程挤压,CPU 100%等问题。所以实际生产环境都是自定义线程池的创建,一般会如下所示创建线程池:
public class HelloThreadPool {
public static void main(String[] args) {
ExecutorService pool = new ThreadPoolExecutor(
5, //corePoolSize
200, //maximumPoolSize
0L, //keepAliveTime 线程空闲时多长时间被杀死D
TimeUnit.MILLISECONDS, //unit
new LinkedBlockingQueue<Runnable>(1024), //workQueue等待队列
new ThreadFactory() {
@Override
public Thread newThread(Runnable runnable) {
Thread thread = new Thread(runnable);
thread.setName("Hello-ThreadPool");
thread.setDaemon(true);
return thread;
}
}, //threadFactory 自定义线程工厂,如何创建工厂
new ThreadPoolExecutor.AbortPolicy()); //线程池策略
// new MyHandler()); //自定义线程池策略
pool.execute(()-> System.out.println(Thread.currentThread().getName()));
pool.shutdown();//gracefully shutdown
}
static class MyHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
//打印日志
System.out.println(r + "is rejected!");
if(executor.getQueue().size() < 10000) {
//do something,eg: save it to db or send msg to mq or retry 3 times
//做一点事情,比如保存到数据库,发送消息到mq或者重新执行3次
}
}
}
}
这一节我们初步的介绍了线程池的基本知识,下一节会具体研究下JDK提供给的ThreadPoolExecutor的源码原理。让你可以更好的把控线程池的使用。
以上是关于线程池的探索(上)的主要内容,如果未能解决你的问题,请参考以下文章
android线程与线程池-----线程池《android开发艺术与探索》