Java多线程-12(线程管理中的线程池)
Posted 小北呱
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java多线程-12(线程管理中的线程池)相关的知识,希望对你有一定的参考价值。
线程池
个人博客:www.xiaobeigua.icu
1.2 线程池
线程池就是有效使用线程的一种常用方式。线程池内部可以预先创建一定数量的工作线程,客户端代码直接将任务作为一个对象提交给线程池,线程池将这些任务缓存在工作队列中, 线程池中的工作线程不断地从队列中取出任务并执行。
1.2.1什么是线程池
可以以 new Thread( () -> { 线程执行的任务 }).start();(λ 表达式) 这种形式开启一个线程. 当 run()方法运行结束,线程对象会被 GC 释放。
在真实的生产环境中,可能需要很多线程来支撑整个应用,当线程数量非常多时 ,反而会耗尽 CPU 资源。 如果不对线程进行控制与管理,反而会影响程序的性能。
线程主要开销包括:
创建与启动线程的开销;
线程销毁开销;
线程调度的开销;
线程数量受限 CPU 处理器数量。
线程池就是有效使用线程的一种常用方式。线程池内部可以预先创建一定数量的工作线程,客户端代码直接将任务作为一个对象提交给线程池,线程池将这些任务缓存在工作队列中, 线程池中的工作线程不断地从队列中取出任务并执行。
1.2.2 JDK 对线程池的支持
JDK 提供了一套 Executor 框架,可以帮助开发人员有效的使用线程池
在java中,线程池有很多的实现类,一般情况下,我们可可以使用上图中的ThreadPoolExecutor实现类 或者Executors工具类来实现创建线程池 。更多线程池类后面会再分析
例子:使用 Executors工具类来创建和使用线程池
/**
* 线程池的基本使用
* 作者:小北呱
*/
public class TestOne {
public static void main(String[] args) {
//使用Executors创建 线程数为5的线程池
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);
//创建15个任务分配给线程池
for (int i=0;i<15;i++){
//执行任务
fixedThreadPool.execute(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getId()+"线程执行了任务....");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
}
}
结果: 一个把15个任务交给了线程池,但是里面只有5个活动线程,所以一次性只能执行五个任务,身下的任务就会进入任务等待队列中等待
1.2.3 核心线程池的底层实现
查看 Executors 工 具 类 中 newCachedThreadPool(), newSingleThreadExcecutor(), newFixedThreadPool()源码:
注意:这几个方法是用来创建不同功能的线程池的
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
}
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));
}
发现 Excutor 工具类中返回线程池的方法底层都使用了 ThreadPoolExecutor 线程池,所以我们可以发现这些方法都是 ThreadPoolExecutor 线程池的封装
我们再来看看ThreadPoolExecutor 的构造方法:
ThreadPoolExecutor 的构造方法:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
各个参数含义:
corePoolSize, 指定线程池中核心线程的数量
maxinumPoolSize,指定线程池中最大线程数量
keepAliveTime,当线程池线程的数量超过 corePoolSize 时,多余的空闲线程的存活时长,即空闲线程在多长时长内销毁 unit, 是 keepAliveTime 时长单位
workQueue,任务队列,把任务提交到该任务队列中等待执行
threadFactory,线程工厂,用于创建线程 handler 拒绝策略,当任务太多来不及处理时,如何拒绝
handler 拒绝策略,当任务太多来不及处理时,如何拒绝
workQueue 说明:
workQueue 工作队列是指提交未执行的任务队列 , 它是 BlockingQueue 接口的对象,仅用于存储 Runnable 任务。根据队列功能分类,在ThreadPoolExecutor 构造方法中可以使用以下几种阻塞队列:
1) 直接提交队列,由 SynchronousQueue 对象提供,该队列没有容量,提交给线程池的任务不会被真实的保存,总是将新的任务提交给线程执行,如果没有空闲线程,则尝试创建新的线程,如果线程数量已经达到 maxinumPoolSize 规定的最大值则执行拒绝策略。
2) 有界任务队列 ,由 ArrayBlockingQueue 对象提供 , 在 创 建 ArrayBlockingQueue 对象时,可以指定一个容量。当有任务需要执行时,如果线程池中线程数小于 corePoolSize 核心线程数则创建新的线程;如果大于 corePoolSize 核心线程数则加入等待队列。如果队列已满则无法加入,在线程数小于 maxinumPoolSize 指定的最大线程数前提下会创建新的线程来执行 , 如果线程数大 于 maxinumPoolSize 最大线程数则执行拒绝策略。
3) 无界任务队列,由 LinkedBlockingQueue 对象实现,与有界队列相比,除非系统资源耗尽,否则无界队列不存在任务入队失败的情况。当有新的任务时,在系统线程数小于 corePoolSize 核心线程数则创建新的线程来执行任务,当线程池中线程数量大于corePoolSize 核心线程数则把任务加入阻塞队列。
4) 优先任务队列是通过 PriorityBlockingQueue 实现的,是带有任务优先级的队列 , 是 一 个特殊的无界队列。 不管是 ArrayBlockingQueue 队列还是 LinkedBlockingQueue 队列都是按照 先进先出算法处理任务的。在 PriorityBlockingQueue 队列中可以根 据任务优先级顺序先后执行。
1.2.4 拒绝策略
ThreadPoolExecutor 构造方法的最后一个参数指定了拒绝策略。当提交给线程池的任务量超过实际承载能力时,如何处理? 即线程池中的线程已经用完了,等待队列也满了,无法为新提交的任务服务,可以通过拒绝策略来处理这个问题。
JDK 提供了四种拒绝策略:
AbortPolicy 策略,会抛出异常
CallerRunsPolicy 策略,只要线程池没关闭,会在调用者线程中运行当前被丢弃的任务
DiscardOldestPolicy 将任务队列中最老的任务丢弃,尝试再次提交新任务
DiscardPolicy 直接丢弃这个无法处理的任务
Executors 工具类提供的静态方法返回的线程池默认的拒绝策略是 AbortPolicy 抛出异常,如果内置的拒绝策略无法满足实际需求,可以扩展 RejectedExecutionHandler 接口
例子:使用 ThreadPoolExecutor创建 固定大小为2的线程池 给他分配5个任务.
public class TestOne {
public static void main(String[] args) {
//使用 ThreadPoolExecutor创建 固定大小为2的线程池 使用了new SynchronousQueue<>()表示当线程池线程都在工作时,新的任务直接拒绝
ThreadPoolExecutor threadPool=new ThreadPoolExecutor(2, 2, 0, TimeUnit.SECONDS, new SynchronousQueue<>(), new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
//r就是请求任务,executor就是当前线程池
System.out.println(r+"线程池被拒绝了");
}
});
//创建5个任务分配给线程池
for (int i=0;i<5;i++){
//执行任务
threadPool.execute(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getId()+"线程执行了任务....");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
}
}
结果:
分析:
我们只在线程池中创建了最大数量为 2 的线程 ,使用了new SynchronousQueue<>()的直接提交工作队列,表示当线程吃的线程都在工作时,新的任务直接拒绝,所以当5个任务进入线程池时,只有两个可以被线程池中的线程执行,剩下的3个任务直接被拒绝了
1.2.5 ThreadFactory
线程池中的线程从哪儿来的? 答案就是 ThreadFactory。
ThreadFactory 是一个接口,只有一个用来创建线程的方法: Thread newThread(Runnable r); 当线程池中需要创建线程时就会调用该方法
例子:创建线程池时重写 线程工厂 :将创建的线程设值为 守护线程
public class TestOne {
public static void main(String[] args) throws InterruptedException {
//定义任务
Runnable r=new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getId()+"线程正在执行任务....");
while (true){
//模拟任务
}
}
};
ThreadPoolExecutor poolExecutor=new ThreadPoolExecutor(5, 5, 0, TimeUnit.SECONDS, new SynchronousQueue<>(), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
//根据参数r任务,创建一个线程来执行任务
Thread thread=new Thread(r);
System.out.println("线程池创建了守护线程"+thread.getId());
//将线程设置为守护线程
thread.setDaemon(true);
return thread;
}
});
//提交 5 个任务, 当给当前线程池提交的任务超过 5 个时,线程池默认抛出异常
for (int i = 0; i < 5; i++) {
poolExecutor.execute(r);
}
//主线程睡眠4s确保线程执行任务
Thread.sleep(4000);
}
}
结果:
1.2.6 监控线程池
ThreadPoolExecutor 提供了一组方法用于监控线程池,进而更好的获得线程池的内部状态,保障系统安全
int getActiveCount() 获得线程池中当前活动线程的数量
long getCompletedTaskCount() 返回线程池完成任务的数量
int getCorePoolSize() 线程池中核心线程的数量
int getLargestPoolSize() 返回线程池曾经达到的线程的最大数
int getMaximumPoolSize() 返回线程池的最大容量
int getPoolSize() 当前线程池的大小
BlockingQueue getQueue() 返回阻塞队列 long
getTaskCount() 返回线程池收到的任务总数
例子 :使用上述方法来监控线程池
public class TestOne {
public static void main(String[] args) throws InterruptedException {
//定义任务
Runnable runnable=new Runnable() {
int i=0;
@Override
public void run() {
//模拟正在执行任务
System.out.println(" ");
}
};
//创建线程池
ThreadPoolExecutor poolExecutor=new ThreadPoolExecutor(2,5,0,TimeUnit.SECONDS,new SynchronousQueue<>(),new ThreadPoolExecutor.AbortPolicy());
//创建10个任务
for (int i=0;i<5;i++){
System.out.println("提交第"+(i+1)+"次任务===============================");
poolExecutor.execute(runnable);
System.out.println("线程池中核心线程的数量:"+poolExecutor.getCorePoolSize());
System.out.println("线程池中最大容量:"+poolExecutor.getMaximumPoolSize());
System.out.println("线程池的活动线程数量:"+poolExecutor.getActiveCount());
System.out.println("当前线程池的大小:"+poolExecutor.getPoolSize());
System.out.println("线程池玩成任务的数量:"+poolExecutor.getCompletedTaskCount());
System.out.println("线程池收到的总任务数:"+poolExecutor.getTaskCount());
Thread.sleep(2000);
}
}
}
结果:
1.2.7 优化线程池大小
线程池大小对系统性能是有一定影响的,过大或者过小都会无法发挥最优的系统性能,,线程池大小不需要非常精确,只要避免极大或 者极小的情况即可,一般来说,线程池大小需要考虑 CPU 数量,内存大小等因素。
在书中给出一个估算线程池大小的公式:
线程池大小 = CPU 的数量 * 目标 CPU 的使用率*( 1 + 等待时间 与计算时间的比)
1.2.8 线程池死锁
如果在线程池中执行的 任务 A 在执行过程中又向线程池提交了任务 B, 任务 B 添加到了线程池的等待队列中,如果任务 A 的结束需要等待任务 B 的执行结果。就有可能会出现这种情况: 线程池中所有的工作线程都处于等待任务处理结果,而这些任务在阻塞队列中等待执行, 线程池中没有可以对阻塞队列中的任务进行处理的线程,这种 等待会一直持续下去,从而造成死锁。
适合给线程池提交相互独立的任务,而不是彼此依赖的任务。对于彼此依赖的任务,可以考虑分别提交给不同的线程池来执行。
1.2.9 异常简单处理
在使用 ThreadPoolExecutor 进行 submit 提交任务时,有的任务抛出 了异常,但是线程池并没有进行提示,即线程池把任务中的异常给吃掉 了,可以把 submit 提交改为 execute执行
例子:使用线程池执行4个 数学计算任务 x/y 的结果
public class TestOne {
//定义内部任务类 计算x/y的和
static class MyRun implements Runnable{
private int x;
private int y;
private float result;
public MyRun(int x,int y){
this.x=x;
this.y=y;
}
@Override
public void run() {
result=x/y;
System.out.println("输入X="+x+" y="+y+"任务结果为: "+result);
}
}
public static void main(String[] args) throws InterruptedException {
//创建线程池
ThreadPoolExecutor poolExecutor=new ThreadPoolExecutor(2,5,0,TimeUnit.SECONDS,new LinkedBlockingDeque<>(),new ThreadPoolExecutor.AbortPolicy());
//创建4个任务
poolExecutor.submit(new MyRun(0,5));
poolExecutor.submit(new MyRun(5,1));
poolExecutor.submit(new MyRun(5,2));
poolExecutor.submit(new MyRun(5,0));
}
}
结果:
可以发现 有四个任务 但是 计算完只显示了3个结果 因为 一个 任务是 5/0这是不被允许的,出现了异常 By Zero ,但是使用了submit的提交方式 异常被隐藏了,我们看不到
这里我们只需要把 submit 改为 execute就好
结果: 成功抛出了异常
1.2.10 ForkJoinPool 线程池
“分而治之” 是一个有效的处理大数据的方法,著名的大数据基石 MapReduce 就是采用这种分而治之的思路。简单点说,如果要处理的 1000 个数据,但是我们不具备处理1000个数据的能力,可以只处理10个数据,可以把这 1000 个数据分阶段处理 100 次,每次处理 10 个,把 100 次的处理结果进行合成,形成最后这 1000 个数据的处理结果。
把一个大任务调用 fork()方法分解为若干小的任务,把小任务的处 理结果进行 join()合并为大任务的结果
系统对 ForkJoinPool 线程池进行了优化,提交的任务数量与线程的数量不一定是一对一关系。在多数情况下,一个物理线程实际上需要处理多个逻辑任务。也就是说 有的线程可能处理一个任务,有的线程可能要处理多个任务
ForkJoinPool 线程池中最常用的方法是:<T> ForkJoinTask 类中的 <T>submit(ForkJoinTask task), 向线程池提交一个 ForkJoinTask 任务。ForkJoinTask 任务支持 fork()分解与join()等待的任务。
ForkJoinTask 有 两 个 重 要 的 子 类 :RecursiveAction 和 RecursiveTask ,它们的区别在于 RecursiveAction 任务没有返回值, RecursiveTask 任务可以带有返回值
例子: 这个例子是借鉴了别人了,注解写的很详细,有需要自取
public class TestOne {
//计算数列的和, 需要返回结果,可以定义任务继承 RecursiveTask
private static class CountTask extends RecursiveTask<Long> {
private static final int THRESHOLD = 10000; //定义数据规模的阈值,允许计算10000 个数内的和,超过该阈值的数列就需要分解
private static final int TASKNUM = 100; //定义每次把大任务分解为 100 个小任务
private long start; //计算数列的起始值
private long end; //计算数列的结束值
public CountTask(long start, long end) {
this.start = start;
this.end = end;
}
//重写 RecursiveTask 类的 compute()方法,计算数列的结果
@Override
protected Long compute() {
long sum = 0; //保存计算的结果
//判断任务是否需要继续分解,如果当前数列 end 与 start 范围的数超过阈值THRESHOLD,就需要继续分解
if (end - start < THRESHOLD) {
//小于阈值可以直接计算
for (long i = start; i <= end; i++) {
sum += i;
}
} else { //数列范围超过阈值,需要继续分解
//约定每次分解成 100 个小任务,计算每个任务的计算量
long step = (start + end) / TASKNUM;
//start = 0 , end = 200000, step = 2000, 如果计算[0,200000]范围内数列的和, 把该范围的数列分解为 100 个小任务,每个任务计算 2000 个数即可
//注意,如果任务划分的层次很深,即 THRESHOLD 阈值太小,每个任务的计算量很小,层次划分就会很深,可能出现两种情况:一是系统内的线程数量会越积越多,导致性能下降严重; 二是分解次数过多,方法调用过多可能会导致栈溢出
//创建一个存储任务的集合
ArrayList<CountTask> subTaskList = new ArrayList<>();
long pos = start; //每个任务的起始位置
for (int i = 0; i < TASKNUM; i++) {
long lastOne = pos + step; //每个任务的结束位
//调整最后一个任务的结束位置
if (lastOne > end) {
lastOne = end;
}
//创建子任务
CountTask task = new CountTask(pos, lastOne);
//把任务添加到集合中
subTaskList.add(task);
//调用 for()提交子任务
task.fork();
//调整下个任务的起始位置
pos += step + 1;
}
//等待所有的子任务结束后,合并计算结果
for (CountTask task : subTaskList) {
sum += task.join(); //join()会一直等待子任务执行完毕返回执行结果
}
}
return sum;
}
}
public static void main(String[] args) {
//创建 ForkJoinPool 线程池
ForkJoinPool forkJoinPool = new ForkJoinPool();
//创建一个大的任务
CountTask task = new CountTask(0L, 200000L);
//把大任务提交给线程池
ForkJoinTask<Long> result = forkJoinPool.submit(task);
try{
Long res = result.get(); //调用任务的 get()方法返回结果
System.out.println("计算数列结果为:" + res);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
//验证
long s = 0L;
for (long i = 0; i <= 200000 ; i++) {
s += i;
}
System.out.println("检验结果为:"+s);
}
}
结果:
以上是关于Java多线程-12(线程管理中的线程池)的主要内容,如果未能解决你的问题,请参考以下文章