《线程池系列七》-Guava ListenableFuture AbstractFuture实现原理讲解

Posted PIGP

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了《线程池系列七》-Guava ListenableFuture AbstractFuture实现原理讲解相关的知识,希望对你有一定的参考价值。

该篇文章与线程池的关系不是很大,由于它与FutureTask的实现非常的相似,因此放在了线程池系列。在学习ListenableFuture时一定要对比着FutureTask的相关知识点学习,了解两者的共同点、区别、适应场景。本节将详细讲解ListenableFure的默认实现AbstractFuture类,并结合FutureTask展开一些列的问题讨论。有关FutureTask的知识,请阅读我的另一篇文章《线程池系列一》-FutureTask原理讲解与源码剖析。本节标题与《线程池系列六》-Guava ListenableFutureTask非常相似,但是这两篇文章完全不同,ListenableFutureTask是基于FutureTask的实现,而本文主要讲解的是ListenableFuture的默认实现AbstractFuture类的使用

ListenableFuture

ListenableFuture 接口继承Future 接口。Future接口是JDK定义的接口,其定义了与任务操作相关的方法,例如任务的取消:cancel(),判断任务的状态:isCancelled()、isDone(),获取任务结果:get()等。 ListenableFuture 是Guava(google提供的一个java开发包)定义的接口,其在Future接口的基础上添加了addListener()方法,源码如下:

 
   
   
 
  1. void addListener(Runnable listener, Executor executor);

该方法主要用于给Future任务添加监听任务,需要主要的是:

  • listener任务必须指定执行该任务的线程池executor(该executor一定不要与执行Future任务的线程池是同一个,否则会出现死锁情况)

  • 监听任务执行的时机为Future任务执行完成(包括正确执行完成和任务抛异常终止)或者被取消。


AbstractFuture 内部类Sync

AbstractFuture的实现与FutureTask的实现非常相似,FutureTask使用unsafe包的CAS实现,而AbstractFuture使用的是AQS(AbstractQueuedSynchronizer,jdk锁实现的模板类,本文不做讲解),AbstractFuture将99%的操作全部都交于内部类Sync实现,下面讲解一下Sync类的实现

  • 任务的状态信息

Sync将任务状态分为5种,分别为running、completing、completed、cancelled、interrupted。其与FutureTask任务的状态对比如下图所示:

从图中可以看出,AbstractFure相比与FutureTask少了两种状态Exceptional和interrupting状态,其中Exceptional状态,在AbstractFure中通过添加一个Throwable类型的结果来实现(如果Throwable对象的值不为null,则说明是exceptional),而interrupting状态再FutureTask中的作用就不是很大,在AstractFurue中并没有设计该状态。

  • 成员变量

与FutureTask不同,FutureTask中只有一个Object对象用来存放Future执行的结果,可以是正常结果,也可以是异常。在AbstractFuture中,将两种结果分开,正常结果放在value中,异常结果放在exception中,源码如下:

 
   
   
 
  1. private V value;

  2. private Throwable exception;

这也是为什么不设置exceptional状态就能区分正常和异常结果的原因。

  • 锁方法的重写

该锁使用的是共享锁来实现,主要涉及两个方法:

  1. tryAcquireShared(int ignored) 该方法是在获取锁时调用,多个线程可以同时获取锁

  2. tryReleaseShared(int finalState) 该方法是在释放锁时调用

在锁的时候时,我们一般都是先尝试获取锁,然后处理临界资源,处理完成后释放锁。而在AbstractFuture中并不是这种常规的使用方式,其实现是必须有一个线程先调用releaseShared(int arg)释放锁,其他线程才能调用acquireShared(int arg)获取锁,否则,所有的获取锁线程都将会堵塞源码如下:

 
   
   
 
  1. protected int tryAcquireShared(int ignored) {

  2.    if (isDone()) {

  3.        return 1;

  4.    }

  5.    return -1;

  6. }

  7. @Override

  8. protected boolean tryReleaseShared(int finalState) {

  9.    setState(finalState);

  10.    return true;

  11. }

从源码中可以看出,调用tryAcquireShared()方法就是判断任务有没有完成(isDone()方法),任务完成成功获取锁,任务没有完成则等待。 那么问题就是任务什么时候完成,任务完成都会调用tryReleaseShared()方法,该方法用于任务完成或者取消时设置最终状态。

  • 成员方法

一. 阻塞与非阻塞的get()方法 get()方法或间接调用获取锁操作,如果成功获取锁,则返回对应的结果,如果获取不到锁,则返回对应的异常

 
   
   
 
  1. V get(long nanos) throws TimeoutException, CancellationException,

  2.        ExecutionException, InterruptedException {

  3.    //间接调用tryAcquireShared(arg)方法,支持中断,最长等待nanos时间

  4.    if (!tryAcquireSharedNanos(-1, nanos)) {

  5.        throw new TimeoutException("Timeout waiting for task.");

  6.    }

  7.    return getValue();

  8. }

  9. V get() throws CancellationException, ExecutionException,

  10.        InterruptedException {

  11.    //间接调用tryAcquireShared(arg)方法,支持中断,直到获取锁返回

  12.    acquireSharedInterruptibly(-1);

  13.    return getValue();

  14. }

从源码中可以看出,两种get()方法都会间接的调用tryAcquireShared(arg)方法,且都支持中断。在成功获取锁之后,两者都会调用getValue()方法,其源码如下:

 
   
   
 
  1. private V getValue() throws CancellationException, ExecutionException {

  2.    int state = getState();

  3.    switch (state) {

  4.        case COMPLETED:

  5.            if (exception != null) {

  6.                throw new ExecutionException(exception);

  7.            } else {

  8.                return value;

  9.            }

  10.        case CANCELLED:

  11.        case INTERRUPTED:

  12.            throw cancellationExceptionWithCause(

  13.                    "Task was cancelled.", exception);

  14.        default:

  15.            throw new IllegalStateException(

  16.                    "Error, synchronizer in invalid state: " + state);

  17.    }

  18. }

该方法主要针对三种终止状态做处理:

  1. completed状态,该状态又分为正常结束和异常终止,通过判断exception是否为null进行区分,如果异常,则抛出异常,否则返回正常结束的结果

  2. cancelled和interrupted状态,统一处理抛出取消异常

  3. 其他情况,讲道理不会出现的状态,如果出现了抛出非法状态异常

二. 设值方法 该类方法都会间接调用 tryReleaseShared(int finalState)方法,使锁处于可获取状态,表示任务执行完成。其中包括set(@Nullable V v)设值正常值、setException(Throwable t)设值异常、cancel(boolean interrupt)取消任务,期源码如下:

 
   
   
 
  1. boolean set(@Nullable V v) {

  2.    return complete(v, null, COMPLETED);

  3. }

  4. boolean setException(Throwable t) {

  5.    return complete(null, t, COMPLETED);

  6. }

  7. boolean cancel(boolean interrupt) {

  8.    return complete(null, null, interrupt ? INTERRUPTED : CANCELLED);

  9. }

从源码中可以看出三者都调用了complete(@Nullable V v, @Nullable Throwable t, int finalState)方法,下面对该方法进行详细的讲解,其核心逻辑如下:

  1. 将任务状态由running修改为completing状态

  2. 如果状态修改成功,则对结果value和异常exception赋值,异常的赋值主要看方法参数finalState,改状态如果为cancelled或者interrupted则为异常赋值,否则赋值为参数t的值(t可能为null),然后调用releaseShared(finalState)(该方法间接调用tryReleaseShared(arg))方法更新任务状态为最终状态。

  3. 如果状态修改失败,判断状态是否为completing状态,如果是则说明任务已经执行完成,只在赋值阶段,执行acquireShared(-1)获取锁操作,使自己阻塞至任务完成

源码如下:

 
   
   
 
  1. private boolean complete(@Nullable V v, @Nullable Throwable t,

  2.                         int finalState) {

  3.    boolean doCompletion = compareAndSetState(RUNNING, COMPLETING);

  4.    if (doCompletion) {

  5.        // If this thread successfully transitioned to COMPLETING, set the value

  6.        // and exception and then release to the final state.

  7.        this.value = v;

  8.        // Don't actually construct a CancellationException until necessary.

  9.        this.exception = ((finalState & (CANCELLED | INTERRUPTED)) != 0)

  10.                ? new CancellationException("Future.cancel() was called.") : t;

  11.        releaseShared(finalState);

  12.    } else if (getState() == COMPLETING) {

  13.        // If some other thread is currently completing the future, block until

  14.        // they are done so we can guarantee completion.

  15.        acquireShared(-1);

  16.    }

  17.    return doCompletion;

  18. }

AbstractFuture的状态转换全部都在该方法中了。 三. 状态判断方法 主要判断任务状态state的值,state的值存放在AQS中,再次不过多讲解,源码如下:

 
   
   
 
  1. boolean isDone() {

  2.    return (getState() & (COMPLETED | CANCELLED | INTERRUPTED)) != 0;

  3. }

  4. boolean isCancelled() {

  5.    return (getState() & (CANCELLED | INTERRUPTED)) != 0;

  6. }

  7. boolean wasInterrupted() {

  8.    return getState() == INTERRUPTED;

  9. }


AbstractFuture

  • 成员变量

 
   
   
 
  1.  private final Sync<V> sync = new Sync<V>();

  2.  private final ExecutionList executionList = new ExecutionList();

其中,sync已经讲解,ExecutionList主要用于执行listener,其实用可以参考我的另一篇博客《线程池系列六》-Guava ListenableFutureTask中有关于ExecutionList的详细讲解

  • 成员方法

大部分的成员方法都是直接调用sync的对应方法,并没有做过多的操作,只是简单的将方法暴露给外部使用而已。其中,get(long timeout, TimeUnit unit)、get()、isDone()、isCancelled()、wasInterrupted()都是直接调用的sync的方法。 除了上述方法外,还有赋值操作和取消操作的方法,由于该类方法设计到任务完成回调listener方法的关系,因此不是简单的调用sync的方法,其实现如下所示:

 
   
   
 
  1.  protected boolean set(@Nullable V value) {

  2.    boolean result = sync.set(value);

  3.    if (result) {

  4.      executionList.execute();

  5.    }

  6.    return result;

  7.  }

如果设置成功,表示任务结束,则执行listener方法(executionList.execute()) setException(Throwable throwable)方法与set(@Nullable V value)操作一直,多了一个非空判断而已,不再讲解。 cancel() 如果取消成功,则执行listener回调,如果参数为真,则interruptTask();该方法为抽象方法,子类可以实现。cancel()的源码如下:

 
   
   
 
  1.  public boolean cancel(boolean mayInterruptIfRunning) {

  2.    if (!sync.cancel(mayInterruptIfRunning)) {

  3.      return false;

  4.    }

  5.    executionList.execute();

  6.    if (mayInterruptIfRunning) {

  7.      interruptTask();

  8.    }

  9.    return true;

  10.  }


AbstractFuture与FutureTask的区别

  1. AbstractFuture通过AQS实现,FutureTask通过unsafe CAS实现,本质是一样的

  2. AbstractFuture 有五种状态,两种任务结果value和exception,而FutureTask有七种状态,任务执行结果只有一个outcome

  3. AbstractFuture没有实现Runnable接口,不能作为任务放到线程池中执行,而FutureTask可以

  4. AbstractFuture有接口回调,FutureTask没有,但是留下了回调的接口,可以重写done()方法

AbstractFuture的使用场景

AbstractFuture是一个抽象类,我们需要自定义子类来使用AbstractFuture,又因此AbstractFuture并没有实现Runnable接口,因此其不适合和线程池配合使用(子类同时实现Runnable接口也是可以的)。它经常用来与AsyncHttpClient配合使用,使用异步HttpClient发起请求,在请求的回调中根据请求的返回结果执行AbstractFuture对象set()、setException()、cancel()操作。由于本节篇幅过长,下篇文章再举例其用法。

欢迎扫描下方二维码,关注公众号,我们可以进行技术交流,共同成长


以上是关于《线程池系列七》-Guava ListenableFuture AbstractFuture实现原理讲解的主要内容,如果未能解决你的问题,请参考以下文章

从Java Future到Guava ListenableFuture实现异步调用

多线程(十七深入了解线程池-ThreadPoolExecutor)

JAVA多线程学习七-线程池

线程池技术

大数据批量新增or修改太慢太Low,线程池CountDownLatchCompletableFuture完美解决

(十七)AtomicInteger原子类的介绍和使用