线程池那些事之Future
Posted Java后端笔记
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了线程池那些事之Future相关的知识,希望对你有一定的参考价值。
前言
ThreadPoolExecutor除了可以执行Runnable的任务外,还可以执行那些带有返回结果的Callable任务。在提交Callable任务后,我们会得到一个Future对象。使用这个Future对象,我们可以监控任务的执行状态,也可以取消任务的执行。
源码分析
任务的提交
ThreadPoolExecutor的submit方法除了可以提交Callable的任务,也可以提交Runnable的任务,但是在底层都会转换为Callable,只不过提交Runnable任务在Future会返回空或者我们预设的值。
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
在submit方法中,执行的逻辑为:
通过入参生成一个RunnableFuture对象
通过RunnableFuture调用execute方法
把这个RunnableFuture对象返回。
这边我们需要注意的是,对于execute方法来讲,它只关注RunnableFuture的run方法逻辑体,那么监控是如何做的呢,看下面的FutureTask解析。
FutureTask
从类图中可以看到,FutureTask实现了RunnbaleFuture接口,而RunnbaleFuture继承了Runnable和Future接口,FutureTask针对这两个的接口实现,有各自的作用。
我们先看下Future接口定义
public interface Future<V> {
//如果任务还未执行,取消任务执行
//mayInterruptIfRunning为true,会中断正在执行任务的线程
boolean cancel(boolean mayInterruptIfRunning);
//任务是否已经被取消
boolean isCancelled();
//任务是否完成
boolean isDone();
//获取任务执行结果
V get() throws InterruptedException, ExecutionException;
//超时获取任务结果,超过时间,抛出TimeoutException
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
Future接口定义了一些监控的方法,讲解FutureTask如何实现Future的接口之前,先了解下FutureTask的生命周期
FutureTask生命周期
FutureTask有7个状态
//初始化
private static final int NEW = 0;
//任务执行中
private static final int COMPLETING = 1;
//正常结束
private static final int NORMAL = 2;
//异常结束
private static final int EXCEPTIONAL = 3;
//取消
private static final int CANCELLED = 4;
//中断取消进行中
private static final int INTERRUPTING = 5;
//中断取消完成
private static final int INTERRUPTED = 6;
状态转换有以下4种情况
正常执行 NEW->COMPLETING->NORMAL
执行失败 NEW->COMPLETING->EXCEPTIONAL
取消 NEW->CANCELLED
中断取消 NEW->INTERRUPTING->INTERRUPTED
生命周期状态转换的逻辑都在FutureTask的run方法中
Runnable接口实现
public void run() {
//如果已经被执行过或者被取消了,直接return
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
//执行call方法
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
//执行失败,抛出异常,设置异常
setException(ex);
}
if (ran)
//执行成功设置结果
set(result);
}
} finally {
runner = null;
int s = state;
//对中断的一些处理
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
逻辑很简单,成功调用set(result)设置结果,发生异常调用 setException(ex);
下面来看下set和setException方法
protected void set(V v) {
//修改状态为COMPLETING
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
//设置结果到outcome
outcome = v;
//修改状态为NORMAL
UNSAFE.putOrderedInt(this, stateOffset, NORMAL);
//唤醒等待的get方法
finishCompletion();
}
}
protected void setException(Throwable t) {
//修改状态为COMPLETING
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
//设置异常结果到outcome
outcome = t;
//修改状态为EXCEPTIONAL
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL);
//唤醒等待的get方法
finishCompletion();
}
}
两个方法的逻辑几乎一样,除了最后的结果和state不同。
在finishCompletion会唤醒等待结果的线程
private void finishCompletion() {
// 遍历waiters进行唤醒
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
//通过LockSupport唤醒线程
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
done();
callable = null; // to reduce footprint
}
run方法中主要封装了对提交任务的执行以及FutureTask生命周期的转变逻辑,接下来看下Future接口的实现。
也就是在线程池中,工作线程会执行以上逻辑。
Futute接口实现
Future接口的方法调用者,不是工作线程,而是我们自己的业务线程了。
get
public V get() throws InterruptedException, ExecutionException {
int s = state;
//状态小于COMPLETING的时候代表任务没有完成,需要阻塞等待
if (s <= COMPLETING)
//阻塞当前线程
s = awaitDone(false, 0L);
return report(s);
}
private V report(int s) throws ExecutionException {
Object x = outcome;
//如果状态为NORMAL,表示调用成功,返回结果
if (s == NORMAL)
return (V)x;
//表示被取消,抛出CancellationException异常
if (s >= CANCELLED)
throw new CancellationException();
//下面的话,代表执行过程中抛出其他异常
throw new ExecutionException((Throwable)x);
}
我们需要注意如果任务没有执行完毕,在get方法中会阻塞当前线程进行等待执行结果。当然get也支持超时模式。
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
//下面通过awaitDone会传入超时时间
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
//如果超时后,状态还是小于COMPLETING,抛出异常
throw new TimeoutException();
return report(s);
}
如果在指定时间内没有返回,抛出TimeoutException异常。
在awaitDone中,会把当前线程阻塞,以链表节点的形式放入waiters中。
cancel
cancel用于取消任务,有普通取消和中断取消两种,通过mayInterruptIfRunning区分。
public boolean cancel(boolean mayInterruptIfRunning) {
//只能在任务开始运行前取消
//并且只能对NEW状态的线程进行中断
//如果不进行中断设置为CANCELLED状态,否则,INTERRUPTING状态
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
//对线程进行中断的逻辑
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
//中断过后,设置为INTERRUPTED状态
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
//唤醒结果的线程
finishCompletion();
}
return true;
}
对于中断取消,会调用线程的interrupt对线程进行中断。
这里有个知识点,线程中断只对阻塞的线程有效,如果阻塞的线程被中断了,会抛出中断异常。线程对于不阻塞的线程无效。 理论上取消需要在任务未执行前,那么我们也没必要对线程进行中断。但是从以下代码来看,在多线程情况下,即使状态为NEW,你取消了,任务还是有可能被执行。如果这个被执行任务内部有阻塞的逻辑,那么对应工作线程会释放不了,所以需要进行中断。
//run方法第一个判断
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
//cancel方法的判断
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
isCancelled和isDone
这两个方法比较简单,直接使用状态来判断。
public boolean isCancelled() {
return state >= CANCELLED;
}
public boolean isDone() {
return state != NEW;
}
一些测试
超时测试
Future future = executorService.submit(new Callable<Object>() {
@Override
public Object call() throws Exception {
try {
Thread.sleep(2000);
}catch (Exception ex){
ex.printStackTrace();
}
return "hello world";
}
});
System.out.println(future.isDone());
System.out.println(future.get(1000,TimeUnit.MILLISECONDS));
System.out.println(future.isDone());
会抛出超时异常
Exception in thread "main" java.util.concurrent.TimeoutException
中断测试
Future future = executorService.submit(new Callable<Object>() {
@Override
public Object call() throws Exception {
try {
Thread.sleep(2000);
}catch (Exception ex){
ex.printStackTrace();
}
return "hello world";
}
});
Thread.sleep(1000L);
System.out.println(future.isDone());
future.cancel(true);
System.out.println(future.get());
System.out.println(future.isDone());
返回结果为
false
Exception in thread "main" java.lang.InterruptedException: sleep interrupted
java.util.concurrent.CancellationException
抛出了2个异常,一个是中断异常,因为线程在sleep下面阻塞,如果没有阻塞不会有这个异常,第二个是get的时候抛出取消异常。
下面模拟下对于已经执行的任务,取消无效的例子
Future future = executorService.submit(new Callable<Object>() {
@Override
public Object call() throws Exception {
int i =0;
while(i>=0){
i++;
if(i%1000000==0){
System.out.println(i);
}
}
return "hello world";
}
});
//Thread.sleep(1000L);
System.out.println(future.isDone());
System.out.println(future.cancel(true));
System.out.println(future.get());
System.out.println(future.isDone());
结果的开始如下
false
Exception in thread "main" java.util.concurrent.CancellationException
true
1000000
at java.util.concurrent.FutureTask.report(FutureTask.java:121)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at com.scj.threadpool.BasicUse.main(BasicUse.java:57)
2000000
3000000
4000000
从第二个true看出来,我们cancel是成功了,但是这个线程一直在跑,所以对于这种无线循环的任务我们也是需要注意的,即使取消了,也没有用。只能把整个线程池销毁了吧。
总结
对于ThreadPoolExecutor来讲,FutureTask和Runnable毫无差别。FutureTask对Runnable任务进行了包装,在run方法体整合了Callable的调用,FutureTask状态转换以及调用结果设置的逻辑,同时可以通过Future接口的方法提供监控操作。
以上是关于线程池那些事之Future的主要内容,如果未能解决你的问题,请参考以下文章
线程池那些事之ScheduledThreadPoolExecutor
newCacheThreadPool()newFixedThreadPool()newScheduledThreadPool()newSingleThreadExecutor()自定义线程池(代码片段