Netty源码分析-PromiseCombiner

Posted 征服.刘华强

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Netty源码分析-PromiseCombiner相关的知识,希望对你有一定的参考价值。

 

这个类的作用是用户传递一个消息和一个Promise对象,但是实际编码时把一个消息转化为多个消息输出,那么实际会生成多个Promise对象,PromiseCombiner的作用就是代理用户传递的Promise,当所有消息输出操作完成时,通知用户。

/*
 * Copyright 2016 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package io.netty.util.concurrent;

import io.netty.util.internal.ObjectUtil;

/**
 * <p>A promise combiner monitors the outcome of a number of discrete futures, then notifies a final, aggregate promise
 * when all of the combined futures are finished. The aggregate promise will succeed if and only if all of the combined
 * futures succeed. If any of the combined futures fail, the aggregate promise will fail. The cause failure for the
 * aggregate promise will be the failure for one of the failed combined futures; if more than one of the combined
 * futures fails, exactly which cause of failure will be assigned to the aggregate promise is undefined.</p>
 *
 * <p>Callers may populate a promise combiner with any number of futures to be combined via the
 * @link PromiseCombiner#add(Future) and @link PromiseCombiner#addAll(Future[]) methods. When all futures to be
 * combined have been added, callers must provide an aggregate promise to be notified when all combined promises have
 * finished via the @link PromiseCombiner#finish(Promise) method.</p>
 *
 * <p>This implementation is <strong>NOT</strong> thread-safe and all methods must be called
 * from the @link EventExecutor thread.</p>
 */
//Promise合并器,把多个Promise合并为一个,当操作成功时一起通知
public final class PromiseCombiner 
    private int expectedCount;
    private int doneCount;
    private Promise<Void> aggregatePromise;
    private Throwable cause;
    private final GenericFutureListener<Future<?>> listener = new GenericFutureListener<Future<?>>() 
        @Override
        public void operationComplete(final Future<?> future) 
            //每个Future无论成功还是失败后的回调事件
            if (executor.inEventLoop()) 
                operationComplete0(future);
             else 
                executor.execute(new Runnable() 
                    @Override
                    public void run() 
                        operationComplete0(future);
                    
                );
            
        

        private void operationComplete0(Future<?> future) 
            assert executor.inEventLoop();
            ++doneCount; //Future完成数量计数
            //如果有失败的则赋值给cause
            if (!future.isSuccess() && cause == null) 
                cause = future.cause();
            
            //如果全部future完成则通知代理Promise
            if (doneCount == expectedCount && aggregatePromise != null) 
                tryPromise();
            
        
    ;

    private final EventExecutor executor;

    /**
     * Deprecated use @link PromiseCombiner#PromiseCombiner(EventExecutor).
     */
    @Deprecated
    public PromiseCombiner() 
        this(ImmediateEventExecutor.INSTANCE);
    

    /**
     * The @link EventExecutor to use for notifications. You must call @link #add(Future), @link #addAll(Future[])
     * and @link #finish(Promise) from within the @link EventExecutor thread.
     *
     * @param executor the @link EventExecutor to use for notifications.
     */
    public PromiseCombiner(EventExecutor executor) 
        this.executor = ObjectUtil.checkNotNull(executor, "executor");
    

    /**
     * Adds a new promise to be combined. New promises may be added until an aggregate promise is added via the
     * @link PromiseCombiner#finish(Promise) method.
     *
     * @param promise the promise to add to this promise combiner
     *
     * @deprecated Replaced by @link PromiseCombiner#add(Future).
     */
    @Deprecated
    public void add(Promise promise) 
        add((Future) promise);
    

    /**
     * Adds a new future to be combined. New futures may be added until an aggregate promise is added via the
     * @link PromiseCombiner#finish(Promise) method.
     *
     * @param future the future to add to this promise combiner
     */
    @SuppressWarnings( "unchecked", "rawtypes" )
    public void add(Future future) 
        checkAddAllowed();
        checkInEventLoop();
        //记录添加了多少个Future
        ++expectedCount;
        //给每个future添加回调事件
        future.addListener(listener);
    

    /**
     * Adds new promises to be combined. New promises may be added until an aggregate promise is added via the
     * @link PromiseCombiner#finish(Promise) method.
     *
     * @param promises the promises to add to this promise combiner
     *
     * @deprecated Replaced by @link PromiseCombiner#addAll(Future[])
     */
    @Deprecated
    public void addAll(Promise... promises) 
        addAll((Future[]) promises);
    

    /**
     * Adds new futures to be combined. New futures may be added until an aggregate promise is added via the
     * @link PromiseCombiner#finish(Promise) method.
     *
     * @param futures the futures to add to this promise combiner
     */
    @SuppressWarnings( "unchecked", "rawtypes" )
    public void addAll(Future... futures) 
        for (Future future : futures) 
            this.add(future);
        
    

    /**
     * <p>Sets the promise to be notified when all combined futures have finished. If all combined futures succeed,
     * then the aggregate promise will succeed. If one or more combined futures fails, then the aggregate promise will
     * fail with the cause of one of the failed futures. If more than one combined future fails, then exactly which
     * failure will be assigned to the aggregate promise is undefined.</p>
     *
     * <p>After this method is called, no more futures may be added via the @link PromiseCombiner#add(Future) or
     * @link PromiseCombiner#addAll(Future[]) methods.</p>
     *
     * @param aggregatePromise the promise to notify when all combined futures have finished
     */
    public void finish(Promise<Void> aggregatePromise) 
        ObjectUtil.checkNotNull(aggregatePromise, "aggregatePromise");
        checkInEventLoop();
        if (this.aggregatePromise != null) 
            throw new IllegalStateException("Already finished");
        
        this.aggregatePromise = aggregatePromise;
        if (doneCount == expectedCount) 
            tryPromise();
        
    

    private void checkInEventLoop() 
        if (!executor.inEventLoop()) 
            throw new IllegalStateException("Must be called from EventExecutor thread");
        
    

    //操作代理Promise,当所有Future完成后这个类会被调用,通知用户
    private boolean tryPromise() 
        //如果不存在异常则把aggregatePromise设置为成功,否则设置为失败
        return (cause == null) ? aggregatePromise.trySuccess(null) : aggregatePromise.tryFailure(cause);
    

    private void checkAddAllowed() 
        if (aggregatePromise != null) 
            throw new IllegalStateException("Adding promises is not allowed after finished adding");
        
    

 

以上是关于Netty源码分析-PromiseCombiner的主要内容,如果未能解决你的问题,请参考以下文章

netty里的ByteBuf扩容源码分析

Netty源码分析(七) PoolChunk

源码分析Netty4专栏

源码分析Netty4专栏

Netty-源码分析LineBasedFrameDecoder

Netty源码分析:read