Java线程池:ThreadPoolExecutor
Posted 安然_随心
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java线程池:ThreadPoolExecutor相关的知识,希望对你有一定的参考价值。
以下基于JDK 1.8 进行分析
文章目录
1 简介
实现了AbstractExecutorService 抽象类,提供了创建线程池、设置线程池属性、提交任务、停止线程池等基本操作接口。下面操作类型对 ThreadPoolExecutor 进行分析。
2. 创建线程池
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
创建线程池对象,可根据实际需要,自定义线程池特性。初始化过程,没有任何特殊化操作,只是一些基础变量的初始化。
创建完成后,也没有什么特殊的效果,刚初始化的线程池对象中不包含任何线程。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
备注:如果觉得使用该自定义方式创建线程池比较复杂繁琐(需要传入众多的参数),则可以使用 Executors 工具类来创建固定属性的线程池,包括:
-
newSingleThreadExecutor :创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
-
newFixedThreadPool :创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
-
newScheduledThreadPool :创建一个可定期或者延时执行任务的定长线程池,支持定时及周期性任务执行。
-
newCachedThreadPool :创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
3. 线程池运行
在创建线程池后,线程池就启动了(没有类似的start 函数,创建即启动)。但是里面没有任何线程。启动线程运行任务有两种方式:
3.1 预启动核心线程
3.1.1 prestartCoreThread
public boolean prestartCoreThread()public boolean prestartCoreThread()
return workerCountOf(ctl.get()) < corePoolSize &&
addWorker(null, true);
如果当前线程池中的数量 小于 核心线程数,则创建一个线程,加入到线程池中,然后调用线程的start 方法,启动线程。
3.1.2 prestartAllCoreThreads
启动所有的核心线程。循环调用prestartCoreThread,直到
prestartCoreThread 调用不成功(已经达到核心线程数 或者 线程池已经关闭了)。
3.2 提交任务的方式
提交任务 和 预启动的区别就是:预启动 核心线程的时候,由于没有任务,则核心线程是处于空转的状态。并且,预启动只能启动核心线程数,当到达 核心线程数,则是没有作用的。
3.2.1 execute
public void execute(Runnable command)
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
// 如果当前线程数 小于 核心线程数,则启动一个新的线程,运行该任务
if (workerCountOf(c) < corePoolSize)
if (addWorker(command, true))
return;
c = ctl.get();
if (isRunning(c) && workQueue.offer(command))
int recheck = ctl.get();
//如果线程池在关闭,则将任务从任务队列中删除,并执行拒绝任务流程;
if (! isRunning(recheck) && remove(command))
reject(command);
//再次判定,在这个时候,是否之前的线程死了,则新启动一个线程(避免有任务,无工作线程的情况)
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
//其他case: 线程池处于运行状态,且当前线程数大于核心线程数,且任务队列没有满,
//则将任务加入到任务队列中
//线程池处于运行状态,且当前线程数大于核心线程数,且任务队列满了,则尝试新启动一个线程
//如果启动失败,则拒绝任务
else if (!addWorker(command, false))
reject(command);
3.2.2 submit 系列方法
一般形式的submit 方法如下:
public Future<?> submit(Runnable task)
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
实际上,submit 方法可接收 runnable, 也可接收 callable 对象。
submit 系列方法核心 要点还是调用了execute ,只是封装了一层,可以返回future 对象,可查询任务状态、结果。
3.2.3 其他说明
创建先的工作线程可能失败的case:
- 达到线程池限制(预启动时,是到达 核心线程数;其他情况,达到最大线程数限制);
- 线程池已经停止了了,或者在 缓慢 shutdown 过程中。
- 设置的线程池工厂 创建线程错误,例如系统内存溢出等情况;
3.3 工作线程
工作线程的定义:
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
final Thread thread;
Runnable firstTask;
。。。。
3.3.1 工作线程任务执行过程
工作线程继承了runnable 接口,直接看接口的run 函数:
public void run()
runWorker(this);
final void runWorker(Worker w)
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try
//如果初始任务不为空 或者能取到任务(任务队列不为空)
while (task != null || (task = getTask()) != null)
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try
//默认实现是空的,可自定义为记录日志等其他功能
beforeExecute(wt, task);
Throwable thrown = null;
try
//调用任务的run函数,执行任务
task.run();
catch (RuntimeException x)
thrown = x; throw x;
catch (Error x)
thrown = x; throw x;
catch (Throwable x)
thrown = x; throw new Error(x);
finally
//空函数
afterExecute(task, thrown);
finally
task = null;
w.completedTasks++;
w.unlock();
completedAbruptly = false;
finally
//退出了主循环,该线程结束。会进行任务统计,且如果线程数达不到要求,会重新启动一个工作线程。
processWorkerExit(w, completedAbruptly);
线程池中的任务由工作线程完成。工作线程的运行逻辑:
- 运行初始任务(如果有的话。使用预启动函数创建的工作线程没有初始任务);
- 不停的从任务队列中取任务,调用任务的run函数,执行任务;
3.3.2 工作线程什么时候退出、运行结束
从上面代码中可以看到,工作线程一般情况下,是不会退出的,退出工作循环只有一个条件:执行抛出了异常。抛出异常又可分为两种情况:
- 当线程池关闭时,任务会收到中断 异常;
- 本身任务抛出异常;
4 结束线程池
ThreadPoolExecutor提供了两个方法,用于线程池的关闭,分别是shutdown()和shutdownNow(),其中:
4.1 shutdown()
不会立即终止线程池,而是要等所有任务缓存队列中的任务都执行完后才终止,但再也不会接受新的任务
public void shutdown()
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try
checkShutdownAccess();
//设置线程池的状态为关闭,则不会接受新的任务
advanceRunState(SHUTDOWN);
//中断那些空闲的进程
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
finally
mainLock.unlock();
tryTerminate();
4.2 shutdownNow()
立即终止线程池,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务
5. 问题
5.1 工作线程
线程池中的工作线程 被放入到 一个哈HashSet中。当创建新的线程或者线程死亡时,通过锁 来修改 workers 集合。
private final HashSet<Worker> workers = new HashSet<Worker>();
private final ReentrantLock mainLock = new ReentrantLock();
那为什么不直接使用线程安全的集合?
* Lock held on access to workers set and related bookkeeping.
* While we could use a concurrent set of some sort, it turns out
* to be generally preferable to use a lock. Among the reasons is
* that this serializes interruptIdleWorkers, which avoids
* unnecessary interrupt storms, especially during shutdown.
* Otherwise exiting threads would concurrently interrupt those
* that have not yet interrupted. It also simplifies some of the
* associated statistics bookkeeping of largestPoolSize etc. We
* also hold mainLock on shutdown and shutdownNow, for the sake of
* ensuring workers set is stable while separately checking
* permission to interrupt and actually interrupting.
没有看懂文档上的说明
5.2 在停止线程池时,怎么区分那些工作线程是空闲的?
所谓忙碌的工作线程,即取到任务(从任务队列获取到任务 或者 有初始任务),准备执行的工作线程。
当工作线程有了任务后,会给自己上锁:
final void runWorker(Worker w)
.......
while (task != null || (task = getTask()) != null)
// 上锁
w.lock();
。。。。。
worker 继承了AbstractQueuedSynchronizer,当执行worker.lock 后,如果不执行unlock ,则其他地方是获取不到锁的。当worker 执行完取到的单个任务后,会调用 unlock 函数。
当调用shutdown 函数停止 空闲工作线程的时候,会尝试调用worker 的 trylock 函数。如上所说,如果工作线程取到了任务,则会进行lock 操作,即此时 shutdown 中,对该工作程序执行trylock 不成功,则不会发送interrupt 消息。
for (Worker w : workers)
Thread t = w.thread;
// 执在执行任务的工作线程 trylock或失败
if (!t.isInterrupted() && w.tryLock())
try
t.interrupt();
catch (SecurityException ignore)
finally
w.unlock();
if (onlyOne)
break;
以上是关于Java线程池:ThreadPoolExecutor的主要内容,如果未能解决你的问题,请参考以下文章
Android线程管理之ThreadPoolExecutor自定义线程池