彻底搞明白JDK的Future机制

Posted hanruikai

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了彻底搞明白JDK的Future机制相关的知识,希望对你有一定的参考价值。

什么是Future接口

Future是java.util.concurrent.Future,是Java提供的接口,可以用来做异步执行的状态获取,它避免了异步任务在调用者那里阻塞等待,而是让调用者可以迅速得到一个Future对象,

后续可以通过Future的方法来获取执行结果。一个实例代码如下:

public class Test 
    public static void main(String[] args) throws ExecutionException, InterruptedException 
        //创建线程池
        ExecutorService executor = Executors.newCachedThreadPool();
        Future future = executor.submit(new Task());
        //这一步get会阻塞当前线程
        System.out.println(future.get());
 
        executor.shutdown();
    
 
    private static class Task implements Callable<Integer> 
 
        @Override
        public Integer call() throws Exception 
            System.out.println("子线程在进行计算");
            Thread.sleep(2000);
            return 1;
        
 
    
 

代码很简单,就是将一个Runnable、Callable的实例放到一个线程池里,就会返回一个Future对象。后续通过future.get()取得执行结果,但事实上代码并没有达到异步回调的结果,而是get时阻塞了

Future原理

因为阅读源码东西太对,这里只是总结关键点,说太多也记不住,先看ExecutorService的submit接口定义,代码如下:

     * @param task the task to submit
     * @param <T> the type of the task's result
     * @return a Future representing pending completion of the task
     * @throws RejectedExecutionException if the task cannot be
     *         scheduled for execution
     * @throws NullPointerException if the task is null
     */
    <T> Future<T> submit(Callable<T> task);

简单分析:

  • 入参是callable的实例,这个没用疑问
  • 返回参数是Future对象

看代码实现类AbstractExecutorService

    public Future<?> submit(Runnable task) 
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    
 public FutureTask(Runnable runnable, V result) 
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    

新建了一个FutureTask对象,状态state是NEW。可能的状态转换是:

Possible state transitions:
* NEW -> COMPLETING -> NORMAL
* NEW -> COMPLETING -> EXCEPTIONAL
* NEW -> CANCELLED
* NEW -> INTERRUPTING -> INTERRUPTED

继续,之后执行的就是FutureTask的run方法,代码如下:

public void run() 
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try 
            Callable<V> c = callable;
            if (c != null && state == NEW) 
                V result;
                boolean ran;
                try 
                    result = c.call();
                    ran = true;
                 catch (Throwable ex) 
                    result = null;
                    ran = false;
                    setException(ex);
                
                if (ran)
                    set(result);
            
         finally 
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        
    

我们看上面的代码,分析一下:

先判断state状态,如果不是NEW说明执行完毕,直接return掉。
后面使用CAS操作,判断这个任务是否已经执行,这里FutureTask有个全局的volatile runner字段,这里通过cas将当前线程指定给runner。
这里可以防止callable被执行多次。

继续往下跟,查看finishCompletion方法:
FutureTask中有一个WaiteNode单链表,当执行futureTask.get()方法时,多个线程会将等待的线程的next指向下一个想要get获取结果的线程。
finishCompletion主要就是使用Unsafe.unpark()进行唤醒操作,代码如下:


    /**
     * Removes and signals all waiting threads, invokes done(), and
     * nulls out callable.
     */
    private void finishCompletion() 
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) 
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) 
                for (;;) 
                    Thread t = q.thread;
                    if (t != null) 
                        q.thread = null;
                        LockSupport.unpark(t);
                    
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                
                break;
            
        

        done();

        callable = null;        // to reduce footprint
    

总结一下:

  1. 并发原子操作仍旧是利用的CAS原子比较,主要是unsafe类
  2. 线程的阻塞、等待、唤醒仍旧是利用类似阻塞队列的链表,里面维护一个链表结构,看链表节点定义:
  /**
     * Simple linked list nodes to record waiting threads in a Treiber
     * stack.  See other classes such as Phaser and SynchronousQueue
     * for more detailed explanation.
     */
    static final class WaitNode 
        volatile Thread thread;
        volatile WaitNode next;
        WaitNode()  thread = Thread.currentThread(); 
    

FutureTask的get方法是阻塞的,利用自旋实现,也是最常用的方式,代码如下:

 private int awaitDone(boolean timed, long nanos)
        throws InterruptedException 
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) 
            if (Thread.interrupted()) 
                removeWaiter(q);
                throw new InterruptedException();
            

            int s = state;
            if (s > COMPLETING) 
                if (q != null)
                    q.thread = null;
                return s;
            
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
            else if (q == null)
                q = new WaitNode();
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            else if (timed) 
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) 
                    removeWaiter(q);
                    return state;
                
                LockSupport.parkNanos(this, nanos);
            
            else
                LockSupport.park(this);
        
    

 

记住一点:JDK底层很多实现都是基于下面几个技术:

  1. JDK底层如何控制并发,保证原子性------------CAS操作
  2. JDK并发如何阻塞、唤醒线程--------------------单向链表或者双向链表队列,队列节点waitnode就是线程的id、状态、next节点等
  3. JDK如何实现自旋操作,比如FutureTask的get方法----------------没有那么神奇,就是for循环等待
  4. JDK如何共享线程数据-----------voliate
  5. JDK如何隔离线程数据-------------ThreadLocal

Future的不足

Future其实是一种模式,如下图:

future很明显,虽然是异步执行,但是无法准确知道异步任务说明时候执行完毕,如果调用get方法,在异步没有执行完成时,还是阻塞;如果频繁get检测,效率不高。

所以,我理解,使用future的get操作应该在最后一步,其他操作都已经完成了,一个可以参考的例子:

public int getTotal(final List<Integer> a, final List<Integer> b) throws ExecutionException, InterruptedException 
    Future<Integer> future = Executors.newCachedThreadPool().submit(new Callable<Integer>() 
        @Override
        public Integer call() throws Exception 
            int r = 0;
            for (int num : a) 
                r += num;
            
            return r;
        
    );

    int r = 0;
    for (int num : b) 
        r += num;
    
    return r + future.get();

 

 

 

 

以上是关于彻底搞明白JDK的Future机制的主要内容,如果未能解决你的问题,请参考以下文章

全网首发:JDK绘制文字:一绘制流程

全网首发:JDK绘制文字:一绘制流程

彻底搞明白Spring中的自动装配和Autowired

一文彻底搞懂ReentrantLock原理基于AQS的公平锁+非公平锁

一文彻底搞懂ReentrantLock原理基于AQS的公平锁+非公平锁

让你彻底搞明白项目中如何接入读写分离