多线程(二十二异步执行-Futrue模式)
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了多线程(二十二异步执行-Futrue模式)相关的知识,希望对你有一定的参考价值。
Future简介如果一个任务需要返回执行结果,一般我们会实现一个Callable任务,并创建一个线程来执行任务。对于执行时间比较长的任务,显然我们同步的等待结果再去执行后续的业务是不现实的,那么,Future模式是怎样解决这个问题的呢?
Future模式,可以让调用方立即返回,然后它自己会在后面慢慢处理,此时调用者拿到的仅仅是一个凭证,调用者可以先去处理其它任务,在真正需要用到调用结果的场合,再使用凭证去获取调用结果。这个凭证就是这里的Future。
Future接口的定义:
public interface Future<V>
// 取消任务
boolean cancel(boolean mayInterruptIfRunning);
// 任务是否取消
boolean isCancelled();
// 标记任务是否执行完成
boolean isDone();
// 阻塞获取任务结果
V get() throws InterruptedException, ExecutionException;
// 超时获取任务结果
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
FutureTask
Future模式中,最重要的就是FutureTask类
FutureTask一共给任务定义了7种状态
1、NEW:表示任务的初始化状态;
2、COMPLETING:表示任务已执行完成(正常完成或异常完成),但任务结果或异常原因还未设置完成,属于中间状态;
3、NORMAL:表示任务已经执行完成(正常完成),且任务结果已设置完成,属于最终状态;
4、EXCEPTIONAL:表示任务已经执行完成(异常完成),且任务异常已设置完成,属于最终状态;
5、CANCELLED:表示任务还没开始执行就被取消(非中断方式),属于最终状态;
6、INTERRUPTING:表示任务还没开始执行就被取消(中断方式),正式被中断前的过渡状态,属于中间状态;
7、INTERRUPTED:表示任务还没开始执行就被取消(中断方式),且已被中断,属于最终状态。
各个状态之间的流转:
FutureTask构造
FutureTask在构造时可以接受Runnable或Callable任务,如果是Runnable,则最终包装成Callable:
public FutureTask(Callable<V> callable)
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
public FutureTask(Runnable runnable, V result)
// 包装Runnable成为Callable
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
FutureTask成员
private volatile int state;//任务状态
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
private Callable<V> callable; // 真正的任务
private volatile Thread runner; // 保存正在执行任务的线程
/**
* 记录结果或异常
*/
private Object outcome;
/**
* 无锁栈(Treiber stack)
* 保存等待线程
*/
private volatile WaitNode waiters;
当调用FutureTask的get方法时,如果任务没有完成,则调用线程会被阻塞,其实就是将线程包装成WaitNode结点保存到waiters指向的栈中。
static final class WaitNode
volatile Thread thread;
volatile WaitNode next;
WaitNode() thread = Thread.currentThread();
任务执行run
public void run()
// 仅当任务为NEW状态时, 才能执行任务
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
return;
try
Callable<V> c = callable;
if (c != null && state == NEW)
V result;
boolean ran;
try
//执行任务
result = c.call();
ran = true;
catch (Throwable ex)
result = null;
ran = false;
//设置异常
setException(ex);
if (ran)
//设置任务执行结果outcome
set(result);
finally
runner = null;
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
set方法:
protected void set(V v)
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING))
outcome = v;//存储结果值
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
任务取消
public boolean cancel(boolean mayInterruptIfRunning)
// 仅NEW状态下可以取消任务
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try
if (mayInterruptIfRunning) // 中断任务
try
Thread t = runner;
if (t != null)
t.interrupt();
finally // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
finally
//释放所有在栈上等待的线程
finishCompletion();
return true;
任务取消后,最终调用finishCompletion方法,释放所有在栈上等待的线程
private void finishCompletion()
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;)
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null))
for (;;) //自旋释放所有等待线程
Thread t = q.thread;
if (t != null)
q.thread = null;
LockSupport.unpark(t);//唤醒线程
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
break;
done();
callable = null; // to reduce footprint
获取结果
FutureTask可以通过get方法获取任务结果,如果需要限时等待,可以调用get(long timeout, TimeUnit unit)
public V get() throws InterruptedException, ExecutionException
int s = state;
//当前任务的状态是NEW或COMPLETING,会调用awaitDone阻塞线程
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s); // 任务执行结果
/**
* 返回执行结果.
*/
private V report(int s) throws ExecutionException
Object x = outcome;
if (s == NORMAL)
return (V) x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable) x);
ScheduledFutureTask
1、ScheduledFutureTask在普通FutureTask的基础上增加了周期执行/延迟执行的功能
2、ScheduledFutureTask是ScheduledThreadPoolExecutor这个线程池的默认调度任务类,通过继承FutureTask和Delayed接口来实现周期/延迟功能的。
ScheduledFutureTask的源码非常简单,基本都是委托FutureTask来实现的
任务运行
public void run()
// 是否是周期任务
boolean periodic = isPeriodic();
//// 能否运行任务
if (!canRunInCurrentRunState(periodic))
cancel(false);
else if (!periodic) // 非周期任务:调用FutureTask的run方法运行
ScheduledFutureTask.super.run();
// 周期任务:调用FutureTask的runAndReset方法运行
else if (ScheduledFutureTask.super.runAndReset())
setNextRunTime();
reExecutePeriodic(outerTask);
FutureTask的runAndReset方法与run方法的区别就是当任务正常执行完成后,不会设置任务的最终状态(即保持NEW状态),以便任务重复执行:
protected boolean runAndReset()
// 仅NEW状态的任务可以执行
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return false;
boolean ran = false;
int s = state;
try
Callable<V> c = callable;
if (c != null && s == NEW)
try
c.call(); //不设置执行结果
ran = true;
catch (Throwable ex)
setException(ex);
finally
runner = null;
s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
return ran && s == NEW;//重新设置任务状态为NEW,继续重复执行
以上是关于多线程(二十二异步执行-Futrue模式)的主要内容,如果未能解决你的问题,请参考以下文章
java多线程基本概述(二十二)——CountDownLatch(2017-04-20 18:54)