Netty搭建TCP服务实践
Posted 架构空间
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Netty搭建TCP服务实践相关的知识,希望对你有一定的参考价值。
在Netty基本组件介绍中,我们大致了解了Netty的一些基本组件,今天我们来搭建一个基于Netty的TCP服务端程序,通过代码来了解和熟悉这些组件的功能和使用方法。
首先我们自己创建一个Server类,命名为TCPServer
第一步,初始化ServerBootstrap,ServerBootstrap是netty中的一个服务器引导类,对ServerBootstrap的实例化就是创建netty服务器的入口
1public class TCPServer {
2 private Logger log = LoggerFactory.getLogger(getClass());
3 //端口号
4 private int port=5080;
5 //服务器运行状态
6 private volatile boolean isRunning = false;
7 //处理Accept连接事件的线程,这里线程数设置为1即可,netty处理链接事件默认为单线程,过度设置反而浪费cpu资源
8 private final EventLoopGroup bossGroup = new NioEventLoopGroup(1);
9 //处理hadnler的工作线程,其实也就是处理IO读写 。线程数据默认为 CPU 核心数乘以2
10 private final EventLoopGroup workerGroup = new NioEventLoopGroup();
11
12 public void init() throws Exception{
13 //创建ServerBootstrap实例
14 ServerBootstrap serverBootstrap=new ServerBootstrap();
15 //初始化ServerBootstrap的线程组
16 serverBootstrap.group(workerGroup,workerGroup);//
17 //设置将要被实例化的ServerChannel类
18 serverBootstrap.channel(NioserverSocketChannel.class);//
19 //在ServerChannelInitializer中初始化ChannelPipeline责任链,并添加到serverBootstrap中
20 serverBootstrap.childHandler(new ServerChannelInitializer());
21 //标识当服务器请求处理线程全满时,用于临时存放已完成三次握手的请求的队列的最大长度
22 serverBootstrap.option(ChannelOption.SO_BACKLOG, 1024);
23 // 是否启用心跳保活机机制
24 serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
25 //绑定端口后,开启监听
26 ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
27 if(channelFuture.isSuccess()){
28 System.out.println("TCP服务启动 成功---------------");
29 }
30 }
31
32 /**
33 * 服务启动
34 */
35 public synchronized void startServer() {
36 try {
37 this.init();
38 }catch(Exception ex) {
39
40 }
41 }
42
43 /**
44 * 服务关闭
45 */
46 public synchronized void stopServer() {
47 if (!this.isRunning) {
48 throw new IllegalStateException(this.getName() + " 未启动 .");
49 }
50 this.isRunning = false;
51 try {
52 Future<?> future = this.workerGroup.shutdownGracefully().await();
53 if (!future.isSuccess()) {
54 log.error("workerGroup 无法正常停止:{}", future.cause());
55 }
56
57 future = this.bossGroup.shutdownGracefully().await();
58 if (!future.isSuccess()) {
59 log.error("bossGroup 无法正常停止:{}", future.cause());
60 }
61 } catch (InterruptedException e) {
62 e.printStackTrace();
63 }
64 this.log.info("TCP服务已经停止...");
65 }
66
67 private String getName() {
68 return "TCP-Server";
69 }
70}
上面的代码中主要使用到的ServerBootstrap类的方法有以下这些:
group :设置SeverBootstrap要用到的EventLoopGroup,也就是定义netty服务的线程模型,处理Acceptor链接的主"线程池"以及用于I/O工作的从"线程池";
channel:设置将要被实例化的SeverChannel类;
option :指定要应用到新创建SeverChannel的ChannelConfig的ChannelOption.其实也就是服务本身channel的一些配置;
chidOption:子channel的ChannelConfig的ChannelOption。也就是与客户端建立的连接channel的一些配置;
childHandler:设置将被添加到已被接收的子Channel的ChannelPipeline中的ChannelHandler,其实就是让你在里面定义处理连接收发数据,需要哪些ChannelHandler按什么顺序去处理;
第二步,我们实现ServerChannelInitializer类,这个类继承实现自netty的ChannelInitializer抽象类,这个类的作用就是对channel(连接)的ChannelPipeline进行初始化工作,说白了就是你要把处理数据的方法添加到这个任务链中去,netty才知道每一步拿着socket连接和数据去做什么。
1@ChannelHandler.Sharable
2public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {
3 static final EventExecutorGroup group = new DefaultEventExecutorGroup(2);
4
5 public ServerChannelInitializer() throws InterruptedException {
6 }
7
8 @Override
9 protected void initChannel(SocketChannel socketChannel) throws Exception {
10 ChannelPipeline pipeline = socketChannel.pipeline();
11 //IdleStateHandler心跳机制,如果超时触发Handle中userEventTrigger()方法
12 pipeline.addLast("idleStateHandler",
13 new IdleStateHandler(15, 0, 0, TimeUnit.MINUTES));
14 // netty基于分割符的自带解码器,根据提供的分隔符解析报文,这里是0x7e;1024表示单条消息的最大长度,解码器在查找分隔符的时候,达到该长度还没找到的话会抛异常
15// pipeline.addLast(
16// new DelimiterBasedFrameDecoder(1024, Unpooled.copiedBuffer(new byte[] { 0x7e }),
17// Unpooled.copiedBuffer(new byte[] { 0x7e })));
18 //自定义编解码器
19 pipeline.addLast(
20 new MessagePacketDecoder(),
21 new MessagePacketEncoder()
22 );
23 //自定义Hadler
24 pipeline.addLast("handler",new TCPServerHandler());
25 //自定义Hander,可用于处理耗时操作,不阻塞IO处理线程
26 pipeline.addLast(group,"BussinessHandler",new BussinessHandler());
27 }
28}
这里我们注意下
1pipeline.addLast(group,"BussinessHandler",new BussinessHandler());
在这里我们可以把一些比较耗时的操作(如存储、入库)等操作放在BussinessHandler中进行,因为我们为它单独分配了EventExecutorGroup 线程池执行,所以说即使这里发生阻塞,也不会影响TCPServerHandler中数据的接收。
最后就是各个部分的具体实现
解码器的实现:
1public class MessagePacketDecoder extends ByteToMessageDecoder
2{
3
4 public MessagePacketDecoder() throws Exception
5 {
6 }
7
8 @Override
9 protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception
10 {
11 try {
12 if (buffer.readableBytes() > 0) {
13 // 待处理的消息包
14 byte[] bytesReady = new byte[buffer.readableBytes()];
15 buffer.readBytes(bytesReady);
16 //这之间可以进行报文的解析处理
17 out.add(bytesReady );
18 }
19 }finally {
20
21 }
22 }
23
24
25}
编码器的实现
1public class MessagePacketEncoder extends MessageToByteEncoder<Object>
2{
3 public MessagePacketEncoder()
4 {
5 }
6
7 @Override
8 protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception
9 {
10 try {
11 //在这之前可以实现编码工作。
12 out.writeBytes((byte[])msg);
13 }finally {
14
15 }
16 }
17}
18TCPServerHandler的实现
19
20public class TCPServerHandler extends ChannelInboundHandlerAdapter {
21 public TCPServerHandler() {
22 }
23
24 @Override
25 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
26 //拿到传过来的msg数据,开始处理
27 }
28
29 //检测到空闲连接,触发
30 @Override
31 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
32 //这里可做一些断开连接的处理
33 }
34}
BussinessHandler的实现与TCPServerHandler基本类似,它可以处理一些相对比较耗时的操作,我们这里就不实现了。
通过以上的代码我们可以看到,一个基于netty的TCP服务的搭建基本就是三大块:
1、对引导服务器类ServerBootstrap的初始化;
2、对ChannelPipeline的定义,也就是把多个ChannelHandler组成一条任务链;
3、对 ChannelHandler的具体实现,其中可以有编解码器,可以有对收发数据的业务处理逻辑;
以上代码只是在基于Netty框架搭建一个最基本的TCP服务,其中包含了一些Netty基本的特性和功能,当然这只是Netty运用的一个简单的介绍,如有不正确的地方还望指出与海涵。
以上是关于Netty搭建TCP服务实践的主要内容,如果未能解决你的问题,请参考以下文章
java 使用netty搭建tcp服务器(hello world)
netty系列之:在netty中使用UDP协议请求DNS服务器