提升--16---线程池--02---线程池7大参数Executors工具类
Posted 高高for 循环
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了提升--16---线程池--02---线程池7大参数Executors工具类相关的知识,希望对你有一定的参考价值。
文章目录
线程池
多线程–08–线程池
两种类型
线程池呢从目前JDK提供的有两种类型,第一种就是普通的线程池ThreadPoolExecutor,第二种是ForkJoinPool,这两种是不同类型的线程池,能干的事儿不太一样,大家先把结论记住。Fork分叉,分叉完再分叉,最后的结果汇总这叫join。
- ThreadPoolExecutor
- ForkJoinPool
ThreadPoolExecutor
ThreadPoolExecutor他的父类是从AbstractExecutorService,而AbstractExecutorService的父类是ExecutorService,再ExecutorService的父类是Executo,
所以ThreadPoolExecutor就相当于线程池的执行器,就是大家伙儿可以向这个池子里面扔任务,让这个线程池去运行。另外在阿里巴巴的手册里面要求线程池是要自定义的,还有不少同学会被问这个线程池是怎么自定义。
线程池他维护这两个集合,第一个是线程的集合,里面是一个一个的线程。第二个是任务的队列,里面是一个一个的任务。这叫一个完整的线程池。
定义线程池,七个参数
一、corePoolSize(必需) 线程池核心线程大小
线程池中会维护一个最小的线程数量,即使这些线程处理空闲状态,他们也不会 被销毁,除非设置了allowCoreThreadTimeOut为true时。核心线程也会超时回收。
二、maximumPoolSize (必需)线程池最大线程数量
线程池所能容纳的最大线程数。当活跃线程数达到该数值后,后续的新任务将会阻塞。
三、keepAliveTime (必需) 空闲线程存活时间
线程闲置超时时长。如果超过该时长,非核心线程就会被回收。如果将allowCoreThreadTimeout设置为true时,核心线程也会超时回收。
四、unit (必需) 空闲线程存活时间单位
指定keepAliveTime参数的时间单位。常用的有:TimeUnit.MILLISECONDS(毫秒)、TimeUnit.SECONDS(秒)、TimeUnit.MINUTES(分)。
五、workQueue (必需)工作队列
新任务被提交后,会先进入到此工作队列中,任务调度时再从队列中取出任务.
任务队列。通过线程池的execute()方法提交的Runnable对象将存储在该参数中。其采用阻塞队列实现。
六、threadFactory (可选) 线程工厂
线程工厂。用于指定为线程池创建新线程的方式
DefaultThreadFactory—默认线程工厂
他返回的是一个new DefaultThreadFactory,它要去你去实现ThreadFactory的接口,这个接口只有一个方法叫newThread,所以就是产生线程的,可以通过这种方式产生自定义的线程,默认产生的是defaultThreadFactory,而defaultThreadFactory产生线程的时候有几个特点:
- new出来的时候指定了group制定了线程名字,
- 然后指定的这个线程绝对不是守护线程
- 设定好你线程的优先级。
自己可以定义产生的到底是什么样的线程,指定线程名叫什么(为什么要指定线程名称,有什么意义,就是可以方便出错是回溯);
ThreadPoolExecutor默认策略
七、handler (可选) 拒绝策略
拒绝策略。当达到最大线程数时需要执行的饱和策略。
当工作队列中的任务已到达最大限制,并且线程池中的线程数量也达到最大限制,这时如果有新任务提交进来,该如何处理呢。这就是拒绝策略
第七个叫拒绝策略,指的是线程池忙,而且任务队列满这种情况下我们就要执行各种各样的拒绝策略,jdk默认提供了四种拒绝策略,也是可以自定义的。
- Abort:抛异常-------Executors默认策略 ThreadPoolExecutor默认策略
- Discard:扔掉,不抛异常
- DiscardOldest:扔掉排队时间最久的
- CallerRuns:调用者处理服务
自定义策略
一般情况这四种我们会自定义策略,去实现这个拒绝策略的接口,处理的方式是一般我们的消息需要保存下来,要是订单的话那就更需要保存了,随然后做好日志.
- 保存到kafka
- 保存到redis
- 保存到数据库
自定义一个拒绝策略的例子,代码演示如下:
import java.util.concurrent.*;
public class T14_MyRejectedHandler {
public static void main(String[] args) {
ExecutorService service = new ThreadPoolExecutor(4, 4,
0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(6),
Executors.defaultThreadFactory(),
new MyHandler());
}
static class MyHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
//log("r rejected")
//save r kafka mysql redis
//try 3 times
if(executor.getQueue().size() < 10000) {
//try put again();
}
}
}
}
线程池工作流程
假设设置一个线程池,核心线程数为2,最大线程数为4,有界队列大小为4.
ThreadPoolExecutor tpe = new ThreadPoolExecutor(2, 4,
60, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(4),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.CallerRunsPolicy());
- new ThreadPoolExecutor(),刚开始创建时,里面是没有线程数的.
- 来了一个任务,线程池启动一个线程来执行(假设任务一直阻塞,没有执行完成)
- 又来了一个任务,线程池又启动了一个线程来执行
- 当第3个任务来时,因为核心线程为2,已经满了,所以会把此任务放到队列去排队.
- 当队列里面有4个任务时,队列已满,再来新任务时,线程池会启动第3个线程来处理此任务
- 当线程池起了4个线程数(最大线程数),且队列里面也排满了4个任务(队列最大容量)
- 此时再来任务,就会执行拒绝策略
案例:
package c_026_01_ThreadPool;
import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class T05_00_HelloThreadPool {
static class Task implements Runnable {
private int i;
public Task(int i) {
this.i = i;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " Task " + i);
try {
System.in.read();
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public String toString() {
return "Task{" +
"i=" + i +
'}';
}
}
public static void main(String[] args) {
; ThreadPoolExecutor tpe = new ThreadPoolExecutor(2, 4,
60, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(4),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.DiscardOldestPolicy());
for (int i = 0; i < 8; i++) {
tpe.execute(new Task(i));
}
System.out.println(tpe.getQueue());
tpe.execute(new Task(100));
System.out.println(tpe.getQueue());
tpe.shutdown();
}
}
拒绝策略 为 : DiscardOldestPolicy:扔掉排队时间最久的,所以把task2,扔掉了.
生产中我们一般是使用 自定义策略
一般情况这四种我们会自定义策略,去实现这个拒绝策略的接口,处理的方式是一般我们的消息需要保存下来,要是订单的话那就更需要保存了,随然后做好日志.
- 保存到kafka
- 保存到redis
- 保存到数据库
Executors工具类
Executors类提供了4种不同的线程池:
- 定长线程池(FixedThreadPool)
- 定时线程池(ScheduledThreadPool )
- 可缓存线程池(CachedThreadPool)
- 单线程化线程池(SingleThreadExecutor)
1. 单线程化线程池(SingleThreadPool)
源码:
Executors
ThreadPoolExecutor
这个线程池里面只有一个线程,这个一个线程的线程池可以保证我们扔进去的任务是顺序执行的。
- 特点:只有1个核心线程,无非核心线程,执行完立即回收,任务队列为链表结构的有界队列。
- 应用场景:不适合并发但可能引起IO阻塞性及影响UI线程响应的操作,如数据库操作、文件操作等。
案例:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class T07_SingleThreadPool {
public static void main(String[] args) {
ExecutorService service = Executors.newSingleThreadExecutor();
for(int i=0; i<5; i++) {
final int j = i;
service.execute(()->{
System.out.println(j + " " + Thread.currentThread().getName());
});
}
}
}
2. 可缓存线程池(CachedThreadPool)
源码:
看他的源码实际上是new了一个ThreadPoolExecutor,他没有核心线程,最大线程可以有好多好多线程int最大值( Integer.MAX_VALUE),然后60秒钟没有人理他,就回收了,他的任务队列用的是SynchronousQueue,没有指定他的线程工厂他是用的默认线程工厂的,也没有指定拒绝策略,他是默认拒绝策略的。
我们能够看出CachedThreadPool的特点,就是你来一个任务我给你启动一个线程,当然前提是我的线程池里面有线程存在而且他还没有到达60秒钟的回收时间的时候,来一个任务,如果有线程存在我就用现有的线程池,但是在有新的任务来的时候,如果其他线程忙我就启动一个新的,哪有同学说我不是来任务就扔到任务队列里面吗,可是大家分析一下我们这个CachedThreadPool他用的任务队列是synchronousQueue,它是一个手递手容量为空的Queue,就是你来一个东西必须得有一个线程把他拿走,不然我提交任务的线程从这阻塞住了。
synchronousQueue还可以扩展为多个线程的手递手,多个生产者多个消费者都需要手递手叫TransferQueue。这个CachedThreadPool就是这样一个线程池,来一个新的任务就必须马上执行,没有线程空着我就new一个线程。那么阿里是不会推荐使用这中线程池的,原因是线程会启动的特别多,基本接近于没有上限的。
- 特点:无核心线程,非核心线程数量无限,执行完闲置60s后回收,任务队列为不存储元素的阻塞队列。
- 应用场景:执行大量、耗时少的任务。
案例
来看这个小程序,首先将这个service打印出来,最后在把service打印出来,我们的任务是睡500个毫秒,然后打印线程池,打印他的名字。运行一下,通过打印线程池的toString的输出能看到线程池的一些状态。
package c_026_01_ThreadPool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class T08_CachedPool {
public static void main(String[] args) throws InterruptedException {
ExecutorService service = Executors.newCachedThreadPool();
System.out.println(service);
for (int i = 0; i < 2; i++) {
service.execute(() -> {
try {
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName());
});
}
System.out.println(service);
TimeUnit.SECONDS.sleep(80);
System.out.println(service);
}
}
3. 定长线程池(FixedThreadPool)
你看他的名称,fixed是固定的含义,就是固定的一个线程数,FixedThreadPool指定一个参数,到底有多少个线程,你看他的核心线程和最大线程都是固定的,因为他的最大线程和核心线程都是固定的就没有回收之说所以把他指定成0,这里用的是LinkedBlockingQueue(如果在阿里工作看到LinkedBlockingQueue一定要小心,他是不建议用的)
源码:
并行和并发有什么区别 ? concurrent vs parallel:
- 并发是指任务提交,并行指任务执行;并行是并发的子集。并行是多个cpu可以同时进行处理,并发是多个任务同时过来。要理解这个概念。
FixedThreadPool是确实可以让你的任务来并行处理的
案例:
- 看这个方法isPrime判断一个数是不是质数,然后写了另外一个getPrime方法,指定一个其实的位置,一个结束的位置将中间的质数拿出来一部分,主要是为了把任务给切分开。计算从1一直到200000这么一些数里面有多少个数是质数getPrime,计算了一下时间,只有我们一个main线程来运行,不过我们既然学了多线程就完全可以这个任务切分成好多好多子任务让多线程来共同运行,我有多少cpu,我的机器是4核的,这个取决你的机器数,在启动了一个固定大小的线程池,然后在分别来计算,分别把不同的阶段交给不同的任务,扔进去submit他是异步的,拿到get的时候才知道里面到底有多少个,全部get完了之后相当于所有的线程都知道结果了,最后我们计算一下时间,用这两种计算方式就能比较出来到底是并行的方式快还是串行的方式快。
package c_026_01_ThreadPool;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
public class T09_FixedThreadPool {
public static void main(String[] args) throws InterruptedException, ExecutionException {
long start = System.currentTimeMillis();
getPrime(1, 200000);
long end = System.currentTimeMillis();
System.out.println("单线程执行: "+(end - start));
final int cpuCoreNum = 4;
ExecutorService service = Executors.newFixedThreadPool(cpuCoreNum);
MyTask t1 = new MyTask(1, 80000); //1-5 5-10 10-15 15-20
MyTask t2 = new MyTask(80001, 130000);
MyTask t3 = new MyTask(130001, 170000);
MyTask t4 = new MyTask(170001, 200000);
Future<List<Integer>> f1 = service.submit(t1);
Future<List<Integer>> f2 = service.submit(t2);
Future<List<Integer>> f3 = service.submit(t3);
Future<List<Integer>> f4 = service.submit(t4);
start = System.currentTimeMillis();
f1.get();
f2.get();
f3.get();
f4.get();
end = System.currentTimeMillis();
System.out.println("多线程并行执行: "+(end - start));
}
static class MyTask implements Callable<List<Integer>> {
int startPos, endPos;
MyTask(int s, int e) {
this.startPos = s;
this.endPos = e;
}
@Override
public List<Integer> call() throws Exception {
List<Integer> r = getPrime(startPos, endPos);
return r;
}
}
//判断一个数,是否被所有的偶数整除
static boolean isPrime(int num) {
for(int i=2; i<=num/2; i++) {
if(num % i == 0) return false;
}
return true;
}
static List<Integer> getPrime(int start, int end) {
List<Integer> results = new ArrayList<>();
for(int i=start; i<=end; i++) {
if(isPrime(i)) results.add(i);
}
return results;
}
}
什么时候用Cache什么时候用Fixed
- 假如你这个任务并不确定他的量平稳与否,就像是任务来的时候他可能忽高忽低,但是我
以上是关于提升--16---线程池--02---线程池7大参数Executors工具类的主要内容,如果未能解决你的问题,请参考以下文章
JavaLearn#(16)多线程提升训练:生产者和消费者问题Lock锁ReadWriteLockBlockingQueuevolatile线程池线程同步练习