Netty实战

Posted kimi77

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Netty实战相关的知识,希望对你有一定的参考价值。

一、Channel、EventLoop 和 ChannelFuture

上一篇博文我们在构建服务端和客户端中出现了一些新的类,可能有些同学还有些不了解它们的具体功能。没关系,接下来我们对于 Channel、EventLoop 和 ChannelFuture 类进行的讨论增添更多的细节,这些类合在一起,可以被认为是 Netty 网络抽象的代表:

  • Channel : Socket;
  • EventLoop : 控制流、多线程处理、并发;
  • ChannelFuture : 异步通知。

1.1 Channel 接口

基本的 I/O 操作(bind()、connect()、read()和 write())依赖于底层网络传输所提供的原语。在基于 Java 的网络编程中,其基本的构造是 class Socket。Netty 的 Channel 接口所提供的 API,大大地降低了直接使用 Socket 类的复杂性。此外,Channel 也是拥有许多预定义的、专门化实现的广泛类层次结构的根,下面是一个简短的部分清单:

  • EmbeddedChannel;
  • LocalServerChannel;
  • NioDatagramChannel;
  • NioSctpChannel;
  • NioSocketChannel。

1.2 EventLoop 接口

EventLoop 定义了 Netty 的核心抽象,用于处理连接的生命周期中所发生的事件。如图在高层次上说明了 Channel、EventLoop、Thread 以及 EventLoopGroup 之间的关系。

这些关系可以表述为:

  • 一个 EventLoopGroup 包含一个或者多个 EventLoop;
  • 一个 EventLoop 在它的生命周期内只和一个 Thread 绑定;
  • 所有由 EventLoop 处理的 I/O 事件都将在它专有的 Thread 上被处理;
  • 一个 Channel 在它的生命周期内只注册于一个 EventLoop;
  • 一个 EventLoop 可能会被分配给一个或多个 Channel。

注意,在这种设计中,一个给定 Channel 的 I/O 操作都是由相同的 Thread 执行的,实际上消除了对于同步的需要

1.3 ChannelFuture 接口

Netty 中所有的 I/O 操作都是异步的。因为一个操作可能不会立即返回,所以我们需要一种用于在之后的某个时间点确定其结果的方法。为此,Netty 提供了ChannelFuture 接口,其 addListener()方法注册了一个ChannelFutureListener,以便在某个操作完成时(无论是否成功)得到通知。

可以将 ChannelFuture 看作是将来要执行的操作的结果的占位符。它究竟什么时候被执行则可能取决于若干的因素,因此不可能准确地预测,但是可以肯定的是它将会被执行。此外,所有属于同一个 Channel 的操作都被保证其将以它们被调用的顺序被执行。

二、ChannelHandler 和 ChannelPipeline

2.1 ChannelHandler 接口

从应用程序开发人员的角度来看,Netty 的主要组件是 ChannelHandler,它充当了所有处理入站和出站数据的应用程序逻辑的容器。这是可行的,因为 ChannelHandler 的方法是由网络事件(其中术语“事件”的使用非常广泛)触发的。事实上,ChannelHandler 可专门用于几乎任何类型的动作,例如将数据从一种格式转换为另外一种格式,或者处理转换过程中所抛出的异常。

举例来说,ChannelInboundHandler 是一个我们会经常实现的子接口。这种类型的ChannelHandler 接收入站事件和数据,这些数据随后将会被我们的应用程序的业务逻辑所处理。当我们要给连接的客户端发送响应时,也可以从 ChannelInboundHandler 冲刷数据。我们的应用程序的业务逻辑通常驻留在一个或者多个 ChannelInboundHandler 中。

Netty 以适配器类的形式提供了大量默认的 ChannelHandler 实现,其旨在简化应用程序处理逻辑的开发过程。如ChannelPipeline中的每个ChannelHandler将负责把事件转发到链中的下一个 ChannelHandler。这些适配器类(及它们的子类)将自动执行这个操作,所以我们只重写那些你想要特殊处理的方法和事件。

那么为什么要用适配器的形式提供这些?

那是因为有一些适配器类可以将编写自定义的 ChannelHandler 所需要的努力降到最低限度,因为它们提供了定义在对应接口中的所有方法的默认实现。下面这些是编写自定义 ChannelHandler 时经常会用到的适配器类:

  • ChannelHandlerAdapter
  • ChannelInboundHandlerAdapter
  • ChannelOutboundHandlerAdapter
  • ChannelDuplexHandler

2.2 ChannelPipeline 接口

ChannelPipeline 提供了 ChannelHandler 链的容器,并定义了用于在该链上传播入站和出站事件流的 API。当 Channel 被创建时,它会被自动地分配到它专属的 ChannelPipeline。ChannelHandler 安装到 ChannelPipeline 中的过程如下所示:

  • 一个ChannelInitializer的实现被注册到了ServerBootstrap中或用于客户端的Bootstrap
  • 当 ChannelInitializer.initChannel()方法被调用时,ChannelInitializer将在 ChannelPipeline 中安装一组自定义的 ChannelHandler;
  • ChannelInitializer 将它自己从 ChannelPipeline 中移除。

为了审查发送或者接收数据时将会发生什么,让我们来更加深入地研究 ChannelPipeline和 ChannelHandler 之间的共生关系吧。

ChannelHandler 是专为支持广泛的用途而设计的,可以将它看作是处理往来 ChannelPipeline 事件(包括数据)的任何代码的通用容器。如图,其展示了从 ChannelHandler 派生的 ChannelInboundHandler 和ChannelOutboundHandler 接口。

使得事件流经 ChannelPipeline 是 ChannelHandler 的工作,它们是在应用程序的初始化或者引导阶段被安装的。这些对象接收事件、执行它们所实现的处理逻辑,并将数据传递给链中的下一个 ChannelHandler(有点类似责任链模式)。它们的执行顺序是由它们被添加的顺序所决定的。实际上,被我们称为 ChannelPipeline 的是这些 ChannelHandler 的编排顺序。

如图,说明了一个 Netty 应用程序中入站和出站数据流之间的区别。从一个客户端应用程序的角度来看,如果事件的运动方向是从客户端到服务器端,那么我们称这些事件为出站的,反之则称为入站的。

从上图看入站和出站 ChannelHandler 可以被安装到同一个 ChannelPipeline中。如果一个消息或者任何其他的入站事件被读取,那么它会从 ChannelPipeline 的头部开始流动,并被传递给第一个 ChannelInboundHandler。这个 ChannelHandler 不一定会实际地修改数据,具体取决于它的具体功能,在这之后,数据将会被传递给链中的下一个ChannelInboundHandler。最终,数据将会到达 ChannelPipeline 的尾端,届时,所有处理就都结束了。

数据的出站运动(即正在被写的数据)在概念上也是一样的。在这种情况下,数据将从ChannelOutboundHandler 链的尾端开始流动,直到它到达链的头部为止。在这之后,出站数据将会到达网络传输层,这里显示为 Socket。通常情况下,这将触发一个写操作。

ps:通过使用作为参数传递到每个方法的 ChannelHandlerContext事件可以被传递给当前ChannelHandler 链中的下一个ChannelHandler。因为你有时会忽略那些不感兴趣的事件,所以 Netty提供了抽象基类 ChannelInboundHandlerAdapter 和 ChannelOutboundHandlerAdapter。 ChannelHandlerContext 上的对应方法,每个都提供了简单地将事件传递给下一ChannelHandler的方法的实现。随后,你可以通过重写你所感兴趣的那些方法来扩展这些类。

上图中出站和入站的ChannelHandler都在同一个ChannelPipeline中,那么ChannelPipeline是如何区分和处理这两种不同的类别的呢?

虽然 ChannelInboundHandle 和ChannelOutboundHandle 都扩展自 ChannelHandler,但是 Netty 能区分 ChannelInboundHandler 实现和 ChannelOutboundHandler 实现,并确保数据只会在具有相同定向类型的两个 ChannelHandler 之间传递。

当ChannelHandler 被添加到ChannelPipeline 时,它将会被分配一个ChannelHandlerContext,其代表了 ChannelHandler 和 ChannelPipeline 之间的绑定。虽然这个对象可以被用于获取底层的 Channel,但是它主要还是被用于写出站数据。

在 Netty 中,有两种发送消息的方式。我们可以直接写到 Channel 中,也可以 写到和 ChannelHandler相关联的ChannelHandlerContext对象中。前一种方式将会导致消息从ChannelPipeline 的尾端开始流动,而后者将导致消息从 ChannelPipeline 中的下一个 ChannelHandler 开始流动。

总结一下:

  • 将消息写入Channel 它将从尾端开始流动。
  • 将消息写入ChannelHandler中,它将会从下一个ChannelHandler开始流动。

2.3 编码器和解码器

当我们通过 Netty 发送或者接收一个消息的时候,就将会发生一次数据转换。入站消息会被解码;也就是说,从字节转换为另一种格式,通常是一个 Java 对象。如果是出站消息,则会发生相反方向的转换:它将从它的当前格式被编码为字节。这两种方向的转换的原因很简单:网络数据总是一系列的字节。(编解码)

对应于特定的需要,Netty 为编码器和解码器提供了不同类型的抽象类。例如,我们的应用程序可能使用了一种中间格式,而不需要立即将消息转换成字节。我们将仍然需要一个编码器,但是它将派生自一个不同的超类。为了确定合适的编码器类型,我们可以应用一个简单的命名约定。通常来说,这些基类的名称将类似于 ByteToMessageDecoder 或 MessageToByteEncoder。对于特殊的类型,我们会发现类似于 ProtobufEncoder 和 ProtobufDecoder这样的名称——预置的用来支持 Google 的 Protocol Buffers。

严格地说,其他的处理器也可以完成编码器和解码器的功能。但是,正如有用来简化ChannelHandler 的创建的适配器类一样,所有由 Netty 提供的编码器/解码器适配器类都实现了 ChannelOutboundHandler 或者 ChannelInboundHandler 接口。

我们会发现对于入站数据来说,channelRead 方法/事件已经被重写了。对于每个从入站Channel 读取的消息,这个方法都将会被调用。随后,它将调用由预置解码器所提供的 decode()方法,并将已解码的字节转发给 ChannelPipeline 中的下一个 ChannelInboundHandler。
出站消息的模式是相反方向的:编码器将消息转换为字节,并将它们转发给下一个ChannelOutboundHandler。

2.4 抽象类 SimpleChannelInboundHandler

最常见的情况是,你的应用程序会利用一个 ChannelHandler 来接收解码消息,并对该数据应用业务逻辑。要创建一个这样的 ChannelHandler,我们只需要扩展基类 SimpleChannelInboundHandler,其中 T 是你要处理的消息的 Java 类型 。在这个 ChannelHandler 中,我们需要重写基类的一个或者多个方法,并且获取一个到 ChannelHandlerContext 的引用,这个引用将作为输入参数传递给 ChannelHandler 的所有方法。

在这种类型的 ChannelHandler 中,最重要的方法是 channelRead0(ChannelHandlerContext,T)。除了要求不要阻塞当前的 I/O 线程之外,其具体实现完全取决于我们。

三、引导

Netty 的引导类为应用程序的网络层配置提供了容器,这涉及将一个进程绑定到某个指定的端口(服务端),或者将一个进程连接到另一个运行在某个指定主机的指定端口上的进程(客户端)。

严格来说,“连接”这个术语仅适用于面向连接的协议,如 TCP,其保证了两个连接端点之间消息的有序传递

因此,有两种类型的引导:一种用于客户端(简单地称为 Bootstrap),而另一种(ServerBootstrap)用于服务器。无论我们的应用程序使用哪种协议或者处理哪种类型的数据,唯一决定它使用哪种引导类的是它是作为一个客户端还是作为一个服务器(后面我们会单独开出一篇来写引导,它很重要)。

类别 Bootstrap ServerBootstrap
网络编程中的作用 连接到远程主机和端口 绑定到一个本地端口
EventLoopGroup 的数目 1 2

ps:实际上,ServerBootstrap 类也可以只使用一个 EventLoopGroup,此时其将在两个场景下共用同一个 EventLoopGroup

细心的同学应该发现了,ServerBootstrap使用了2个EventLoopGroup,这是因为服务器需要两组不同的 Channel。

  • 第一组将只包含一个 ServerChannel,代表服务器自身的已绑定到某个本地端口的正在监听的套接字。(专门用来创建Channel )
  • 而第二组将包含所有已创建的用来处理传入客户端连接(对于每个服务器已经接受的连接都有一个)的 Channel。(专门为Channel分配EventLoop)

它们的关系如图:

ServerChannel 相关联的 EventLoopGroup 将分配一个负责为传入连接请求创建Channel 的 EventLoop。一旦连接被接受,第二个 EventLoopGroup 就会给它的 Channel分配一个 EventLoop。

Netty系列三Netty实战篇

配合示例代码

​ 这一篇我们就玩起来,通过一些常用的实战问题,来理解如何使用Netty进行网络编程。

一、传递POJO

​ 第一个示例参见示例代码中的com.roy.netty.pojoTransfer。

​ 这个示例实现的功能是这样的:

1、客户端建立连接后,就会往服务端发送一个User对象。
2、服务端接受到User对象后,给User加100薪水(salary属性),然后返回给客户端。 感觉挺爽的把!!
3、客户端接收到服务端的消息后打印出来。

​ 整体的数据流程是这样的:

在这里插入图片描述

示例解读

​ 这个示例的重点是理解Netty的编码器与解码器。在这个示例中,客户端与服务端是希望以User对象来互相传递数据,但是在编写网络应用程序时,数据只能以0和1组成的二进制字节码数据在网络中传输。所以需要在出站(发送数据)时,通过PojoEncoder将User对象按照一定的规则转化成二进制的字节码数据,在Netty中,就是通过ByteBuf来对二进制数据进行封装。而在入站(接收到数据)后,也需要通过PojoDecoder将二进制数据转换成为User对象,然后再进行具体的业务处理。而在具体实现时,就是将需要传输的字段转成byte,然后按照固定的顺序传输或者解码就行。这个跟Hadoop的Mappreduce定制传输对象的思路是一样的。

​ 在Netty中提供了两组编码解码的抽象类: MessageToByteEncoder和MessageToByteDecoder,以及MessageToMessageEncoder和MessageToMessageDecoder。

​ 这两组编解码抽象类都是由ChannelHandler扩展出来的抽象实现。他们都提供了一个泛型,只对泛型对应的类型的数据才进行编解码操作。所以在定制开发时,如果有多个对象,可以定制多个不同泛型的编解码器,然后添加到pipeline中就可以了。

​ 而这两组编解码器的区别就在于MessageToByte是把消息转成一个字节流,然后就会立即写到context里。而MessageToMessage是把消息转换成另外的多个消息,然后再依次将这些消息写入到Context当中。我们示例中只在一个User对象与字节流中进行编解码,所有用MessageToByte就足够了。但是假如是要在一个User数组与字节流中进行编解码转换,那用MessageToMessage就更好一点。

​ 另外,在Netty中,其实也提供了很多的编解码器,比如MessageToMessageEncoder的子类:StringEncoder,RedisEnoder,LineEncoder,HttpObjectEncoder, 还有MessageToByteEncoder的子类:ObjectEncoder 这些都是一看名字就很容易明白的编解码器。其中这个RedisEncoder看着是不是比较有意思?是的,这就是用Redis来传递网络数据,这样相当于给网络数据做了一个中转站。有了Redis后,是不是对这些底层的网络协议找到了一些熟悉的感觉?另外还有LineBasedFrameDecoder,使用行尾控制符(\\n或者\\r\\n)作为分隔符来解析数据,在Netty内部也有使用。DelimiterBasedFrameDecoder,使用自定义的特殊字符作为消息分隔符。LengthFieldBasedFrameDecoder:通过指定长度来标识整包信息,这样可以处理TCP的粘包和半包问题。

​ 还有,其实针对以对象为基础的网络请求,Netty中自带了ObjectDecoder和ObjectEncoder可以实现POJO对象或各种业务对象的编解码工作。但是这些编解码底层使用的是java自带的序列化技术,而java序列化技术本身效率不是很高,存在一些问题。比如无法跨语言,序列化后的体积会非常大,序列化性能太低等。在某些追求效率的场景,就会采用性能更高的序列化方案。最为常用的就是Google的Protobuf。

二、Google Protobuf

​ Protobuf是Google发布的一个开源项目,全称Google Protocol Buffers,是一种轻便高效的结构化数据存储格式,可以用于结构化数据串行化。他很适合做数据存储或者RPC远程调用的数据交换格式。

​ 参考文档地址:https://developers.google.com/protocol-buffers/docs/proto

​ 他支持跨平台、跨语言。支持目前绝大多数语言,比如C,C++,Java,Python等。他是通过编写一个.proto文件来对类进行描述,然后可以通过下载下来的protoc.exe编译器自动生成.java文件。然后用这个生成出来的java对象来进行传输。

​ 示例参见示例代码中的com.roy.netty.protobufSingleObject包。这个包下实现了只传输一个Student对象的示例。另一个示例在com.roy.netty.protobuf包下。这个包下实现了可传输多个对象的示例。

​ 注意使用Protobuf需要加入maven依赖包:

<dependency>
	<groupId>com.google.protobuf</groupId>
	<artifactId>protobuf-java</artifactId>
	<version>3.6.1</version>
</dependency>

三、TCP粘包与拆包

​ TCP粘包是编写网络应用时经常会遇到的问题。其实在我们的第一个示例pojoTransfer中,已经接触到了粘包的问题。在那个示例中,客户端的PojoNettyClientHandler中的channelActive方法中,如果使用for循环发十个user用户信息,服务端最终能收到的只有一个用户。而最终返回给客户端的是这样的一个用户信息:

在这里插入图片描述

​ 用户名变成了很长的一串,并且还有一些乱码。这个问题其实就是因为TCP的粘包问题。

​ TCP是面向连接,面向流的,提供高可靠的服务。消息发送端如果一次要发送多个数据包,为了更有效的发送数据,就会使用优化算法Nagle算法,将多次间隔发送的较小的数据包合并成一个大的数据块,然后进行封包。这样能提高传输消息的效率。但是接收端如果不做处理,就会无法分辨出数据包之间的边界,造成消息混乱。

在这里插入图片描述

​ 处理粘包问题的关键就是要在接收端确认边界。处理的方式有很多,比如针对我们那个示例,可以这样处理:

方法1、去掉userName属性,固定User的数据包长度: 在User对象转成ByteBuf时,就是因为userName属性转换成字节流后大小不固定,所以整个User对象转换成字节流后,长度也不固定。如果User对象中的各个属性都是一些长度固定的基础类型,那整个User对象的字节流长度也就固定了。Decoder解析字节流时,只要固定读取的长度,就可以还原成正确的User对象。

public class User {
    private int userId;
    private Double userSalary;
    //去掉userName属性
}

**方法2、定制字节流发送方式,在字节流前面加上整个User对象的字节流长度:**例如在PojoEncoder中把username的长度也写到字节流当中

public class PojoEncoder extends MessageToByteEncoder<User> {
    @Override
    protected void encode(ChannelHandlerContext ctx, User msg, ByteBuf out) throws Exception {
        out.writeInt(msg.getUserId());
        out.writeDouble(msg.getUserSalary());
        //处理粘包问题的一种方法:写入username字符串的长度。
        final byte[] usernameBytes = msg.getUserName().getBytes("utf-8");
        out.writeInt(usernameBytes.length);
        
        out.writeBytes(msg.getUserName().getBytes("utf-8"));
    }
}

然后在PojoDecoder中也按照这个长度来解析username

public class PojoDecoder extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        System.out.println("读到的字符长度:"+in.readableBytes());
        final int userId = in.readInt();
        final double userSalary = in.readDouble();

        //处理粘包问题的一种方法,按固定长度读取username的字节流
        final int usernameLength = in.readInt();
        final ByteBuf userNameBuffer = in.readBytes(usernameLength);
        final String userName = userNameBuffer.toString(Charset.forName("utf-8"));

        User user = new User();
        user.setUserId(userId);
        user.setUserSalary(userSalary);
        user.setUserName(userName);
        out.add(user);
    }
}

​ 这样的话,在PojoNettyClientHandler中发多少个User对象都不会粘包了。

​ 通常,在开发过程中,定义字节流长度的方式,通常会被封装成一些自定义的协议,比如Dubbo框架中定义服务地址会以 dubbo: 开头,这其实就是Dubbo定义的一种报文协议,其本质也是通过定义报文长度,定制化报文的编解码方式。另外在Redis中,客户端与Redis的每个交互指令也是基于这样的机制。

四、心跳检测

​ 在基于TCP协议的网络应用开发中,客户端与服务端会建立一个长连接,而要维护这个长连接不被断开,就需要有心跳检测机制定期检查并且维护连接的状态。就好比两个人打电话,不管两个人说不说话,只要不挂断电话,那两个人的电话连接是永远不断开的。

​ 心跳检测机制是基于长连接的网路应用中非常重要也非常常见的一个底层机制。很多开源框架都需要心跳机制来及时检查并维护分布式系统的稳定性。微服务体系中,服务端要注册到注册中心,要通过心跳机制保证连接的有效性。RocketMQ的Broker也要注册到Nameserver上,并通过心跳机制保证连接的有效性。

​ 如果要自己开发一个心跳检测机制,还是挺麻烦的,需要有大量的定时任务。但是,使用Netty实现心跳检测就非常简单了,因为Netty全都帮我们封装好了。他的核心就是IdleStateHandler这个Handler。

​ 示例代码参见:com.roy.netty.heartbeat包。示例中演示了如何在客户端与服务端之间发心跳检测包。把数据交互都去掉,就会出现心跳超时的事件。另外,这个IdleStateHandler也是一个非常好的学习handler声明周期的地方。在实际开发中,心跳检测一般就只监控读写空闲即可。

五、Netty整合Log4j

​ 关于Netty如何整合Log4j,其实在上一个心跳检测的示例中已经有了。 只需要添加一个Netty封装的LoggingHandler就可以了。这个时候还可以在classpath下添加一个log4j.properties,对log4j日志格式进行配置。

六、WebSocket

​ 通常,前端与后端都是通过Http请求来交互的,而Http协议是无状态的,浏览器与服务器之间的每请求 一次,就要重新创建新的连接。这种方式,对于像聊天室这样的场景是不合适的。每次发消息都要重建连接,效率很低。

​ 而WebSocket是在前端页面与后端服务端之间建立一个长连接,基于长连接的数据交互就省掉了很多创建销毁连接的性能开销。并且,基于webSocket的长连接,浏览器端也可以及时感知服务端推送的消息。比如像商家版的饿了么,当有订单时,商家接单机器上会播报语音,这种场景如果不使用长连接,就需要前端做个定时任务访问后端,看有没有订单。性能消耗非常大并且也不及时。这个时候使用websocket长连接就比较合适了。

​ 示例参见com.roy.netty.websocket包。里面给出了一个简单的websocket示例。这个示例中有一个问题就是前端如果多次上线,就会注册多个channel,而从前端只能关闭最后一个。

七、Netty群聊

​ 有了前面这些示例,再来用Netty重新开发一下之前NIO阶段的群聊项目,就比较简单了。 核心就是在服务端维护一个客户端的channel集合。

八、Netty实现RPC服务调用

​ 示例代码com.roy.netty.rpc包中提供了一个模拟Dubbo的方式实现远程调用的Demo。整体的处理流程如下图:

在这里插入图片描述

​ 这是网上有的Demo,参考一下就行。其实整个过程中麻烦点的也就是客户端的接口代理处理,以及如何将Netty的异步请求转换成同步请求。

九、短连接与长连接

​ 最后com.roy.netty.swroddemo包中整理了一个企业级的Netty封装应用。示例中使用短连接的方式提高服务的吞吐量。只抽取了部分核心代码,作为参考把。

以上是关于Netty实战的主要内容,如果未能解决你的问题,请参考以下文章

Netty4.x实战专题案例文章列表

「Netty实战 03」大白话 Netty 核心组件分析

Netty实战:netty 堆外内存泄露排查盛宴

Netty实战三之Netty的组件和设计

好书下载:《Netty实战》

Netty网络编程实战3,使用Netty远程传输文件