Netty源码分析-MessageToMessageEncoder

Posted 征服.刘华强

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Netty源码分析-MessageToMessageEncoder相关的知识,希望对你有一定的参考价值。

 

消息到消息的编码器

/*
 * Copyright 2012 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package io.netty.handler.codec;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandler;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
import io.netty.util.concurrent.PromiseCombiner;
import io.netty.util.internal.StringUtil;
import io.netty.util.internal.TypeParameterMatcher;

import java.util.List;

/**
 * @link ChannelOutboundHandlerAdapter which encodes from one message to an other message
 *
 * For example here is an implementation which decodes an @link Integer to an @link String.
 *
 * <pre>
 *     public class IntegerToStringEncoder extends
 *             @link MessageToMessageEncoder&lt;@link Integer&gt; 
 *
 *         @code @Override
 *         public void encode(@link ChannelHandlerContext ctx, @link Integer message, List&lt;Object&gt; out)
 *                 throws @link Exception 
 *             out.add(message.toString());
 *         
 *     
 * </pre>
 *
 * Be aware that you need to call @link ReferenceCounted#retain() on messages that are just passed through if they
 * are of type @link ReferenceCounted. This is needed as the @link MessageToMessageEncoder will call
 * @link ReferenceCounted#release() on encoded messages.
 */
public abstract class MessageToMessageEncoder<I> extends ChannelOutboundHandlerAdapter 

    //类型匹配器
    private final TypeParameterMatcher matcher;

    /**
     * Create a new instance which will try to detect the types to match out of the type parameter of the class.
     */
    protected MessageToMessageEncoder() 
        matcher = TypeParameterMatcher.find(this, MessageToMessageEncoder.class, "I");
    

    /**
     * Create a new instance
     *
     * @param outboundMessageType   The type of messages to match and so encode
     */
    protected MessageToMessageEncoder(Class<? extends I> outboundMessageType) 
        matcher = TypeParameterMatcher.get(outboundMessageType);
    

    /**
     * Returns @code true if the given message should be handled. If @code false it will be passed to the next
     * @link ChannelOutboundHandler in the @link ChannelPipeline.
     */
    public boolean acceptOutboundMessage(Object msg) throws Exception 
        return matcher.match(msg);
    

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception 
        CodecOutputList out = null;
        try 
            //匹配子类实现定义的泛型<I>
            if (acceptOutboundMessage(msg)) 
                //创建list
                out = CodecOutputList.newInstance();
                @SuppressWarnings("unchecked")
                I cast = (I) msg;
                try 
                    //子类编码逻辑,传入cats,把输出对象放入out中
                    encode(ctx, cast, out);
                 finally 
                    //释放cast
                    ReferenceCountUtil.release(cast);
                

                //如果out为空则释放out,抛出异常
                if (out.isEmpty()) 
                    out.recycle();
                    out = null;

                    throw new EncoderException(
                            StringUtil.simpleClassName(this) + " must produce at least one message.");
                
             else 
                //匹配不到则传递给其它编码器
                ctx.write(msg, promise);
            
         catch (EncoderException e) 
            throw e;
         catch (Throwable t) 
            throw new EncoderException(t);
         finally 
            if (out != null) 
                final int sizeMinusOne = out.size() - 1;
                if (sizeMinusOne == 0) 
                    //如果out当中只有一个元素,则直接传递给后续编码器
                    ctx.write(out.getUnsafe(0), promise);
                 else if (sizeMinusOne > 0) 
                    // Check if we can use a voidPromise for our extra writes to reduce GC-Pressure
                    // See https://github.com/netty/netty/issues/2525
                    //如果是voidPromise,则它不能添加任何事件,则直接循环传递给后续编码器
                    if (promise == ctx.voidPromise()) 
                        writeVoidPromise(ctx, out);
                     else 
                        //需要合并Promise
                        writePromiseCombiner(ctx, out, promise);
                    
                
                //释放资源
                out.recycle();
            
        
    

    private static void writeVoidPromise(ChannelHandlerContext ctx, CodecOutputList out) 
        //voidPromise不能添加任何listeners,也不能调用sync的操作
        final ChannelPromise voidPromise = ctx.voidPromise();
        for (int i = 0; i < out.size(); i++) 
            //循环传递到后续编码器,传递voidPromise
            ctx.write(out.getUnsafe(i), voidPromise);
        
    

    private static void writePromiseCombiner(ChannelHandlerContext ctx, CodecOutputList out, ChannelPromise promise) 
        //如果是其它ChannelPromise,则需要合并操作,使用PromiseCombiner代理用户传递的promise
        final PromiseCombiner combiner = new PromiseCombiner(ctx.executor());
        for (int i = 0; i < out.size(); i++) 
            //每次输出消息生成的Future都纳入PromiseCombiner的管理
            //当所有消息都完成时(成功或失败)则通知用户传递的promise
            combiner.add(ctx.write(out.getUnsafe(i)));
        
        //代理用户的promise,全部子操作完成时通知promise
        combiner.finish(promise);
    

    /**
     * Encode from one message to an other. This method will be called for each written message that can be handled
     * by this encoder.
     *
     * @param ctx           the @link ChannelHandlerContext which this @link MessageToMessageEncoder belongs to
     * @param msg           the message to encode to an other one
     * @param out           the @link List into which the encoded msg should be added
     *                      needs to do some kind of aggregation
     * @throws Exception    is thrown if an error occurs
     */
    //子类需要扩展此方法,根据msg,编码消息,把想要输出的消息放入out中
    protected abstract void encode(ChannelHandlerContext ctx, I msg, List<Object> out) throws Exception;

 

以上是关于Netty源码分析-MessageToMessageEncoder的主要内容,如果未能解决你的问题,请参考以下文章

源码分析Netty4专栏

源码分析Netty4专栏

Netty-源码分析LineBasedFrameDecoder

Netty源码分析:read

Netty源码分析:read

[Netty源码分析]ByteBuf(一)