线程池源码分析_01 FutureTask源码分析

Posted 兴趣使然の草帽路飞

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了线程池源码分析_01 FutureTask源码分析相关的知识,希望对你有一定的参考价值。


1、FutureTask简介

Future是我们在使用java实现异步时最常用到的一个类,我们可以向线程池提交一个Callable,并通过future对象获取执行结果。

  • FutureTask的使用场景:FutureTask可用于异步获取执行结果或取消执行任务的场景。通过传入Runnable或者Callable的任务给FutureTask,直接调用其run方法或者放入线程池执行,之后可以在外部通过FutureTask的get方法异步获取执行结果,因此,FutureTask非常适合用于耗时的计算,主线程可以在完成自己的任务后,再去获取结果。另外,FutureTask还可以确保即使调用了多次run方法,它都只会执行一次Runnable或者Callable任务,或者通过cancel取消FutureTask的执行等。
  • 关于FutureTask的基本使用可以参考文章:FutureTask的用法

2、源码分析

成员属性

// 表示当前task的状态
private volatile int state;
// 表示当前任务尚未执行
private static final int NEW          = 0;
// 表示当前任务正在结束,尚未完全结束,一种临界状态
private static final int COMPLETING   = 1;
// 表示当前任务正常结束
private static final int NORMAL       = 2;
// 表示当前任务执行过程中发生了异常。 内部封装的 callable.run() 向上抛出异常了
private static final int EXCEPTIONAL  = 3;
// 表示当前任务被取消
private static final int CANCELLED    = 4;
// 表示当前任务中断中..
private static final int INTERRUPTING = 5;
// 表示当前任务已中断
private static final int INTERRUPTED  = 6;

//submit(runnable/callable):runnable 使用装饰者设计模式伪装成 Callable了。
private Callable<V> callable;

// 正常情况下:任务正常执行结束,outcome保存执行结果,作为callable返回值。
// 非正常情况:callable向上抛出异常,outcome保存异常信息
private Object outcome;

// 表示当前任务被线程执行期间,保存当前执行任务的线程对象引用。
private volatile Thread runner;

// 因为会有很多线程去get当前任务的结果,所以这里使用了一种数据结构 stack **头插、头取**的一个队列。
private volatile WaitNode waiters;

static final class WaitNode {
    volatile Thread thread;
    volatile WaitNode next;
    WaitNode() { thread = Thread.currentThread(); }
}

构造方法

public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    // callable就是程序员自己实现的业务类
    this.callable = callable;
    // 设置当前任务状态为NEW: 表示当前任务尚未执行
    this.state = NEW;
}

public FutureTask(Runnable runnable, V result) {
    // 使用装饰者模式将runnable转换为了callable接口,外部线程通过get获取
    // 当前任务执行结果时,结果可能为 null 也可能为传进来的值。
    this.callable = Executors.callable(runnable, result);
    // 设置当前任务状态为NEW: 表示当前任务尚未执行
    this.state = NEW;
}

成员方法

1. run方法

// submit(runnable/callable) -> newTaskFor(runnable) -> execute(task) -> pool
// 线程执行入口的方法
public void run() {
    // 条件一:state != NEW 条件成立,说明当前task已经被执行过了或者被cancel了,
    // 总之非NEW状态的任务,线程就不处理了,直接return;结束!
    // 条件二:!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread())
    // 条件成立:cas失败,当前任务被其它线程抢占了...
    if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                    null, Thread.currentThread()))
        return;

    // 如果执行到这里,说明当前task一定是 NEW 状态,而且当前线程也抢占TASK成功!
    try {
        // callable 就是程序员自己封装逻辑的callable 或者装饰后的runnable
        Callable<V> c = callable;
        // 条件一:c != null 防止空指针异常
        // 条件二:state == NEW 防止外部线程 cancel掉当前任务。
        if (c != null && state == NEW) {
            // 结果的引用
            V result;
            // true 表示callable.run 代码块执行成功 未抛出异常
            // false 表示callable.run 代码块执行失败 抛出异常
            boolean ran;
            try {
                // 调用程序员自己实现的callable 或者 装饰后的runnable
                result = c.call();
                // c.call未抛出任何异常,ran会设置为true 表示任务执行成功
                ran = true;
            } catch (Throwable ex) {
                // 说明程序员自己写的逻辑块有bug了。
                result = null;
                // ran设置为false
                ran = false;
                // 将当前异常信息ex赋值给outcome
                setException(ex);
            }
            
            // run: 
            // true -> 代码块执行成功 未抛出异常
            // false -> 代码块执行失败 抛出异常
            if (ran)
                // 说明当前c.call正常执行结束了。
                // set方法: 以CAS的方式将result结果设置给outcome
                set(result);
        }
    } finally {
        // 将当前执行任务的线程置为null
        runner = null;
        // 当前任务的状态
        int s = state;
        // s >= INTERRUPTING:如果条件成立,说明当前任务处于中断中或者已中断状态...
        if (s >= INTERRUPTING)
            // 后面分析cancel()方法的时候再回头看这里就明白了!
            handlePossibleCancellationInterrupt(s);
    }
}

/**
 * 以CAS的方式设置结果v给outcome
 */
protected void set(V v) {
    // 使用CAS方式设置当前任务状态为完成中...
    // 有没有可能失败呢? 外部线程等不及了,直接在set执行CAS之前将task取消了。(很小概率事件)
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        // 将v赋值给outcome
        outcome = v;
        // 将结果v赋值给outcome之后,马上会将当前任务状态修改为NORMAL(正常结束状态)。
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state

        // 当线程被挂起之后,如果任务线程执行完毕,就会唤醒等待线程。
        // 这一步就是在finishCompletion里面做的, 后面分析get() 的时候再说~
        // 移除并唤醒所有等待线程,执行done,置空callable
        finishCompletion();
    }
}

/**
 * 将当前异常信息t赋值给outcome
 */
protected void setException(Throwable t) {
    // 使用CAS方式设置当前任务状态为 完成中..
    // 有没有可能失败呢? 外部线程等不及了,直接在set执行CAS之前 将 task取消了。  很小概率事件。
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        // 引用的是 callable 向上层抛出来的异常。
        outcome = t;
        // 将当前任务的状态 修改为 EXCEPTIONAL
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
        
        // 当线程被挂起之后,如果任务线程执行完毕,就会唤醒等待线程。
        // 这一步就是在finishCompletion里面做的, 后面分析get() 的时候再说~
        // 移除并唤醒所有等待线程,执行done,置空callable
        finishCompletion();
    }
}

2. get方法

// get获取当前任务执行结束后得到的结果:
// 注意:get不一定是只有一个线程去得到任务执行后的结果,多个线程等待获取当前任务执行完成后的结果这种场景也是有的!
public V get() throws InterruptedException, ExecutionException {
    // 获取当前任务状态
    int s = state;
    // s <= COMPLETING:
    // 条件成立:(当前任务可能正处于:未执行0、正在执行1、正在完成1 状态,总之是还没有结束)。 
    // 那么,此时调用get的外部线程会被阻塞在get方法上。
    if (s <= COMPLETING)
        // awaitDone执行完后会返回task当前状态,如果该方法执行期间,task被中断了,则会直接抛出中断异常:
        // awaitDone是futureTask实现阻塞的关键方法: 等待任务执行完毕,如果任务取消或者超时则停止!
        s = awaitDone(false, 0L);
    // report(s)去获取最终task执行结束得到的结果
    return report(s);
}

// awaitDone方法: awaitDone是futureTask实现阻塞的关键方法
/**
 1. 等待任务执行完毕,如果任务取消或者超时则停止
 2. @param timed 为true表示设置超时时间
 3. @param nanos 超时时间
 4. @return 任务完成时的状态
 5. @throws InterruptedException 中断异常
 */
private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
    // 0 不带超时
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    // 引用当前线程封装成 WaitNode对象(**头插、头取**的一个队列。)
    WaitNode q = null;
    // 表示当前线程 waitNode对象是否入队/压栈
    boolean queued = false;
    // 自旋
    for (;;) {
        // 条件成立:说明当前线程唤醒是被其它线程使用中断这种方式喊醒的。interrupted()
        // 返回true 后会将 Thread的中断标记重置回false.
        if (Thread.interrupted()) {
            // 当前线程节点出队
            removeWaiter(q);
            // 上抛,使get方法抛出中断异常。
            throw new InterruptedException();
        }

        // 假设当前线程是被其它线程使用unpark(thread) 唤醒的话,会正常自旋,走下面逻辑:

        // 获取当前任务最新状态
        int s = state;
        // 条件成立:说明当前任务已经有结果了.. (可能是完成、异常、中断等等)
        if (s > COMPLETING) {
            // 条件成立:说明已经为当前线程创建过WaitNode了,此时需要将 node.thread = null helpGC
            if (q != null)
                q.thread = null;
            // 直接返回当前状态.
            return s;
        }
        // 条件成立:说明当前任务接近完成状态、或者接近失败状态...
        // 这里让当前线程再释放cpu ,进行下一次抢占cpu:
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
        // 条件成立:第一次自旋,当前线程还未创建 WaitNode 对象,此时为当前线程创建 WaitNode对象
        else if (q == null)
            q = new WaitNode();
        // 条件成立:第二次自旋,当前线程已经创建 WaitNode对象了,但是node对象还未入队
        else if (!queued){
            // 当前线程node节点 next 指向原队列的头节点 waiters 一直指向队列的头!
            q.next = waiters;
            // cas方式设置waiters引用指向当前线程node, 成功的话 queued == true 否则,可能其它线程先你一步入队了。
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset, waiters, q);
        }
        // 第三次自旋,会到这里:表示是否设置了超时时间
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                // 已经超时的话,移除等待节点
                removeWaiter(q);
                return state;
            }
             // 未超时,将当前线程挂起指定时间
            LockSupport.parkNanos(this, nanos);
        }
        else
            // 当前get操作的线程就会被park了。线程状态会变为 WAITING状态,相当于休眠了..
            // 除非有其它线程将你唤醒 或者 将当前线程中断。
            // 如果当前线程被其他线程唤醒,醒来时,还是从这里向下继续执行(继续进入自旋for进行条件判断)
            // (在finishCompletion中会唤醒这个挂起的线程!)
            LockSupport.park(this);
    }
}

// report(s)去获取最终task执行结束得到的结果
@SuppressWarnings("unchecked")
private V report(int s) throws ExecutionException {
    // 正常情况下,outcome 保存的是callable运行结束的结果
    // 非正常情况下,保存的是 callable 抛出的异常。
    Object x = outcome;
    // 条件成立(正常情况):当前任务状态正常结束
    if (s == NORMAL)
        // 直接返回callable运算结果
        return (V)x;

    // 条件成立(非正常情况):当前任务是被取消或中断状态
    if (s >= CANCELLED)
        // 抛异常!
        throw new CancellationException();

    // 执行到这,说明callable接口实现中,是有bug的...
    throw new ExecutionException((Throwable)x);
}

// node出队方法:
private void removeWaiter(WaitNode node) {
    if (node != null) {
        node.thread = null;
        retry:
        for (;;) {          // restart on removeWaiter race
            for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                s = q.next;
                if (q.thread != null)
                    pred = q;
                else if (pred != null) {
                    pred.next = s;
                    if (pred.thread == null) // check for race
                        continue retry;
                }
                else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
                        q, s))
                    continue retry;
            }
            break;
        }
    }
}

// 当线程被挂起之后,如果任务线程执行完毕,就会唤醒等待线程。这一步就是在finishCompletion里面做的
/**
 * 移除并唤醒所有等待线程,执行done,置空callable
 */
private void finishCompletion() {
    // 遍历等待节点:
    // q指向waiters链表的头结点
    for (WaitNode q; (q = waiters) != null;) {
        // 使用cas设置 waiters为null 
        // 为了防止外部线程使用cancel取消当前任务,也会触发finishCompletion方法。(小概率事件)
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            for (;;) {
                // 获取当前node节点封装的 thread
                Thread t = q.thread;
                // 条件成立:说明当前线程不为null
                if (t != null) {
                    q.thread = null;// help GC
                    // 唤醒当前节点对应的线程(在awaitDone方法最后一个else判断中park,在此处唤醒)
                    LockSupport.unpark(t);
                }
                // next: 当前节点的下一个节点
                WaitNode next = q.next;
                // next == null 说明是最后一个节点,则直接break即可
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }
    // 模板方法,可以被覆盖
    done();
    // 将callable 设置为null helpGC
    callable = null;
}

3. cancel方法

// 将当前线程的任务取消(中断)掉:
public boolean cancel(boolean mayInterruptIfRunning) {
    // 条件一:state == NEW 成立,表示当前任务处于运行中或者处于线程池任务队列中..
    // 条件二:UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))
    // 条件成立:说明CAS修改状态成功,可以去执行下面逻辑了,否则返回false,表示cancel失败。
    if (!(state == NEW &&
            UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                    mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        return false;

    try {
        if (mayInterruptIfRunning) {
            try {
                // 执行当前FutureTask 的线程,有可能现在是null,是null 的情况是: 当前任务在 队列中,还没有线程获取到它呢。。
                Thread t = runner;
                // 条件成立:说明当前线程 runner ,正在执行task.
                if (t != null)
                    // 给runner线程一个中断信号.. 如果你的程序是响应中断 会走中断逻辑..假设你程序不是响应中断的..啥也不会发生。
                    t.interrupt();
            } finally { // final state
                // 设置任务状态为 中断完成。
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
            }
        }
    } finally {
        // 唤醒所有get()阻塞的线程。
        finishCompletion();
    }
    return true;
}

  • 如果文章有帮助,点赞支持一下哈~

以上是关于线程池源码分析_01 FutureTask源码分析的主要内容,如果未能解决你的问题,请参考以下文章

线程池源码分析-FutureTask

线程池源码分析-ThreadPoolExecutor

Java异步编程——深入源码分析FutureTask

线程之CallableFuture 和FutureTask使用及源码分析

Java Review - 线程池使用FutureTask的小坑

Java Review - 线程池使用FutureTask的小坑