FutureTask机制

Posted walker993

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了FutureTask机制相关的知识,希望对你有一定的参考价值。

1. 继承体系

  FutureTask实现了Runnable接口,实现了run() 方法,可以交给线程/线程池执行。实现了Future接口,实现了get()方法,可以获取执行的结果。

2. 重要属性

/**
 * Possible state transitions:
 * NEW -> COMPLETING -> NORMAL
 * NEW -> COMPLETING -> EXCEPTIONAL
 * NEW -> CANCELLED
 * NEW -> INTERRUPTING -> INTERRUPTED
 */
//volatile保持可见性
private volatile int state;
//初态:初始状态
private static final int NEW          = 0;
//中态:给计算结果赋值之前,会先CAS设置一下状态
private static final int COMPLETING   = 1;
//终态:结果赋值完成后,设置为该状态
private static final int NORMAL       = 2;
//终态:计算过程中出现异常,设置为该状态
private static final int EXCEPTIONAL  = 3;
//终态:任务被终止
private static final int CANCELLED    = 4;
//中态:计算过程中被中断
private static final int INTERRUPTING = 5;
//终态:将执行线程中断后,设置为该状态
private static final int INTERRUPTED  = 6;

/** 初始化时设置的任务 */
private Callable<V> callable;
/** 返回的结果,可能是计算结果,也可能是异常 */
private Object outcome; // non-volatile, protected by state reads/writes
/** 执行任务的线程 */
private volatile Thread runner;
/** 等待结果的线程,是一个单向链表 */
private volatile WaitNode waiters;

  任务执行的整个阶段会不停的修改state,而get方法会根据state的值做出不同的反应

3. run方法

public void run() {
    if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                    null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                //执行方法
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                //出现异常,这里会先将state设置为COMPLETING,将异常赋值给outcome,再将state设置为EXCEPTIONAL。将所有阻塞的线程唤醒finishCompletion
                setException(ex);
            }
            if (ran)
                //计算成功的话,设置结果
                //先将state设置为COMPLETING,将结果赋值给outcome,再将state设置为EXCEPTIONAL。同样也会唤醒所有阻塞的线程
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

  根据任务执行的结果,设置不同的状态,正常结束和异常退出,都会唤醒等待的线程。

4. get方法

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        //根据状态判断,如果还没有到终态,尝试等待
        s = awaitDone(false, 0L);
    //已经有结果,返回结果
    return report(s);
}

  先看下如何处理结果

private V report(int s) throws ExecutionException {
    Object x = outcome;
    if (s == NORMAL)
        //如果是正常计算结果,返回
        return (V)x;
    if (s >= CANCELLED)
        //如果被终止或中断了,抛出被终止异常
        throw new CancellationException();
    //执行过程遇到异常,抛出原异常
    throw new ExecutionException((Throwable)x);
}

  根据不同的状态,返回结果或者抛出异常。再看下等待结果的处理:

private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
    //如果设置超时,计算超时时间
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
        if (Thread.interrupted()) {
            //如果等待的线程被中断了,从等待链表中移除这个节点
            removeWaiter(q);
            throw new InterruptedException();
        }

        int s = state;
        if (s > COMPLETING) {
            //任务已结束
            if (q != null)
                //解除关联,方便回收
                q.thread = null;
            //返回执行状态
            return s;
        }
        else if (s == COMPLETING)
            //计算结果正在赋值,让出时间片,稍微等一下
            Thread.yield();
        else if (q == null)
            //新的线程调用get,初始化一个等待节点
            q = new WaitNode();
        else if (!queued)
            //将当前节点设置到等待链表
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                    q.next = waiters, q);
        else if (timed) {
            //设置了超时时间,判断是否超时
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                //已经超时,移除当前节点
                removeWaiter(q);
                //返回任务状态
                return state;
            }
            //未超时,在等待时间内阻塞
            LockSupport.parkNanos(this, nanos);
        }
        else
            //阻塞等待唤醒
            LockSupport.park(this);
    }
}

  调用get的线程,都会在等待链表中阻塞,直到执行完毕,执行线程会唤醒等待线程,当然等待中可以响应中断,或者设置超时。

  

以上是关于FutureTask机制的主要内容,如果未能解决你的问题,请参考以下文章

javajava 多线程 异步计算 FutureTask 源码详解

FutureTask疑问

用简易例子讲Java中的回调机制

你知道的Go切片扩容机制可能是错的

JDK源码分析-FutureTask

八阻塞等待异步结果FutureTask