网络I/o编程模型12 netty实现tcp服务通讯(含EventLoop实现调度)
Posted 健康平安的活着
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了网络I/o编程模型12 netty实现tcp服务通讯(含EventLoop实现调度)相关的知识,希望对你有一定的参考价值。
一 netyy实现tcp通讯
1.1 案例需求描述
1.netty服务端可以监听6666端口,客户端向此服务器进行发送信息。例如“hello,netty服务器....”;
2.服务端收到信息对客户端进行回复信息,例如“hello,客户端发的消息已经收到.....”
1.2 代码
1.2.1 客户端
1.服务端代码
package com.ljf.netty.netty.tcp;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoop;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NiosocketChannel;
/**
* @ClassName: NettyTcpClient
* @Description: TODO
* @Author: liujianfu
* @Date: 2022/05/29 15:52:40
* @Version: V1.0
**/
public class NettyTcpClient
public static void main(String[] args)
//客户端需要一个事件循环组
EventLoopGroup group=new NioEventLoopGroup();
try
//创建客户端启动对象,注意的一点是 客户端使用的不是ServerBootstrap,而是Bootstrap
Bootstrap bootstrap=new Bootstrap();
//设置相关参数
bootstrap.group(group) //设置线程组
.channel(NioSocketChannel.class) //设置客户端通道的实现类(反射)
.handler(new ChannelInitializer<SocketChannel>()
protected void initChannel(SocketChannel ch)
ch.pipeline().addLast(new NettyTcpClientHandler());//加入自己的处理器
);
//客户端主备就绪
System.out.println("客户端 is ok.....");
//启动客户端去链接服务器端,channelfuture,涉及道netty的异步模型
ChannelFuture channelFuture=bootstrap.connect("127.0.0.1",6666).sync();
//给关闭通道进行监听
channelFuture.channel().closeFuture().sync();
catch (InterruptedException e)
e.printStackTrace();
finally
group.shutdownGracefully();
2.client的自定义的handler
package com.ljf.netty.netty.tcp;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.util.CharsetUtil;
import java.nio.ByteBuffer;
/**
* @ClassName: NettyTcpClientHandler
* @Description: TODO 这是自己定义handler,客户端发送的信息
* @Author: liujianfu
* @Date: 2022/05/29 16:38:47
* @Version: V1.0
**/
public class NettyTcpClientHandler extends ChannelInboundHandlerAdapter
/**
* @author liujianfu
* @description
* channelHandlerContext ctx 上下文对象中,含有: 管道pipeline 通道channel ,地址
* Object msg: 就是客户端发送的数据,默认为Object
* 当通道就绪就会触发此方法
* @date 2022/5/29 15:34
* @param [ctx, msg]
* @return void
*/
public void channelActive(ChannelHandlerContext ctx)
System.out.println("触发 channelActive");
String sendContent=" 你好,服务端器,我是客户端252525!";
System.out.println("================客户端--->服务端的消息:【 "+ sendContent+" 】");
ctx.writeAndFlush(Unpooled.copiedBuffer(sendContent,CharsetUtil.UTF_8));
System.out.println("================客户端--->服务端消息完毕=================================");
//当通道有读取事件时,会触发
public void channelRead(ChannelHandlerContext ctx,Object msg)
ByteBuf buf=(ByteBuf)msg;
System.out.println("收到服务端的回执信息:"+buf.toString(CharsetUtil.UTF_8));
System.out.println("服务器的地址:"+ ctx.channel().remoteAddress());
//处理异常,一般需要关闭通道
public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause)
cause.printStackTrace();
ctx.close();
1.2.2 服务端
1.服务端代码
package com.ljf.netty.netty.tcp;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
* @ClassName: NettyTcpServer
* @Description: TODO
* @Author: liujianfu
* @Date: 2022/05/28 19:32:58
* @Version: V1.0
**/
public class NettyTcpServer
/**
* @author liujianfu
* @description
* 1.创建连个线程组bossGroup和workerGroup
* 2.bossGroup只是处理连接请求,真正和客户端业务处理的,会交给workerGroup完成。
* EventLoopGroup bossGroup=new NioEventLoopGroup(1);//默认为cpu核数*2;
* EventLoopGroup workGroup=new NioEventLoopGroup();
* 3.两个都是无线循环,构造函数可以设置子线程的个数。
*
*
* @date 2022/5/28 19:35
* @param [args]
* @return void
*/
public static void main(String[] args)
//创建连个线程组bossGroup和workerGroup
EventLoopGroup bossGroup=new NioEventLoopGroup(1);//默认为cpu核数*2;
EventLoopGroup workGroup=new NioEventLoopGroup();
try
//创建服务器端的启动对象,配置参数
ServerBootstrap bootstrap=new ServerBootstrap();
//使用链式编程来进行设置
bootstrap.group(bossGroup,workGroup) //设连个线程组
.channel(NioServerSocketChannel.class) //使用NioSocketChannel作为服务器的通道实现
.option(ChannelOption.SO_BACKLOG,128)//设置线程队列得到的连接个数
.childOption(ChannelOption.SO_KEEPALIVE,true) //设置保持活动连接状态
.childHandler(new ChannelInitializer<SocketChannel>() // 创建一个普通测试对象,内名内部类对象
//给pipeline 设置处理
protected void initChannel(SocketChannel ch)
ch.pipeline().addLast(new NettyTcpServerHandler());
);//给我们的workerGroup的EventLopp对应的管道设置处理器,自定义的。
System.out.println(".......netty服务器 is ready.....");
//绑定一个端口并且同步,生成了一个ChanelFuture对象
//启动服务器(并绑定端口)
ChannelFuture channelFuture=bootstrap.bind(6666).sync();
//对关闭的通道进行监听
channelFuture.channel().closeFuture().sync();
catch (InterruptedException e)
e.printStackTrace();
finally
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
2.服务端handler
package com.ljf.netty.netty.tcp;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.util.CharsetUtil;
/**
* @ClassName: NettyTcpServerHandler
* @Description: TODO 这是自己定义handler,读取客户端发送过来的数据
* @Author: liujianfu
* @Date: 2022/05/29 14:15:14
* @Version: V1.0
**/
public class NettyTcpServerHandler extends ChannelInboundHandlerAdapter
/**
* @author liujianfu
* @description
* channelHandlerContext ctx 上下文对象中,含有: 管道pipeline 通道channel ,地址
* Object msg: 就是客户端发送的数据,默认为Object
* @date 2022/5/29 15:34
* @param [ctx, msg]
* @return void
*/
public void channelRead(ChannelHandlerContext ctx,Object msg)
System.out.println("服务器读取线程:"+Thread.currentThread().getName());
System.out.println(" server ctx:"+ctx);
//获取通道信息
Channel channel= (Channel) ctx.channel();
//获取pipeline,本质上是一个双向链接,出战入战
ChannelPipeline pipeline=ctx.pipeline();
//将msg转成一个ByteBuf,ByteBUf是netty提供的,不是Nio提供的
ByteBuf buf=(ByteBuf)msg;
String content=buf.toString(CharsetUtil.UTF_8);
System.out.println("接受道客户端发来的消息:"+content);
System.out.println("客户端的地址:"+channel.remoteAddress());
//数据读取完毕
public void channelReadComplete(ChannelHandlerContext ctx)
//将数据写入道缓存,并刷新
ctx.writeAndFlush(Unpooled.copiedBuffer(" hello,客户端,发来的消息已经收到!",CharsetUtil.UTF_8));
//处理异常,一般需要关闭通道
public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause)
ctx.close();
1.2.3 调用
1.启动服务端
2.再启动客户端
1.2.4 将服务端自定义的handler设置定时任务执行
package com.ljf.netty.netty.tcp;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.util.CharsetUtil;
import java.util.concurrent.TimeUnit;
/**
* @ClassName: NettyTcpServerHandler3
* @Description: TODO
* @Author: liujianfu
* @Date: 2022/05/29 17:44:26
* @Version: V1.0
**/
public class NettyTcpServerHandler3 extends ChannelInboundHandlerAdapter
/**
* @author liujianfu
* @description
* channelHandlerContext ctx 上下文对象中,含有: 管道pipeline 通道channel ,地址
* Object msg: 就是客户端发送的数据,默认为Object
* @date 2022/5/29 15:34
* @param [ctx, msg]
* @return void
*/
public void channelRead(ChannelHandlerContext ctx, Object msg)
System.out.println("服务器读取线程:"+Thread.currentThread().getName());
System.out.println(" server ctx:"+ctx);
//获取通道信息
Channel channel= (Channel) ctx.channel();
//获取pipeline,本质上是一个双向链接,出战入战
ChannelPipeline pipeline=ctx.pipeline();
//将msg转成一个ByteBuf,ByteBUf是netty提供的,不是Nio提供的
ByteBuf buf=(ByteBuf)msg;
String content=buf.toString(CharsetUtil.UTF_8);
System.out.println("接受道客户端发来的消息:"+content);
System.out.println("客户端的地址:"+channel.remoteAddress());
//自定义定时任务,任务放到scheduledtaskqueue中,多少秒侯执行
ctx.channel().eventLoop().schedule(new Runnable()
@Override
public void run()
try
Thread.sleep(5*1000);
ctx.writeAndFlush(Unpooled.copiedBuffer(" hello,客户端,发来的消息已经收到11111!",CharsetUtil.UTF_8));
System.out.println("channel code:"+ctx.channel().hashCode());
catch (InterruptedException e)
e.printStackTrace();
System.out.println("发生异常:"+e.getStackTrace());
,5,TimeUnit.SECONDS);
ctx.channel().eventLoop().scheduleAtFixedRate(new Runnable()
@Override
public void run()
try
Thread.sleep(3*1000);
ctx.writeAndFlush(Unpooled.copiedBuffer(" hello,客户端,发来的消息已经收到2222222",CharsetUtil.UTF_8));
System.out.println("channel code:"+ctx.channel().hashCode());
catch (InterruptedException e)
e.printStackTrace();
System.out.println("发生异常:"+e.getStackTrace());
,5,10,TimeUnit.SECONDS);
//数据读取完毕
public void channelReadComplete(ChannelHandlerContext ctx)
//将数据写入道缓存,并刷新
ctx.writeAndFlush(Unpooled.copiedBuffer(" hello,客户端,发来的消息已经收到!",CharsetUtil.UTF_8));
//处理异常,一般需要关闭通道
public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause)
ctx.close();
查看结果
Netty利用EventLoop实现调度任务执行_w3cschool
开发者涨薪指南 48位大咖的思考法则、工作方式、逻辑体系以上是关于网络I/o编程模型12 netty实现tcp服务通讯(含EventLoop实现调度)的主要内容,如果未能解决你的问题,请参考以下文章