java线程池ThreadPoolExecutor
Posted king西阳
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了java线程池ThreadPoolExecutor相关的知识,希望对你有一定的参考价值。
使用线程池的好处:
- 创建/销毁线程伴随着系统开销,过于频繁的创建/销毁线程,会很大程度上影响处-理效率
- 线程并发数量过多,抢占系统资源从而导致阻塞
- 对线程进行一些简单的管理
1、ThreadPoolExecutor的一个构造方法:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
参数说明:
corePoolSize:当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于线程池基本大小时就不再创建。如果调用了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有基本线程。
maximumPoolSize:线程池中允许的最大线程数。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。如果使用的是无界的队列,那么这个参数没有意义了。
keepAliveTime:线程池工作线程空闲后,保持的存活时间。
unit :上面时间的单位
workQueue:执行任务在执行之前加入的queue。该队列仅保存由execute方法提交的Runnable任务。
threadFactory:创建新线程的工厂方法。可以通过线程工厂给每个创建的线程设置更有意义的名字。使用开源框架guava提供的ThreadFactoryBuilder可以快速给线程池里的线程设置有意义的名字
handler:当队列满时,线程处于饱和策略。
2、工作流程
如果使用无界队列很简单,开启核心线程数,多余的全部阻塞直到内存耗尽,故不建议使用。如果使用有界队列,工作流程如下:
* 若有新的任务需要执行,如果线程池实际线程数小于corePoolSize,则优先创建线程,
* 若大于corePoolSize,则会将任务加入队列,
* 若队列已满,则在总线程数不大于maximumPoolSize的前提下,创建新的线程,
* 若线程数大于maximumPoolSize,则执行拒绝策略。或其他自定义方式。
3、handler饱和策略
官方给出的饱和策略有如下几种:
AbortPolicy:直接抛异常。
DiscardPolicy:顾名思义,直接丢弃多余的任务。
DiscardOldestPolicy:丢弃队列中最老的任务,并且重试exexute方法,并发书中说丢弃最新的一个任务,这边书中应该描述错了。
CallerRunsPolicy:在调用线程中执行丢弃的任务。
也可以自定义策略来处理,例如:
public class MyRejected implements RejectedExecutionHandler { public MyRejected(){ } //拒绝策略实际调用的方法处理丢弃的任务,可以做一些日志处理 @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { System.out.println("自定义饱和策略,当前被拒绝任务为:" + r.toString()); } }
//使用
...
ThreadPoolExecutor pool = new ThreadPoolExecutor(
1, //coreSize
200, //MaxSize
60, //60
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(3) //指定一种队列 (有界队列)
, new MyRejected()
, new DiscardOldestPolicy()
);
...
4、队列
ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按照FIFO(先进先出)原则对元素进行排序
LinkedBlockingQueue:是一个基于链表结构的有界阻塞队列,此队列按照FIFO排序元素,吞吐量高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool(n)使用了此队列
PriorityBlockingQueue:一个具有优先级的无限阻塞队列
SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等待另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool()使用了此队列
5、示例
任务线程:
public class MyTask implements Runnable { private int taskId; private String taskName; public MyTask(int taskId, String taskName){ this.taskId = taskId; this.taskName = taskName; } public int getTaskId() { return taskId; } public void setTaskId(int taskId) { this.taskId = taskId; } public String getTaskName() { return taskName; } public void setTaskName(String taskName) { this.taskName = taskName; } @Override public void run() { try { System.out.println("run taskId =" + this.taskId); Thread.sleep(5*1000); //System.out.println("end taskId =" + this.taskId); } catch (InterruptedException e) { e.printStackTrace(); } } public String toString(){ return Integer.toString(this.taskId); } }
线程池执行器:
public class UseThreadPoolExecutor1 { public static void main(String[] args) { /** * 在使用有界队列时,若有新的任务需要执行,如果线程池实际线程数小于corePoolSize,则优先创建线程, * 若大于corePoolSize,则会将任务加入队列, * 若队列已满,则在总线程数不大于maximumPoolSize的前提下,创建新的线程, * 若线程数大于maximumPoolSize,则执行拒绝策略。或其他自定义方式。 * */ ThreadPoolExecutor pool = new ThreadPoolExecutor( 1, //coreSize 2, //MaxSize 60, //60 TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(3) //指定一种队列 (有界队列) //new LinkedBlockingQueue<Runnable>() //, new MyRejected() , new DiscardOldestPolicy() ); MyTask mt1 = new MyTask(1, "任务1"); MyTask mt2 = new MyTask(2, "任务2"); MyTask mt3 = new MyTask(3, "任务3"); MyTask mt4 = new MyTask(4, "任务4"); MyTask mt5 = new MyTask(5, "任务5"); MyTask mt6 = new MyTask(6, "任务6");
//ThreadPoolExecutor.execute(Runnable command)方法即可向线程池内添加一个任务。 pool.execute(mt1); pool.execute(mt2); pool.execute(mt3); pool.execute(mt4); pool.execute(mt5); pool.execute(mt6);
//关闭线程池 pool.shutdown(); } }
执行结果:
run taskId =5
run taskId =1
run taskId =3
run taskId =4
run taskId =6
分析:
使用有界阻塞队列,先为1开启一个核心线程,然后为2 3 4入队,然后
为5开启一个线程(最大线程),本来应该是舍弃6,但是指定了舍弃策略
是舍弃最老的,也就是阻塞队列队头元素2,所以舍弃2而执行6
6、常见四种线程池
1.可缓存线程池CachedThreadPool()
源码:
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
这种线程池内部没有核心线程,线程的数量是有没限制的。在创建任务时,若有空闲的线程时则复用空闲的线程,若没有则新建线程。没有工作的线程(闲置状态)在超过了60S还不做事,就会销毁。
创建方法:
ExecutorService mCachedThreadPool = Executors.newCachedThreadPool();
使用:
private void startDownload(final ProgressBar progressBar, final int i) { mCachedThreadPool.execute(new Runnable() { @Override public void run() { int p = 0;
//每隔10s progressBar.setMax(10); while (p < 10) { p++; progressBar.setProgress(p); Bundle bundle = new Bundle(); Message message = new Message(); bundle.putInt("p", p); bundle.putString("ThreadName", Thread.currentThread().getName()); message.what = i; message.setData(bundle); mHandler.sendMessage(message); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } }); }
2.定长线程池FixedThreadPool
源码:
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
该线程池的最大线程数等于核心线程数,所以在默认情况下,该线程池的线程不会因为闲置状态超时而被销毁。
如果当前线程数小于核心线程数,并且也有闲置线程的时候提交了任务,这时也不会去复用之前的闲置线程,会创建新的线程去执行任务。如果当前执行任务数大于了核心线程数,大于的部分就会进入队列等待。等着有闲置的线程来执行这个任务。
创建方法:
//nThreads => 最大线程数即maximumPoolSize ExecutorService mFixedThreadPool= Executors.newFixedThreadPool(int nThreads);
//threadFactory => 创建线程的方法,用得少 ExecutorService mFixedThreadPool= Executors.newFixedThreadPool(int nThreads, ThreadFactory threadFactory);
使用:
private void startDownload(final ProgressBar progressBar, final int i) { mFixedThreadPool.execute(new Runnable() { @Override public void run() { //....逻辑代码自己控制 } }); }
3.单线程线程池SingleThreadPool
源码:
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
有且仅有一个工作线程执行任务,所有任务按照指定顺序执行,即遵循队列的入队出队规则
创建方法:
ExecutorService mSingleThreadPool = Executors.newSingleThreadPool();
使用同FixedThreadPool
4.定时任务线程池ScheduledThreadPool
源码:
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } //ScheduledThreadPoolExecutor(): public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS, new DelayedWorkQueue()); }
DEFAULT_KEEPALIVE_MILLIS就是默认10L,这里就是10秒。这个线程池有点像是吧CachedThreadPool和FixedThreadPool 结合了一下。
不仅设置了核心线程数,最大线程数也是Integer.MAX_VALUE。
这个线程池是上述4个中为唯一个有延迟执行和周期执行任务的线程池。
创建方法:
//nThreads => 最大线程数即maximumPoolSize ExecutorService mScheduledThreadPool = Executors.newScheduledThreadPool(int corePoolSize);
使用:
//表示在3秒之后开始执行我们的任务。
mScheduledThreadPool.schedule(new Runnable() { @Override public void run() { //.... } }, 3, TimeUnit.SECONDS);
//延迟3秒后执行任务,从开始执行任务这个时候开始计时,每5秒执行一次不管执行任务需要多长的时间。 mScheduledThreadPool.scheduleAtFixedRate(new Runnable() { @Override public void run() { //.... } },3, 5, TimeUnit.SECONDS);
7、其他
* 执行线程次提交的任务通过execute方法和submit方法(submit提供了重载方法)。execute不返回值,而submit返回值,并且可以通过返回的Future对象的get方法查看返回值。
* 使用无界BlockingQueue:与有界队列相比,除非系统资源耗尽,否则无界的队列不存在任务入队失败的情况。当有新任务到来,系统的线程小于corePoolSize时,则新建线程执行任务。当达到corePoolSize后,就不会继续增加。若后续仍有新的队列加入,而没有空闲的线程资源,则队列直接进入队列等待。若任务创建和处理的速度差异很大,无界队列会保持快速增长,直到耗尽系统内存。所以一般不建议使用无界队列,另外使用无界队列的时候,最大线程数,拒绝策略等都失去了意义
*阿里编译器建议线程池创建时最好能手动进行创建,给出理由:
newFixedThreadPoolExecutor和newSingleThreadPoolExecutor:主要问题是堆积请求的处理队列可能会消耗非常大的内存,甚至OOM
以上是关于java线程池ThreadPoolExecutor的主要内容,如果未能解决你的问题,请参考以下文章
Java多线程系列——线程池原理之 ThreadPoolExecutor
Java - "JUC线程池" ThreadPoolExecutor原理解析
Java常用四大线程池用法以及ThreadPoolExecutor详解