netty系列之:netty中的核心MessageToMessage编码器

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了netty系列之:netty中的核心MessageToMessage编码器相关的知识,希望对你有一定的参考价值。

参考技术A 在netty中我们需要传递各种类型的消息,这些message可以是字符串,可以是数组,也可以是自定义的对象。不同的对象之间可能需要互相转换,这样就需要一个可以自由进行转换的转换器,为了统一编码规则和方便用户的扩展,netty提供了一套消息之间进行转换的框架。本文将会讲解这个框架的具体实现。

netty为消息和消息之间的转换提供了三个类,这三个类都是抽象类,分别是MessageToMessageDecoder,MessageToMessageEncoder和MessageToMessageCodec。

先来看下他们的定义:

MessageToMessageEncoder继承自ChannelOutboundHandlerAdapter,负责向channel中写消息。

MessageToMessageDecoder继承自ChannelInboundHandlerAdapter,负责从channel中读取消息。

MessageToMessageCodec继承自ChannelDuplexHandler,它是一个双向的handler,可以从channel中读取消息,也可以向channel中写入消息。

有了这三个抽象类,我们再看下这三个类的具体实现。

先看一下消息的编码器MessageToMessageEncoder,编码器中最重要的方法就是write,看下write的实现:

write方法接受一个需要转换的原始对象msg,和一个表示channel读写进度的ChannelPromise。

首先会对msg进行一个类型判断,这个判断方法是在acceptOutboundMessage中实现的。

这里的matcher是一个TypeParameterMatcher对象,它是一个在MessageToMessageEncoder构造函数中初始化的属性:

这里的I就是要匹配的msg类型。

如果不匹配,则继续调用ctx.write(msg, promise); 将消息不做任何转换的写入到channel中,供下一个handler调用。

如果匹配成功,则会调用核心的encode方法:encode(ctx, cast, out);

注意,encode方法在MessageToMessageEncoder中是一个抽象方法,需要用户在继承类中自行扩展。

encode方法实际上是将msg对象转换成为要转换的对象,然后添加到out中。这个out是一个list对象,具体而言是一个CodecOutputList对象,作为一个list,out是一个可以存储多个对象的列表。

那么out是什么时候写入到channel中去的呢?

别急,在write方法中最后有一个finally代码块,在这个代码块中,会将out写入到channel里面。

因为out是一个List,可能会出现out中的对象部分写成功的情况,所以这里需要特别处理。

首先判断out中是否只有一个对象,如果是一个对象,那么直接写到channel中即可。如果out中多于一个对象,那么又分成两种情况,第一种情况是传入的promise是一个voidPromise,那么调用writeVoidPromise方法。

什么是voidPromise呢?

我们知道Promise有多种状态,可以通过promise的状态变化了解到数据写入的情况。对于voidPromise来说,它只关心一种失败的状态,其他的状态都不关心。

如果用户关心promise的其他状态,则会调用writePromiseCombiner方法,将多个对象的状态合并为一个promise返回。

事实上,在writeVoidPromise和writePromiseCombiner中,out中的对象都是一个一个的取出来,写入到channel中的,所以才会生成多个promise和需要将promise进行合并的情况:

和encoder对应的就是decoder了,MessageToMessageDecoder的逻辑和MessageToMessageEncoder差不多。

首先也是需要判断读取的消息类型,这里也定义了一个TypeParameterMatcher对象,用来检测传入的消息类型:

decoder中重要的方法是channelRead方法,我们看下它的实现:

首先检测msg的类型,只有接受的类型才进行decode处理,否则将msg加入到CodecOutputList中。

最后在finally代码块中将out中的对象一个个取出来,调用ctx.fireChannelRead进行读取。

消息转换的关键方法是decode,这个方法也是一个抽象方法,需要在继承类中实现具体的功能。

前面讲解了一个编码器和一个解码器,他们都是单向的。最后要讲解的codec叫做MessageToMessageCodec,这个codec是一个双向的,即可以接收消息,也可以发送消息。

先看下它的定义:

MessageToMessageCodec继承自ChannelDuplexHandler,接收两个泛型参数分别是INBOUND_IN和OUTBOUND_IN。

它定义了两个TypeParameterMatcher,分别用来过滤inboundMsg和outboundMsg:

分别实现了channelRead和write方法,用来读写消息:

这里的decoder和encoder实际上就是前面我们讲到的MessageToMessageDecoder和MessageToMessageEncoder:

可以看到MessageToMessageCodec实际上就是对MessageToMessageDecoder和MessageToMessageEncoder的封装,如果需要对MessageToMessageCodec进行扩展的话,需要实现下面两个方法:

netty中提供的MessageToMessage的编码框架是后面对编码解码器进行扩展的基础。只有深入了解其中的原理,我们对于新的编码解码器运用起来才能得心应手。

netty系列之:选byte还是选message?这是一个问题

简介

UDT给了你两种选择,byte stream或者message,到底选哪一种呢?经验告诉我们,只有小学生才做选择题,而我们应该全都要!

类型的定义

UDT的两种类型是怎么定义的呢?

翻看com.barchart.udt包,可以发现这两种类型定义在TypeUDT枚举类中。

    STREAM(1),
    DATAGRAM(2), 

一个叫做STREAM,它的code是1。一个叫做DATAGRAM,他的code是2.

根据两个不同的类型我们可以创建不同的selectorProvider和channelFactory。而这两个正是构建netty服务所需要的。

在NioUdtProvider这个工具类中,netty为我们提供了TypeUDT和KindUDT的六种组合ChannelFactory,他们分别是:

用于Stream的:BYTE_ACCEPTOR,BYTE_CONNECTOR,BYTE_RENDEZVOUS。

和用于Message的:MESSAGE_ACCEPTOR,MESSAGE_CONNECTOR和MESSAGE_RENDEZVOUS。

同样的,还有两个对应的SelectorProvider,分别是:

BYTE_PROVIDER 和 MESSAGE_PROVIDER.

搭建UDT stream服务器

如果要搭建UDT stream服务器,首先需要使用NioUdtProvider.BYTE_PROVIDER来创建NioEventLoopGroup:

        final NioEventLoopGroup acceptGroup = new NioEventLoopGroup(1, acceptFactory, NioUdtProvider.BYTE_PROVIDER);
        final NioEventLoopGroup connectGroup = new NioEventLoopGroup(1, connectFactory, NioUdtProvider.BYTE_PROVIDER);

这里,我们创建两个eventLoop,分别是acceptLoop和connectLoop。

接下来就是在ServerBootstrap中绑定上面的两个group,并且指定channelFactory。这里我们需要NioUdtProvider.BYTE_ACCEPTOR:

final ServerBootstrap boot = new ServerBootstrap();
            boot.group(acceptGroup, connectGroup)
                    .channelFactory(NioUdtProvider.BYTE_ACCEPTOR)
                    .option(ChannelOption.SO_BACKLOG, 10)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<UdtChannel>() 
                        @Override
                        public void initChannel(final UdtChannel ch) 
                            ch.pipeline().addLast(
                                    new LoggingHandler(LogLevel.INFO),
                                    new UDTByteEchoServerHandler());
                        
                    );

就这么简单。

搭建UDT message服务器

搭建UDT message服务器的步骤和stream很类似,不同的是需要使用NioUdtProvider.MESSAGE_PROVIDER作为selectorProvider:

        final NioEventLoopGroup acceptGroup =
                new NioEventLoopGroup(1, acceptFactory, NioUdtProvider.MESSAGE_PROVIDER);
        final NioEventLoopGroup connectGroup =
                new NioEventLoopGroup(1, connectFactory, NioUdtProvider.MESSAGE_PROVIDER);

然后在绑定ServerBootstrap的时候使用NioUdtProvider.MESSAGE_ACCEPTOR作为channelFactory:

final ServerBootstrap boot = new ServerBootstrap();
            boot.group(acceptGroup, connectGroup)
                    .channelFactory(NioUdtProvider.MESSAGE_ACCEPTOR)
                    .option(ChannelOption.SO_BACKLOG, 10)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<UdtChannel>() 
                        @Override
                        public void initChannel(final UdtChannel ch)
                                throws Exception 
                            ch.pipeline().addLast(
                                    new LoggingHandler(LogLevel.INFO),
                                    new UDTMsgEchoServerHandler());
                        
                    );

同样很简单。

Stream和Message的handler

不同的UDT类型,需要使用不同的handler。

对于Stream来说,它的底层是byte,所以我们的消息处理也是以byte的形式进行的,我们以下面的方式来构建message:

private final ByteBuf message;
message = Unpooled.buffer(UDTByteEchoClient.SIZE);
        message.writeBytes("www.flydean.com".getBytes(StandardCharsets.UTF_8));

然后使用ctx.writeAndFlush(message)将其写入到channel中。

对于message来说,它实际上格式对ByteBuf的封装。netty中有个对应的类叫做UdtMessage:

public final class UdtMessage extends DefaultByteBufHolder

UdtMessage是一个ByteBufHolder,所以它实际上是一个ByteBuf的封装。

我们需要将ByteBuf封装成UdtMessage:

private final UdtMessage message;
final ByteBuf byteBuf = Unpooled.buffer(UDTMsgEchoClient.SIZE);
        byteBuf.writeBytes("www.flydean.com".getBytes(StandardCharsets.UTF_8));
        message = new UdtMessage(byteBuf);

然后将这个UdtMessage发送到channel中:

ctx.writeAndFlush(message);

这样你就学会了在UDT协议中使用stream和message两种数据类型了。

总结

大家可能觉得不同的数据类型原来实现起来这么简单。这全都要归功于netty优秀的封装和设计。

感谢netty!

本文的例子可以参考:learn-netty4

以上是关于netty系列之:netty中的核心MessageToMessage编码器的主要内容,如果未能解决你的问题,请参考以下文章

netty系列之:netty中的核心编码器bytes数组

netty系列之:netty中的核心解码器json

netty系列之:选byte还是选message?这是一个问题

Netty系列化之Google Protobuf编解码

netty系列之:NIO和netty详解

netty系列之:在netty中使用protobuf协议