分布式理论,架构设计 Netty
Posted 拐柒
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了分布式理论,架构设计 Netty相关的知识,希望对你有一定的参考价值。
分布式理论,架构设计(二) Netty
Netty学习
原生NIO存在的问题
- NIO 的类库和 API 繁杂,使用麻烦:
需要熟练掌握 Selector、ServerSocketChannel、SocketChannel、ByteBuffer等。 - 需要具备其他的额外技能:
要熟悉 Java 多线程编程,因为 NIO 编程涉及到 Reactor 模式,必须对多线程和网络编程非常熟悉,才能编写出高质量的 NIO 程序。 - 开发工作量和难度都非常大:
例如客户端面临断连重连、网络闪断、半包读写、失败缓存、网络拥 塞和异常流的处理等等。 - JDK NIO 的 Bug:
臭名昭著的 Epoll Bug,它会导致 Selector 空轮询,最终导致 CPU 100%。直到JDK 1.7版本该问题仍旧存在,没有被根本解决
在NIO中通过Selector的轮询当前是否有IO事件,根据JDK NIO api描述,Selector的select方法会一直阻塞,直到IO事件达到或超时,但是在Linux平台上这里有时会出现问题,在某些场景下select方法会直接返回,即使没有超时并且也没有IO事件到达,这就是著名的epoll
bug,这是一个比较严重的bug,它会导致线程陷入死循环,会让CPU飙到100%,极大地影 响系统的可靠性,到目前为止,JDK都没有完全解决这个问题。
netty
Netty 是由 JBOSS 提供的一个 Java 开源框架。Netty 提供异步的、基于事件驱动的网络应用程序框架,用以快速开发高性能、高可靠性的网络 IO 程序。 Netty 是一个基于 NIO 的网络编程框架,使用Netty 可以帮助你快速、简单的开发出一 个网络应用,相当于简化和流程化了 NIO 的开发过程。 作为当前最流行的 NIO 框架,Netty 在互联网领域、大数据分布式计算领域、游戏行业、 通信行业等获得了广泛的应用,知名的 Elasticsearch 、Dubbo 框架内部都采用了 Netty。
优点:
1.设计优雅,提供阻塞和非阻塞的 Socket;提供灵活可拓展的事件模型;提供高度可定制的线程模型。
2.具备更高的性能和更大的吞吐量,使用零拷贝技术最小化不必要的内存复制,减少资源的消耗。
3.提供安全传输特性。
4.支持多种主流协议;预置多种编解码功能,支持用户开发私有协议。
线程模型
传统阻塞I/O服务模型
采用阻塞 IO 模式获取输入的数据, 每个连接都需要独立的线程完成数据的输入 , 业务处理和数据返回工作.
缺点:
1.当并发数很大,就会创建大量的线程,占用很大系统资源
2.连接创建后,如果当前线程暂时没有数据可读,该线程会阻塞在 read 操作,造成线程资源浪费
Reactor 模型
Reactor 模式,通过一个或多个输入同时传递给服务处理器的模式 , 服务器端程序处理传入的多个 请求,并将它们同步分派到相应的处理线程, 因此 Reactor 模式也叫 Dispatcher模式. Reactor 模式使用IO 复用监听事件, 收到事件后,分发给某个线程(进程), 这点就是网络服务器高并发处理关键.
单Reactor 单线程
优点:
模型简单,没有多线程、进程通信、竞争问题,全部都在一个线程中完成。
缺点:
1、性能问题:只有一个线程,无法完全发挥多核CPI的性能,handler在处理某个连接上的业务是,整个进程无法处理 其他连接时间,很容易导致性能瓶颈
2、可靠性问题:线程以外终止或者进入死循环,会导致整个系统通信模块不可用,不能接受和处理外部消息,造成节点故障。
单Reactor 多线程
优点:
可以充分的利用多核CPI的处理能力
缺点:
多线程数据共享和访问比较复杂, reactor 处理所有的事件的监听和响应,在单线程运行, 在高并发场景容易出现性能瓶颈
主从Reactor 多线程
优点:
1.MainReactor 线程与 SubReactor 线程的数据交互简单职责明确,MainReactor 线程只需要接收新连接,SubReactor 线程完成后续的业务处理
2.MainReactor 线程与 SubReactor 线程的数据交互简单, MainReactor 线程只需要把新连接传给 SubReactor 线程,SubReactor 线程无需返回数据
3.多个 SubReactor 线程能够应对更高的并发请求
缺点:
这种模式的缺点是编程复杂度较高。但是由于其优点明显,在许多项目中被广泛使用,包括nginx、Memcached、Netty 等。这种模式也被叫做服务器的 1+M+N 线程模式,即使用该模式开发的服务器包含一个(或多个,1 只是表示相对较少)连接建立线程+M 个 IO 线程+N 个业务处理线程。这是业界成熟的服务器程序设计模式。
netty
Netty 是由 JBOSS 提供的一个 Java 开源框架。Netty 提供异步的、基于事件驱动的网络应用程序框架,用以快速开发高性能、高可靠性的网络 IO 程序。 Netty 是一个基于 NIO 的网络编程框架,使用Netty 可以帮助你快速、简单的开发出一 个网络应用,相当于简化和流程化了 NIO 的开发过程。 作为当前最流行的 NIO 框架,Netty 在互联网领域、大数据分布式计算领域、游戏行业、 通信行业等获得了广泛的应用,知名的 Elasticsearch 、Dubbo 框架内部都采用了 Netty。
优点:
1.设计优雅,提供阻塞和非阻塞的 Socket;提供灵活可拓展的事件模型;提供高度可定制的线程模型。
2.具备更高的性能和更大的吞吐量,使用零拷贝技术最小化不必要的内存复制,减少资源的消耗。
3.提供安全传输特性。
4.支持多种主流协议;预置多种编解码功能,支持用户开发私有协议。
netty线程模型
- 线程模型基本介绍
- 传统阻塞 I/O 服务模型
- Reactor 模型
- 单 Reactor 单线程
- 单 Reactor 多线程
- 主从 Reactor 多线程
- 传统阻塞 I/O 服务模型
采用阻塞 IO 模式获取输入的数据, 每个连接都需要独立的线程完成数据的输入 , 业务处理和数据返回工作.
缺点:
1.当并发数很大,就会创建大量的线程,占用很大系统资源
2.连接创建后,如果当前线程暂时没有数据可读,该线程会阻塞在 read 操作,造成线程资源浪费 - Reactor 模型
Reactor 模式,通过一个或多个输入同时传递给服务处理器的模式 , 服务器端程序处理传入的多个 请求,并将它们同步分派到相应的处理线程, 因此 Reactor 模式也叫 Dispatcher模式. Reactor 模式使用IO 复用监听事件, 收到事件后,分发给某个线程(进程), 这点就是网络服务器高并发处理关键.
优点:
模型简单,没有多线程、进程通信、竞争的问题,全部都在一个线程中完成
缺点:
1.性能问题: 只有一个线程,无法完全发挥多核 CPU 的性能。Handler 在处理某个连接上的业务时, 整个进程无法处理其他连接事件,很容易导致性能瓶颈
2.可靠性问题: 线程意外终止或者进入死循环,会导致整个系统通信模块不可用,不能接收和处理外部消息,造成节点故障 - 单 Reactor多线程
Reactor 对象通过 selector 监控客户端请求事件, 收到事件后,通过 dispatch 进行分发
如果建立连接请求, 则右 Acceptor 通过accept 处理连接请求
如果不是连接请求,则由 reactor 分发调用连接对应的 handler 来处理
handler 只负责响应事件,不做具体的业务处理, 通过 read 读取数据后,会分发给后面的
worker 线程池的某个线程处理业务
worker 线程池会分配独立线程完成真正的业务,并将结果返回给 handler handler 收到响应后,通过 send 将结果返回给 client
优点:
可以充分的利用多核 cpu 的处理能力
缺点:
多线程数据共享和访问比较复杂, reactor 处理所有的事件的监听和响应,在单线程运行, 在高并发场景容易出现性能瓶颈 - 主从 Reactor 多线程
Reactor 主线程 MainReactor 对象通过 select 监听客户端连接事件,收到事件后,通过Acceptor 处理客户端连接事件
当 Acceptor 处理完客户端连接事件之后(与客户端建立好 Socket 连接),MainReactor 将连接分配给 SubReactor。(即:MainReactor 只负责监听客户端连接请求,和客户端建立连接之后将连接交由 SubReactor 监听后面的 IO 事件。)
SubReactor 将连接加入到自己的连接队列进行监听,并创建 Handler 对各种事件进行处理当连接上有新事件发生的时候,SubReactor 就会调用对应的 Handler 处理
Handler 通过 read 从连接上读取请求数据,将请求数据分发给 Worker 线程池进行业务处理
Worker 线程池会分配独立线程来完成真正的业务处理,并将处理结果返回给 Handler。Handler 通过 send 向客户端发送响应数据
一个 MainReactor 可以对应多个 SubReactor,即一个MainReactor 线程可以对应多个SubReactor 线程
优点:
1.MainReactor 线程与 SubReactor 线程的数据交互简单职责明确,MainReactor 线程只需要接收新连接,SubReactor 线程完成后续的业务处理
2.MainReactor 线程与 SubReactor 线程的数据交互简单, MainReactor 线程只需要把新连接传给 SubReactor 线程,SubReactor 线程无需返回数据
3.多个 SubReactor 线程能够应对更高的并发请求
缺点:
这种模式的缺点是编程复杂度较高。但是由于其优点明显,在许多项目中被广泛使用,包括Nginx、Memcached、Netty 等。这种模式也被叫做服务器的 1+M+N 线程模式,即使用该模式开发的服务器包含一个(或多个,1 只是表示相对较少)连接建立线程+M 个 IO 线程+N 个业务处理线程。这是业界成熟的服务器程序设计模式。
Netty线程模型
Netty 的设计主要基于主从 Reactor 多线程模式,并做了一定的改进。
Netty 抽象出两组线程池:BossGroup 和 WorkerGroup,也可以叫做BossNioEventLoopGroup 和 WorkerNioEventLoopGroup。每个线程池中都有NioEventLoop 线程。BossGroup 中的线程专门负责和客户端建立连接,WorkerGroup 中的线程专门负责处理连接上的读写。BossGroup 和 WorkerGroup 的类型都NioEventLoopGroup
NioEventLoopGroup 相当于一个事件循环组,这个组中含有多个事件循环,每个事件循环就是一个 NioEventLoop。
NioEventLoop 表示一个不断循环的执行事件处理的线程,每个NioEventLoop 都包含一个
Selector,用于监听注册在其上的 Socket 网络连接(Channel)。
NioEventLoopGroup 可以含有多个线程,即可以含有多个 NioEventLoop
每个 BossNioEventLoop 中循环执行以下三个步骤:
select:轮训注册在其上的 ServerSocketChannel 的 accept 事件(OP_ACCEPT 事件)
processSelectedKeys:处理 accept 事件,与客户端建立连接,生成一个NiosocketChannel,并将其注册到某个WorkerNioEventLoop 上的 Selector 上
runAllTasks:再去以此循环处理任务队列中的其他任务
每个 WorkerNioEventLoop 中循环执行以下三个步骤:
select:轮训注册在其上的 NioSocketChannel 的 read/write 事件(OP_READ/OP_WRITE 事件)
processSelectedKeys:在对应的 NioSocketChannel 上处理 read/write 事件
runAllTasks:再去以此循环处理任务队列中的其他任务
在以上两个processSelectedKeys步骤中,会使用 Pipeline(管道),Pipeline 中引用了
Channel,即通过 Pipeline 可以获取到对应的 Channel,Pipeline 中维护了很多的处理器(拦截处理器、过滤处理器、自定义处理器等)。
核心API
ChannelHandler及其实现类
Netty开发中需要自定义一个 Handler 类去实现 ChannelHandle接口或其子接口或其实现类,然后通过重写相应方法实现业务逻辑。
- public void channelActive(ChannelHandlerContext ctx),通道就绪事件
- public void channelRead(ChannelHandlerContext ctx, Object msg),通道读取数据事件
- public void channelReadComplete(ChannelHandlerContext ctx) ,数据读取完毕事件
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause),通道发生异常事件
ChannelPipeline
如果客户端和服务器的Handler是一样的,消息从客户端到服务端或者反过来,每个Inbound类型或Outbound类型的Handler只会经过一次,混合类型的Handler(实现了Inbound和Outbound的Handler)会经过两次。准确的说ChannelPipeline中是一个ChannelHandlerContext,每个上下文对象中有ChannelHandler. InboundHandler是按照Pipleline的加载顺序的顺序执行, OutboundHandler 是按照Pipeline的加载顺序,逆序执行
ChannelHandlerContext
这 是 事 件 处 理 器 上 下 文 对 象 , Pipeline 链 中 的 实 际 处 理 节 点 。 每 个 处 理 节 点
ChannelHandlerContext 中 包 含 一 个 具 体 的 事 件 处 理 器 ChannelHandler ,同时ChannelHandlerContext 中也绑定了对应的 ChannelPipeline和 Channel 的信息,方便对ChannelHandler 进行调用。
- ChannelFuture close(),关闭通道
- ChannelOutboundInvoker flush(),刷新
- ChannelFuture writeAndFlush(Object msg) ,将数据写到 ChannelPipeline 中当前ChannelHandler 的下一个 ChannelHandler 开始处理(出站)
ChannelOption
Netty 在创建 Channel 实例后,一般都需要设置 ChannelOption 参数。ChannelOption 是 Socket 的标准参数,而非 Netty 独创的。
- ChannelOption.SO_BACKLOG
对应 TCP/IP 协议 listen 函数中的 backlog 参数,用来初始化服务器可连接队列大小。服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接。多个客户 端来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理,backlog 参数指定 了队列的大小。 - ChannelOption.SO_KEEPALIVE
一直保持连接活动状态。该参数用于设置TCP连接,当设置该选项以后,连接会测试链接的状态,这个选项用于可能长时间没有数据交流的连接。当设置该选项以 后,如果在两小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文。
ChannelFuture
表示 Channel 中异步 I/O 操作的结果,在 Netty 中所有的 I/O 操作都是异步的,I/O 的调用会直接返回,调用者并不能立刻获得结果,但是可以通过 ChannelFuture 来获取 I/O 操作 的处理状态
- Channel channel(),返回当前正在进行 IO 操作的通道
- ChannelFuture sync(),等待异步操作执行完毕,将异步改为同步
EventLoopGroup和实现类NioEventLoopGroup
EventLoopGroup 是一组 EventLoop 的抽象,Netty 为了更好的利用多核 CPU 资源,一般 会有多个
EventLoop 同时工作,每个 EventLoop 维护着一个 Selector 实例。
EventLoopGroup 提供 next 接口,可以从组里面按照一定规则获取其中一个 EventLoop 来处理任务。在 Netty 服务器端编程中,我们一般都需要提供两个 EventLoopGroup,例如:
BossEventLoopGroup 和 WorkerEventLoopGroup。 通常一个服务端口即一个 ServerSocketChannel对应一个Selector 和一个EventLoop线程。 BossEventLoop 负责接收客户端的连接并将SocketChannel 交给 WorkerEventLoopGroup 来进 行 IO 处理,
BossEventLoopGroup 通常是一个单线程的 EventLoop,EventLoop 维护着一个注册了。
ServerSocketChannel 的 Selector 实例,BossEventLoop 不断轮询 Selector 将连接事件分离出来, 通常是 OP_ACCEPT 事件,然后将接收到的 SocketChannel 交给 WorkerEventLoopGroup, WorkerEventLoopGroup 会由 next 选择其中一个 EventLoopGroup 来将这个 SocketChannel 注册到其维护的 Selector 并对其后续的 IO 事件进行处理。
ServerBootstrap和Bootstrap
ServerBootstrap 是 Netty 中的服务器端启动助手,通过它可以完成服务器端的各种配置;Bootstrap 是 Netty 中的客户端启动助手,通过它可以完成客户端的各种配置。
- public ServerBootstrap group(EventLoopGroup parentGroup,EventLoopGroup childGroup),该方法用于服务器端,用来设置两个 EventLoop
- public B group(EventLoopGroup group) ,该方法用于客户端,用来设置一个 EventLoop
- public B channel(Class<? extends C> channelClass),该方法用来设置一个服务器端的通道实现
- public B option(ChannelOption option, T value),用来给 ServerChannel 添加配置
- public ServerBootstrap childOption(ChannelOption childOption, T value),用来给接收到的通道添加配置
- public ServerBootstrap childHandler(ChannelHandler childHandler),该方法用来设置业务 处理类(自定义的 handler)
- public ChannelFuture bind(int inetPort) ,该方法用于服务器端,用来设置占用的端口号
- public ChannelFuture connect(String inetHost, int inetPort) ,该方法用于客户端,用来连 接服务器端
Unpooled类
这是 Netty 提供的一个专门用来操作缓冲区的工具类。常用方法public static ByteBuf copiedBuffer(CharSequence string, Charset charset),通过给定的数据 和字符编码返回一个 ByteBuf 对象(类似于 NIO 中的 ByteBuffer 对象)
netty入门案例
pom.xml
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.42.Final</version>
</dependency>
服务端:
public static void main(String[] args) throws InterruptedException {
// 1.创建bossGroup线程组: 处理网络事件--连接事件
EventLoopGroup boosGroup = new NioEventLoopGroup(1);
// 2.创建workerGroup线程组: 处理网络事件--读写事件 2*处理器线程数
EventLoopGroup workGroup = new NioEventLoopGroup();
// 3.创建服务端启动助手
ServerBootstrap serverBootstrap= new ServerBootstrap();
// 4.设置bossGroup线程组和workerGroup线程组
serverBootstrap.group(boosGroup,workGroup)
.channel(NioServerSocketChannel.class)// 5.设置服务端通道实现为NIO
.option(ChannelOption.SO_BACKLOG,128)// 6.参数设置
.childOption(ChannelOption.SO_KEEPALIVE,Boolean.TRUE)// 6.参数设置
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
//TODO 8.向pipeline中添加自定义业务处理handler
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast(new NettyServerHandler());
}
});// 7.创建一个通道初始化对象
// 9.启动服务端并绑定端口,同时将异步改为同步
ChannelFuture future = serverBootstrap.bind(7777);
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
if(channelFuture.isSuccess()){
System.out.println("端口绑定成功");
}else {
System.out.println("端口绑定失败");
}
}
});
System.out.println("服务端启动完成");
// 10.关闭通道(并不是真正意义上的关闭,而是监听通道关闭的状态)和关闭连接池
future.channel().closeFuture().sync();
boosGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
客户端
public static void main(String[] args) throws InterruptedException {
// 1.创建线程组
EventLoopGroup group = new NioEventLoopGroup();
// 2.创建客户端启动助手
Bootstrap bootstrap=new Bootstrap();
// 3.设置线程组
bootstrap.group(group)
.channel(NioSocketChannel.class)// 4.设置客户端通道实现为NIO
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//TODO 6.向pipeline中添加自定义业务处理handler
socketChannel.pipeline().addLast(new NettyClientHandler());
}
});// 5.创建一个通道初始化对象
// 7.启动客户端,等待连接服务端,同时将异步改为同步
ChannelFuture channelFuture = bootstrap.connect("localhost", 7777).sync();
// 8.关闭通道(并不是真正意义上的关闭,而是监听通道关闭的状态)和关闭连接池
channelFuture.channel().closeFuture().sync();
group.shutdownGracefully();
}
自定义服务端handle
实现ChannelInboundHandler 接口,重写channelRead和channelReadComplete方法
public void channelRead(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {
ByteBuf byteBuf= (ByteBuf) o;
System.out.println("客户端发送过来的消息:"+ byteBuf.toString(CharsetUtil.UTF_8));
}
public void channelReadComplete(ChannelHandlerContext channelHandlerContext) throws Exception {
System.out.println("通道读取完毕事件,可以读取信息");
channelHandlerContext.writeAndFlush(Unpooled.copiedBuffer("我是netty服务端",StandardCharsets.UTF_8));
}
自定义客户端handle
实现ChannelInboundHandler接口,重写channelActive和channelRead方法
public void channelRead(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {
ByteBuf byteBuf = (ByteBuf) o;
System.out.println("服务端返回的消息:"+byteBuf.toString(StandardCharsets.UTF_8));
}
public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception {
System.out.println("通道就绪,可以向服务端发送消息");
ChannelFuture channelFuture = channelHandlerContext.writeAndFlush(Unpooled.copiedBuffer("我是netty客户端", StandardCharsets.UTF_8));
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
if(channelFuture.isSuccess()){
System.out.println("数据发送成功");
}else {
System.out.println("数据发送失败");
}
}
});
}
netty编解码器
在网络应用中需要实现某种编解码器,将原始字节数据与自定义的消息对象进行互相转换。网络 中都是以字节码的数据形式来传输数据的,服务器编码数据后发送到客户端,客户端需要对数据进行解码。
Netty 的编(解)码器实现了 ChannelHandlerAdapter,也是一种特殊的 ChannelHandler,所以依赖于 ChannelPipeline,可以将多个编(解)码器链接在一起,以实现复杂的转换逻辑。
Netty里面的编解码: 解码器:负责处理“入站 InboundHandler”数据。 编码器:负责“出站OutboundHandler” 数据。
netty解码器
解码器负责 解码“入站”数据从一种格式到另一种格式,解码器处理入站数据是抽象ChannelInboundHandler的实现。需要将解码器放在ChannelPipeline中。对于解码器,Netty中主要提供了抽象基类ByteToMessageDecoder和MessageToMessageDecoder。
public class MessageDecode extends MessageToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, Object o, List list) throws Exception {
System.out.println("正在进行消息解码");
ByteBuf buf= (ByteBuf) o;
list.add(buf.toString(CharsetUtil.UTF_8));//传递到下一个handler
}
}
在启动类中添加
pipeline.addLast("messageDecoder",new MessageDecode());
netty编码器
与ByteToMessageDecoder和MessageToMessageDecoder相对应,Netty提供了对应的编码器 实现MessageToByteEncoder和MessageToMessageEncoder,二者都实现ChannelOutboundHandler接口
public class MessageEnCode extends MessageToMessageEncoder {
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, Object o, List list) throws Exception {
System.out.println("消息正在进行编码");
String str= (String) o;
list.add(Unpooled.copiedBuffer(str, CharsetUtil.UTF_8));
}
}
在启动类中添加
socketChannel.pipeline().addLast("messageEncoder",new MessageEnCode());
netty编解码器
编码解码器: 同时具有编码与解码功能,特点同时实现了ChannelInboundHandler和ChannelOutboundHandler接口,因此在数据输入和输出时都能进行处理
public class MessageCodec extends MessageToMessageCodec {
/**
* 编码
* @param channelHandlerContext
* @param o
* @param list
* @throws Exception
*/
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, Object o, List list) throws Exception {
System.out.println("消息正在进行编码");
String str= (String) o;
list.add(Unpooled.copiedBuffer(str, CharsetUtil.UTF_8));
}
/**
* 解码
* @param channelHandlerContext
* @param o
* @param list
* @throws Exception
*/
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, Object o, List list) throws Exception {
System.out.println("正在进行消息解码");
ByteBuf buf= (ByteBuf) o;
list.add(buf.toString(CharsetUtil.UTF_8));//传递到下一个handler
}
}
在启动类中添加
pipeline.addLast(new MessageCodec());
netty实现聊天室
需求:
1.编写一个 Netty 群聊系统,实现服务器端和客户端之间的数据简单通讯
2.实现多人群聊
3.服务器端:可以监测用户上线,离线,并实现消息转发功能
4.客户端:可以发送消息给其它所有用户,同时可以接受其它用户发送的消息
服务端代码
public void run() throws InterruptedException {
EventLoopGroup boosGroup = null;
EventLoopGroup workGroup = null;
try {
// 1.创建bossGroup线程组: 处理网络事件--连接事件
boosGroup = new NioEventLoopGroup(1);
// 2.创建workerGroup线程组: 处理网络事件--读写事件 2*处理器线程数
workGroup = new NioEventLoopGroup();
// 3.创建服务端启动助手
ServerBootstrap serverBootstrap= new ServerBootstrap();
// 4.设置bossGroup线程组和workerGroup线程组
serverBootstrap.group(boosGroup,workGroup)
.channel(NioServerSocketChannel.class)// 5.设置服务端通道实现为NIO
.option(ChannelOption.SO_BACKLOG,128)// 6.参数设置
.childOption(ChannelOption.SO_KEEPALIVE,Boolean.TRUE)// 6.参数设置
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
//添加解码器
// pipeline.addLast("messageDecoder",new MessageDecode());
// pipeline.addLast("messageEncoder",new MessageEnCode());
//添加编解码器
// pipeline.addLast(new MessageCodec());
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
//TODO 8.向pipeline中添加自定义业务处理handler
pipeline.addLast(new NettyServerHandler());
}
});// 7.创建一个通道初始化对象
// 9.启动服务端并绑定端口,同时将异步改为同步
ChannelFuture future = serverBootstrap.bind(port);
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
if(channelFuture.isSuccess()){
System.out.println("端口绑定成功");
}else {
System.out.println("端口绑定失败");
}
}
});
System.out.println("服务端启动完成");
// 10.关闭通道(并不是真正意义上的关闭,而是监听通道关闭的状态)和关闭连接池
future.channel().closeFuture().sync();
} finally {
boosGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
服务端handle
public static List<Channel> list=new ArrayList<>();
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
Channel channel = channelHandlerContext.channel();
for (Channel channel1 : list) {
if(channel!=channel1){
channel1.writeAndFlush("["+channel.remoteAddress().toString().substring(1)+"]:说"+s);
}
}
}
/**
* 客户端连接时
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
list.add(channel);
System.out.println("[server]:"+channel.remoteAddress().toString().substring(1)+"上线了");
}
/**
* 通道未就绪,下线了
* @param ctx
* @throws Exception
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
//客户端断开连接,就移除对应通道
list.remove(channel);
System.out.println(channel.remoteAddress().toString().substring(1)+"下线了");
}
/**
* 异常处理时间
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
Channel channel = ctx.channel();
System.out.println(channel.remoteAddress().toString().substring(1)+"出现异常");
}
客户端代码
NettyChatClient
private String ip;
private int port;
public NettyChatClient(String ip, int port) {
this.ip = ip;
this.port = port;
}
public void run() throws InterruptedException {
EventLoopGroup group=null;
try {
// 1.创建线程组
group= new NioEventLoopGroup();
// 2.创建客户端启动助手
Bootstrap bootstrap=new Bootstrap();
// 3.设置线程组
bootstrap.group(group)
.channel(NioSocketChannel.class)// 4.设置客户端通道实现为NIO
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
// socketChannel.pipeline().addLast("messageDecoder",new MessageDecode());
// socketChannel.pipeline().addLast("messageEncoder",new MessageEnCode());
socketChannel.pipeline().addLast(new MessageCodec());
//TODO 6.向pipeline中添加自定义业务处理handler
// socketChannel.pipeline().addLast(new NettyClientHandler());
socketChannel.pipeline().addLast(new NettyChatClientHandler());
}
});// 5.创建一个通道初始化对象
// 7.启动客户端,等待连接服务端,同时将异步改为同步
ChannelFuture channelFuture = bootstrap.connect(ip, port).sync();
Channel channel = channelFuture.channel();
System.out.println("客户端"+channel.localAddress().toString().substring(1)+"");
Scanner sc=new Scanner(System.in);
while (sc.hasNextLine()){
String s = sc.nextLine();
//向服务端发送消息
channel.writeAndFlush(s);
}
// 8.关闭通道(并不是真正意义上的关闭,而是监听通道关闭的状态)和关闭连接池
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
group.shutdownGracefully();
}
}
public static void main(String[] args) throws InterruptedException {
new NettyChatClient("localhost", 7777).run();
}
NettyChatClientHandler
/**
* 通道读取就绪事件
* @param channelHandlerContext
* @param s
* @throws Exception
*/
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
System.out.println("服务端发送消息"+s);
}
以上是关于分布式理论,架构设计 Netty的主要内容,如果未能解决你的问题,请参考以下文章