AQS同步组件-FutureTask解析和用例

Posted 共饮一杯无

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了AQS同步组件-FutureTask解析和用例相关的知识,希望对你有一定的参考价值。

FutureTask原理


FutureTask间接实现了runnable接口和future接口,说明了futureTask是runnable与callnable的集合体,即是有返回值的runnable方法。因此,FutureTask可以交给Executor执行,也可以由调用线程直接执行(FutureTask.run())。

源码分析

构造函数


    public FutureTask(Callable<V> callable) 
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    

    public FutureTask(Runnable runnable, V result) 
        //当构造方法传入参数为Runnable,会通过Executors.callable方法将其转换成Callable
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    
FutureTask新建提供两个构造方法来封装Callable和Runnable,当构造方法传入参数为Runnable,会通过Executors.callable方法将其转换成Callable。

常用方法

/**
* 可能的状态转换::
* 新建 -> 已完成 -> 正常
* 新建 -> 已完成 -> 异常
* 新建 -> 已取消
* 新建 -> 中断ing -> 已中断
 */
private volatile int state;
private static final int NEW          = 0;
private static final int COMPLETING   = 1;
private static final int NORMAL       = 2;
private static final int EXCEPTIONAL  = 3;
private static final int CANCELLED    = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED  = 6;

boolean cancel(boolean mayInterruptIfRunning);//取消任务
boolean isCancelled();//是否被取消
boolean isDone();//计算是否完成
//get方法,获取执行结果,如果当前线程还没有执行完成, get方法会被阻塞。
public V get()
//可以设置超时时间并获取执行结果,如果当前线程还没有执行完成, get方法会被阻塞。
public V get(long timeout, TimeUnit unit)

/**
  * awaitDone方法其实是个死循环,直到task状态变为已完成状态或者等待时间超过
  *超时时间或者线程中断才会跳出循环,程序结束;
  *为了节省开销,线程不会一直自旋等待,而是会阻塞,使用LockSupport的park系列方法实现线程阻塞
 */
private int awaitDone(boolean timed, long nanos)
    throws InterruptedException 
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    for (;;) 
        //如果线程中断,将当前线程从等待队列waiters中移除,抛出中断异常
        if (Thread.interrupted()) 
            removeWaiter(q);
            throw new InterruptedException();
        

        int s = state;
        //如果线程已完成,设为null
        if (s > COMPLETING) 
            if (q != null)
                q.thread = null;
            return s;
        
        //如果正在执行,让出cpu
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
        //如果节点为空,则初始化节点
        else if (q == null)
            q = new WaitNode();
        else if (!queued)
            //CAS
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
        //超时将节点移除队列。否则阻塞到超时。
        else if (timed) 
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) 
                removeWaiter(q);
                return state;
            
            LockSupport.parkNanos(this, nanos);
        
        else
            //阻塞自己
            LockSupport.park(this);
    

根据FutureTask的run方法执行的时机,FutureTask可以处于以下三种执行状态:

  1. 未启动:在FutureTask.run()还没执行之前,FutureTask处于未启动状态。当创建一个FutureTask对象,并且run()方法未执行之前,FutureTask处于未启动状态。
  2. 已启动:FutureTask对象的run方法启动并执行的过程中,FutureTask处于已启动状态。
  3. 已完成:FutureTask正常执行结束,或者FutureTask执行被取消(FutureTask对象cancel方法),或者FutureTask对象run方法执行抛出异常而导致中断而结束,FutureTask都处于已完成状态。

  • 当FutureTask处于未启动或已启动状态时,执行FutureTask.get()方法将导致调用线程阻塞
  • 当FutureTask处于已完成状态时,执行FutureTask.get()方法将导致调用线程立即返回结果或抛出异常
  • 当FutureTask处于未启动状态时,执行FutureTask.cancel()方法将导致此任务永远不会被执行
  • 当FutureTask处于已启动状态时,执行FutureTask.cancel(true)方法将以中断执行此任务线程的方式来试图停止任务
  • 当FutureTask处于已启动状态时,执行FutureTask.cancel(false)方法将不会对正在执行此任务的线程产生影响(让正在执行的任务运行完成)
  • 当FutureTask处于已完成状态时,执行FutureTask.cancel(…)方法将返回false。

    使用案例

    FutureTask、Runnable、Callable

    public static void main(String[] args) throws Exception 
        FutureTask<String> futureTask = new FutureTask<String>(new Callable<String>() 
            @Override
            public String call() throws Exception 
                log.info("callable耗时任务开始");
                //耗时任务
                Thread.sleep(5000);
                log.info("callable耗时任务完成");
                return "耗时任务:报告!我已完成";
            
        );
    
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.submit(futureTask);
        //executor.execute(futureTask);
        log.info("主线程任务开始");
        Thread.sleep(1000);
        log.info("主线程任务完成");
        log.info("等待耗时任务完成。。。");
        //获取耗时任务的返回结果,如果未返回,主线程将阻塞,处于等待状态
        String result = futureTask.get();
        log.info("result:", result);
    

    输出结果如下:

    12:22:06.250 [main] INFO com.zjq.aqs.FutureTaskExample - 主线程任务开始
    12:22:06.250 [Thread-0] INFO com.zjq.aqs.FutureTaskExample - callable耗时任务开始
    12:22:07.254 [main] INFO com.zjq.aqs.FutureTaskExample - 主线程任务完成
    12:22:07.254 [main] INFO com.zjq.aqs.FutureTaskExample - 等待耗时任务完成。。。
    12:22:11.254 [Thread-0] INFO com.zjq.aqs.FutureTaskExample - callable耗时任务完成
    12:22:11.254 [main] INFO com.zjq.aqs.FutureTaskExample - result:耗时任务:报告!我已完成
  • 可以把FutureTask交给Executor执行
  • 也可以通过ExecutorService.submit(…)方法返回一个FutureTask,然后执行FutureTask.get()方法或FutureTask.cancel(…)方法
  • 除此以外,还可以单独使用FutureTask

    Future、Callable

    public static void main(String[] args) throws Exception 
    
        ExecutorService executor = Executors.newCachedThreadPool();
        Future<String> future = executor.submit(new MyCallable());
        log.info("主线程任务开始");
        Thread.sleep(1000);
        log.info("主线程任务完成");
        log.info("等待耗时任务完成。。。");
        //获取耗时任务的返回结果,如果未返回,主线程将阻塞,处于等待状态
        String result = future.get();
        log.info("result:", result);
    
    
    static class MyCallable implements Callable<String> 
    
        @Override
        public String call() throws Exception 
            log.info("callable耗时任务开始");
            //耗时任务
            Thread.sleep(5000);
            log.info("callable耗时任务完成");
            return "耗时任务:报告!我已完成";
        
    

    当一个线程需要等待另一个线程把某个任务执行完后它才能继续执行,此时可以使用FutureTask.

以上是关于AQS同步组件-FutureTask解析和用例的主要内容,如果未能解决你的问题,请参考以下文章

AQS同步组件-CountDownLatch解析和案例

聊聊高并发(二十四)解析java.util.concurrent各个组件 深入理解AQS

并发编程:AQS

AQS

AQS面试题

程序员谈话系列——————解开AQS的神秘面纱