nettyNetty并发工具-Promise

Posted 九师兄

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了nettyNetty并发工具-Promise相关的知识,希望对你有一定的参考价值。

1.概述

转载并且补充:从源码上理解Netty并发工具-Promise

2.前提

最近一直在看Netty相关的内容,也在编写一个轻量级的RPC框架来练手,途中发现了Netty的源码有很多亮点,某些实现甚至可以用苛刻来形容。另外,Netty提供的工具类也是相当优秀,可以开箱即用。这里分析一下个人比较喜欢的领域,并发方面的一个Netty工具模块 - Promise。


环境版本:

Netty:4.1.44.Final
JDK1.8

3.Promise简介

Promise,中文翻译为承诺或者许诺,含义是人与人之间,一个人对另一个人所说的具有一定憧憬的话,一般是可以实现的。

io.netty.util.concurrent.Promise在注释中只有一句话:特殊的可写的io.netty.util.concurrent.Future(Promise接口是io.netty.util.concurrent.Future的子接口)。而io.netty.util.concurrent.Futurejava.util.concurrent.Future的扩展,表示一个异步操作的结果。我们知道,JDK并发包中的Future是不可写,也没有提供可监听的入口(没有应用观察者模式),而Promise很好地弥补了这两个问题。另一方面从继承关系来看,DefaultPromise是这些接口的最终实现类,所以分析源码的时候需要把重心放在DefaultPromise类。一般一个模块提供的功能都由接口定义,这里分析一下两个接口的功能列表:

io.netty.util.concurrent.Promise
io.netty.util.concurrent.Future

先看io.netty.util.concurrent.Future接口:


package io.netty.util.concurrent;

import java.util.concurrent.CancellationException;
import java.util.concurrent.TimeUnit;


/**
 * The result of an asynchronous operation.
 * 异步操作的结果.
 *
 * TODO: 问题:netty为什么会重写了Java的Future方法,并且增加新的方法呢?
 *  因为future的get方法是一个尴尬的方法,因为这个方法调用会一直阻塞到得到结果,一旦调用就会阻塞,而且我们不知道什么时候去调用这个方法。
 *  netty的listener方法就在一部分程度上解决了这个问题。
 *
 * TODO: JDK的Future有什么作用?与Netty有什么区别?
 *      JDK提供的Future只能通过手工的方法检查执行结果,而这个操作是阻塞的,Netty则对ChannleFuture进行了增强,通过channelFutureListener
 *      以回调的方法来获取执行结果,去除手工检查阻塞的操作,值得注意的是:channelFutureListener的OperationComplete方法是由IO线程执行的,
 *      因此要注意的是不要在这里执行耗时的操作,或者需要通过另外的线程池来执行。
 *
 *  这里画个图来说明:
 *                                                  ---addListner->  listnenerA
 *  channelFuture继承了future对象。  <-   Future --->
 *                                                  ---addListner->  listnenerB
 *  如果任务执行完了,那么Future会调用注册在Future上的每一个listener,然后调用 hannelFutureListener的operationComplete方法,并且
 *  把future自己这个对象传递给每个future,listener拿到这个future,就可以拿到关联的channel,就可以对channnle进行任何操作。
 *
 *  TODO:Future如何知道channel中的任务是完成了呢?然后调用他们的方法呢?
 *      那么就要说说 promise这个接口了。这个接口是可写的而且只能写一次,不管成功还是失败。
 *      promise : 中文是程诺的意思。
 *
 */
@SuppressWarnings("ClassNameSameAsAncestorName")
public interface Future<V> extends java.util.concurrent.Future<V> 

    /**
     * Returns @code true if and only if the I/O operation was completed
     * successfully.
     * TODO: 如果 IO 操作成功完成,返回 true,isSuccess是isDone方法的一个特例。
     */
    boolean isSuccess();

    /**
     * returns @code true if and only if the operation can be cancelled via @link #cancel(boolean).
     *
     * 当且仅当操作可以取消,返回 true
     */
    boolean isCancellable();

    /**
     * Returns the cause of the failed I/O operation if the I/O operation has
     * failed.
     *
     * @return the cause of the failure.
     *         @code null if succeeded or this future is not
     *         completed yet.
     *
     * 如果 IO 操作失败,返回 IO 操作失败的原因
     * 如果操作成功或者 future 没有完成,返回 null
     *
     */
    Throwable cause();

    /**
     * Adds the specified listener to this future.  The
     * specified listener is notified when this future is
     * @linkplain #isDone() done.  If this future is already
     * completed, the specified listener is notified immediately.
     *
     * 将指定的侦听器添加到此 future, 当这个 future 处于 isDone() 状态的时候指定的监听器会接到通知
     * 如果这个 future 已经完成后,指定的侦听器会立即收到通知
     *
     * 这里实际用到了观察者模式
     *
     * 为当前Future实例添加监听Future操作完成的监听器 - isDone()方法激活之后所有监听器实例会得到回调
     */
    Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);

    /**
     * Adds the specified listeners to this future.  The
     * specified listeners are notified when this future is
     * @linkplain #isDone() done.  If this future is already
     * completed, the specified listeners are notified immediately.
     *
     * 将指定的多个侦听器添加到此 future, 当这个 future 处于 isDone() 状态的时候指定的多个监听器都会接到通知
     * 如果这个 future 已经完成后,指定的多个侦听器会立即都收到通知
     */
    Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

    /**
     * Removes the first occurrence of the specified listener from this future.
     * The specified listener is no longer notified when this
     * future is @linkplain #isDone() done.  If the specified
     * listener is not associated with this future, this method
     * does nothing and returns silently.
     */
    Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);

    /**
     * Removes the first occurrence for each of the listeners from this future.
     * The specified listeners are no longer notified when this
     * future is @linkplain #isDone() done.  If the specified
     * listeners are not associated with this future, this method
     * does nothing and returns silently.
     */
    Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

    /**
     * Waits for this future until it is done, and rethrows the cause of the failure if this future
     * failed.
     *
     * 等待这个 Future,直到完成为止,如果这个 Future 失败了,就会重新抛出失败的原因。
     * 同步等待Future完成得到最终结果(成功)或者抛出异常(失败),响应中断
     */
    Future<V> sync() throws InterruptedException;

    /**
     * Waits for this future until it is done, and rethrows the cause of the failure if this future
     * failed.
     *
     * 等待这个 Future,直到完成为止,如果这个 Future 失败了,就会重新抛出失败的原因。
     * 同步等待Future完成得到最终结果(成功)或者抛出异常(失败),不响应中断
     */
    Future<V> syncUninterruptibly();

    /**
     * Waits for this future to be completed.
     *
     * @throws InterruptedException
     *         if the current thread was interrupted
     *
     * 同步等待Future完成得到最终结果(成功)或者抛出异常(失败),不响应中断
     */
    Future<V> await() throws InterruptedException;

    /**
     * Waits for this future to be completed without
     * interruption.  This method catches an @link InterruptedException and
     * discards it silently.
     *
     * 等待Future完成,不响应中断
     */
    Future<V> awaitUninterruptibly();

    /**
     * Waits for this future to be completed within the
     * specified time limit.
     *
     * @return @code true if and only if the future was completed within
     *         the specified time limit
     *
     * @throws InterruptedException
     *         if the current thread was interrupted
     *
     * 带超时时限的等待Future完成,响应中断
     */
    boolean await(long timeout, TimeUnit unit) throws InterruptedException;

    /**
     * Waits for this future to be completed within the
     * specified time limit.
     *
     * @return @code true if and only if the future was completed within
     *         the specified time limit
     *
     * @throws InterruptedException
     *         if the current thread was interrupted
     *
     * 带超时时限的等待Future完成,不响应中断
     */
    boolean await(long timeoutMillis) throws InterruptedException;

    /**
     * Waits for this future to be completed within the
     * specified time limit without interruption.  This method catches an
     * @link InterruptedException and discards it silently.
     *
     * @return @code true if and only if the future was completed within
     *         the specified time limit
     */
    boolean awaitUninterruptibly(long timeout, TimeUnit unit);

    /**
     * Waits for this future to be completed within the
     * specified time limit without interruption.  This method catches an
     * @link InterruptedException and discards it silently.
     *
     * @return @code true if and only if the future was completed within
     *         the specified time limit
     */
    boolean awaitUninterruptibly(long timeoutMillis);

    /**
     * Return the result without blocking. If the future is not done yet this will return @code null.
     *
     * As it is possible that a @code null value is used to mark the future as successful you also need to check
     * if the future is really done with @link #isDone() and not rely on the returned @code null value.
     *
     * 没有阻塞的返回结果,如果 future 没有完成,那么将会返回空。
     *
     * 不要依赖null值去判断是否完成,因为结果可能就是返回null,因此你需要调用 isDone 方法去判断。因为 Runable 永远返回 null.
     * 非阻塞马上返回Future的结果,如果Future未完成,此方法一定返回null;有些场景下如果Future成功
     * 获取到的结果是null则需要二次检查isDone()方法是否为true
     */
    V getNow();

    /**
     * @inheritDoc
     *
     * If the cancellation was successful it will fail the future with a @link CancellationException.
     *
     * 取消当前Future实例的执行,如果取消成功会抛出CancellationException异常
     */
    @Override
    boolean cancel(boolean mayInterruptIfRunning);


sync()和await()方法类似,只是sync()会检查异常执行的情况,一旦发现执行异常马上把异常实例包装抛出,而await()方法对异常无感知。

接着看io.netty.util.concurrent.Promise接口:

public interface Promise<V> extends Future<V> 
   
    // 标记当前Future成功,设置结果,如果设置成功,则通知所有的监听器,如果Future已经成功或者失败,则抛出IllegalStateException
    Promise<V> setSuccess(V result);

    // 标记当前Future成功,设置结果,如果设置成功,则通知所有的监听器并且返回true,否则返回false
    boolean trySuccess(V result);

    // 标记当前Future失败,设置结果为异常实例,如果设置成功,则通知所有的监听器,如果Future已经成功或者失败,则抛出IllegalStateException
    Promise<V> setFailure(Throwable cause);

    // 标记当前Future失败,设置结果为异常实例,如果设置成功,则通知所有的监听器并且返回true,否则返回false
    boolean tryFailure(Throwable cause);
    
    // 标记当前的Promise实例为不可取消,设置成功返回true,否则返回false
    boolean setUncancellable();

    // 下面的方法和io.netty.util.concurrent.Future中的方法基本一致,只是修改了返回类型为Promise

    @Override
    Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);

    @Override
    Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

    @Override
    Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);

    @Override
    Promise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

    @Override
    Promise<V> await() throws InterruptedException;

    @Override
    Promise<V> awaitUninterruptibly();

    @Override
    Promise<V> sync() throws InterruptedException;

    @Override
    Promise<V> syncUninterruptibly();


到此,Promise接口的所有功能都分析完毕,接下来从源码角度详细分析Promise的实现。

4.Promise源码实现

Promise的实现类为io.netty.util.concurrent.DefaultPromise(其实DefaultPromise还有很多子类,某些实现是为了定制特定的场景做了扩展),而DefaultPromise继承自io.netty.util.concurrent.AbstractFuture

public abstract class AbstractFuture<V> implements Future<V> 

    // 永久阻塞等待获取结果的方法
    @Override
    public V get() throws InterruptedException, ExecutionException 
        // 阻塞直到异步操作完成
        await();

        // 从永久阻塞中唤醒后,先判断Future是否执行异常
        Throwable cause = cause();
        if (cause == null) 
            // 异常为空说明执行成功,调用getNow()方法返回结果
            return getNow();
        
        // 异常为空不为空,这里区分特定的取消异常则转换为CancellationException抛出
        if (cause instanceof CancellationException) 
            // 由用户取消
            throw (CancellationException) cause;
        
        // 非取消异常的其他所有异常都被包装为执行异常ExecutionException抛出
        throw new ExecutionException(cause);
    

    // 带超时阻塞等待获取结果的方法
    @Override
    public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException 
        // 调用响应中断的带超时时限等待方法进行阻塞
        if (await(timeout, unit)) 
            // 从带超时时限阻塞中唤醒后,先判断Future是否执行异常
            Throwable cause = cause();
            // 异常为空说明执行成功,调用getNow()方法返回结果
            if (cause == null) 
                return getNow();
            
            // 异常为空不为空,这里区分特定的取消异常则转换为CancellationException抛出
            if (cause instanceof CancellationException) 
                throw (CancellationException) cause;
            
            // 在非等待超时的前提下,非取消异常的其他所有异常都被包装为执行异常ExecutionException抛出
            throw new ExecutionException(cause);
        
        // 方法步入此处说明等待超时,则抛出超时异常TimeoutException
        throw new TimeoutException();
    


AbstractFuture仅仅对get()和get(long timeout, TimeUnit unit)两个方法进行了实现,其实这两处的实现和java.util.concurrent.FutureTask中的实现方式十分相似。

DefaultPromise的源码比较多,这里分开多个部分去阅读,先看它的属性和构造函数:

public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> 

    // 正常日志的日志句柄,InternalLogger是Netty内部封装的日志接口
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultPromise.class);

    // 任务拒绝执行时候的日志句柄 - Promise需要作为一个任务提交到线程中执行,如果任务拒绝则使用此日志句柄打印日志
    private static final InternalLogger rejectedExecutionLogger =
            InternalLoggerFactory.getInstance(DefaultPromise.class.getName() + ".rejectedExecution");

    // 监听器的最大栈深度,默认值为8,这个值是防止嵌套回调调用的时候栈深度过大导致内存溢出,后面会举个例子说明它的用法
    private static final int MAX_LISTENER_STACK_DEPTH = Math.min(8,
            SystemPropertyUtil.getInt("io.netty.defaultPromise.maxListenerStackDepth", 8));
    
    // 结果更新器,用于CAS更新结果result的值
    @SuppressWarnings("rawtypes")
    private static final AtomicReferenceFieldUpdater<DefaultPromise, Object> RESULT_UPDATER =
            AtomicReferenceFieldUpdater.newUpdater(DefaultPromise.class, Object.class, "result");
    
    // 用于填充result的值,当设置结果result传入null,Promise执行成功,用这个值去表示成功的结果
    private static final Object SUCCESS = new Object();
    
    // 用于填充result的值,表示Promise不能被取消
    private static final Object UNCANCELLABLE = new Object();
    
    // CancellationException实例的持有器,用于判断Promise取消状态和抛出CancellationException
    private static final CauseHolder CANCELLATION_CAUSE_HOLDER = new CauseHolder(ThrowableUtil.unknownStackTrace(
            new CancellationException(), DefaultPromise.class, "cancel(...)"));
    
    // CANCELLATION_CAUSE_HOLDER的异常栈信息元素数组
    private static final StackTraceElement[] CANCELLATION_STACK = CANCELLATION_CAUSE_HOLDER.cause.getStackTrace();
    
    // 真正的结果对象,使用Object类型,最终有可能为null、真正的结果实例、SUCCESS、UNCANCELLABLE或者CANCELLATION_CAUSE_HOLDER等等
    private volatile Object result;
    
    // 事件执行器,这里暂时不做展开,可以理解为单个调度线程
    private final EventExecutor executor;
    
     // 监听器集合,可能是单个GenericFutureListener实例或者DefaultFutureListeners(监听器集合)实例
    private Object listeners;
    
    // 等待获取结果的线程数量
    private short waiters;

    // 标记是否正在回调监听器
    private boolean notifyingListeners;

    // 构造函数依赖于EventExecutor
    public DefaultPromise(EventExecutor executor) 
        this.executor = checkNotNull(executor, "executor");
    

    protected DefaultPromise() 
        // only for subclasses - 这个构造函数预留给子类
        executor = null;
    

    // ... 省略其他代码 ...

    // 私有静态内部类,用于存放Throwable实例,也就是持有异常的原因实例
    private static final class CauseHolder 
        final Throwable cause;
        CauseHolder(Throwable cause) 
            this.cause = cause;
        
    

    // 私有静态内部类,用于覆盖CancellationException的栈信息为前面定义的CANCELLATION_STACK,同时覆盖了toString()返回CancellationException的全类名
    private static final class LeanCancellationException extends CancellationException 
        private static final long serialVersionUID = 2794674970981187807L;

        @Override
        public Throwable fillInStackTrace() 
            setStackTrace(CANCELLATION_STACK);
            return this;
        

        @Override
        public String toString() 
            return CancellationException.class.getName();
        
    
    // ... 省略其他代码 ...


Promise目前支持两种类型的监听器:

  • GenericFutureListener:支持泛型的Future监听器。

  • GenericProgressiveFutureListener:它是GenericFutureListener的子类,支持进度表示和支持泛型的Future监听器(有些场景需要多个步骤实现,类似于进度条那样)。

// GenericFutureListener
public interface GenericFutureListener<F extends Future<?>> extends EventListener 

    void operationComplete(F future) throws Exception;


// GenericProgressiveFutureListener
public interface GenericProgressiveFutureListener<F extends ProgressiveFuture<?>> extends GenericFutureListener<F> 
    
    void operationProgressed(F future, long progress, long total) throws Exception;


为了让Promise支持多个监听器,Netty添加了一个默认修饰符修饰的DefaultFutureListeners类用于保存监听器实例数组:

// DefaultFutureListeners
final class DefaultFutureListeners 

    private GenericFutureListener<? extends Future<?>>[] listeners;
    private int size;
    private int progressiveSize; // the number of progressive listeners
    
    // 这个构造相对特别,是为了让Promise中的listeners(Object类型)实例由单个GenericFutureListener实例转换为DefaultFutureListeners类型
    @SuppressWarnings("unchecked")
    DefaultFutureListeners(GenericFutureListener<? extends Future<?>> first, GenericFutureListener<? extends Future<?>> second) 
        listeners = new GenericFutureListener[2];
        listeners[0] = first;
        listeners[1] = second;
        size = 2;
        if (first instanceof GenericProgressiveFutureListener) 
            progressiveSize ++;
        
        if (second instanceof GenericProgressiveFutureListener) 
            progressiveSize ++;
        
    

    public void add(GenericFutureListener<? extends Future<?>> l) 
        GenericFutureListener<? extends F

以上是关于nettyNetty并发工具-Promise的主要内容,如果未能解决你的问题,请参考以下文章

从源码上理解Netty并发工具-Promise

Promise并发控制

Promise.all或者Promise.race处理并发请求

Promise.all并发限制

Promise.all并发限制

Promise.all并发限制