Posted rgrey996
我们知道,Future.get() 可以获取异步执行的结果,那么它是怎么做到的呢?
要实现线程的数据交换,我们按照进程间的通信方式可知有: 管道、共享内存、Socket套接字。而同一个jvm的两个线程通信,所有线程共享内存区域,则一定是通过共享内存再简单不过了。
本文将以 ThreadPoolExecutor 线程池 来解释这个过程。
首先,如果想要获取一个线程的执行结果,需要调用 ThreadPoolExecutor.submit(Callable); 方法。然后该方法会返回一个 Future 对象,通过 Future.get(); 即可获取结果了。
一、首先,我们来看一下 submit 过程
仅为返回了一个 Future<?> 的对象供下游调用!
// AbstractExecutorService public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); // 包装一层结果,RunnableFuture, 也实现了 Runnable 接口 // 实际上就是 FutureTask RunnableFuture<T> ftask = newTaskFor(task); // 然后交由 线程池进行调用任务了,即由 jvm 调用执行 Thread // 具体执行逻辑,在我之前的文章中也已经阐述,自行搜索 execute(ftask); // 最后,把包装对象返回即可 return ftask; } /** * Returns a {@code RunnableFuture} for the given callable task. * * @param callable the callable task being wrapped * @param <T> the type of the callable‘s result * @return a {@code RunnableFuture} which, when run, will call the * underlying callable and which, as a {@code Future}, will yield * the callable‘s result as its result and provide for * cancellation of the underlying task * @since 1.6 */ protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new FutureTask<T>(callable); } // FutureTask 实例化 /** * Creates a {@code FutureTask} that will, upon running, execute the * given {@code Callable}. * * @param callable the callable task * @throws NullPointerException if the callable is null */ public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable }
通过上面的分析,我们可以看到,异步线程的执行被包装成了 FutureTask, 而java的异步线程执行都是由jvm调用Thread.run()进行, 所以异步起点也应该从这里去找:
// FutureTask.run() public void run() { // 不允许多次执行 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { // 直接调用 call() 方法,获取返回结果 result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; // 执行异常,包装异常信息 setException(ex); } // 将结果设置到当前的 FutureTask 实例变量 outcome 中,这样当前线程就可以获取了 // 设置结果时,会将 state 同时变更 if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } } /** * Sets the result of this future to the given value unless * this future has already been set or has been cancelled. * * <p>This method is invoked internally by the {@link #run} method * upon successful completion of the computation. * * @param v the value */ protected void set(V v) { // 设置结果时,还不代表可以直接获取了,还有后续工作,所以设置为 COMPLETING 中间态 if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state // 通知线程执行完成等后续工作 finishCompletion(); } } /** * Removes and signals all waiting threads, invokes done(), and * nulls out callable. */ private void finishCompletion() { // assert state > COMPLETING; // 外部看起来是一个 for, 实际上只会执行一次, 目的是为了保证内部的锁获取成功 // 如果有其他线程成功后, waiters也就会为null, 从而自身也一起退出了 for (WaitNode q; (q = waiters) != null;) { // 保证更新的线程安全性 // 只要锁获取成功,就会一次性更新完成,不会失败 if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { Thread t = q.thread; // 依次唤醒等待的线程 if (t != null) { q.thread = null; LockSupport.unpark(t); } WaitNode next = q.next; // 只有把所有 wait 线程都通知完后,才可以退出 if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } // 完成后钩子方法,默认为空,如果需要做特殊操作可以自行复写即可 done(); callable = null; // to reduce footprint } // 简单看一下异常信息的包装,与 正常结束方法类似,只是将 outcome 设置为了异常信息,完成状态设置为 EXCEPTIONAL /** * Causes this future to report an {@link ExecutionException} * with the given throwable as its cause, unless this future has * already been set or has been cancelled. * * <p>This method is invoked internally by the {@link #run} method * upon failure of the computation. * * @param t the cause of failure */ protected void setException(Throwable t) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = t; UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state finishCompletion(); } }
不管怎么样,你明白一点,所有的执行结果都被放到 FutureTask 的 outcome 变量中了,我们如果想要知道结果,那么,只需要获取这个变量就可以了。
当然是用户调用 future.get() 获取了!