8.基于netty实现群聊,心跳检测
Posted PacosonSWJTU
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了8.基于netty实现群聊,心跳检测相关的知识,希望对你有一定的参考价值。
【README】
1.本文总结自B站《netty-尚硅谷》,很不错;
2.本文po出了 Unpooled创建缓冲区的 代码示例;
3.本文示例代码基于netty实现以下功能:
- 群聊客户端及服务器;
- 心跳检测;
【1】Unpooled创建缓冲区
Unpooled定义:
- 是Netty 提供的一个专门用来操作缓冲区(即Netty的数据容器)的工具;
【1.1】Unpooled.buffer-申请给定容量的缓冲区
1)Unpooled.buffer(capacity) 定义:
public static ByteBuf buffer(int initialCapacity)
return ALLOC.heapBuffer(initialCapacity);
代码示例 :
public class NettyByteBuf61
public static void main(String[] args)
// 创建一个对象,该对象包含一个数组 byte[10]
// 在netty buf中,不需要像nio那样 执行flip 切换读写模式
// 因为 netty buf,维护了一个 readerIndex 和 writerIndex,分别表示下一次要读入和写出的位置
ByteBuf byteBuf = Unpooled.buffer(10);
for (int i = 0; i < 10; i++)
byteBuf.writeByte(i); // writerIndex 自增
// 输出
for (int i = 0; i < byteBuf.capacity(); i++)
System.out.printf(byteBuf.readByte() + " "); // readerIndex 自增
// System.out.println(byteBuf.getByte(i));
// 查看 byteBuf 的类型-UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 10, widx: 10, cap: 10)
System.out.println(byteBuf);
【代码解说】
- 在netty buf中,不需要像nio那样 执行flip 切换读写模式;
- 因为 netty buf,维护了一个 readerIndex 和 writerIndex,分别表示下一次要读入和写出的位置;
运行结果:
0 1 2 3 4 5 6 7 8 9
UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 10, widx: 10, cap: 10)
【1.2】Unpooled.copiedBuffer() 创建buf 缓冲区
copiedBuffer(CharSequence string, Charset charset) 定义:
- 通过给定的数据和字符编码返回一个 ByteBuf 对象(类似于NIO中的ByteBuffer但有区别)
public static ByteBuf copiedBuffer(CharSequence string, Charset charset)
if (string == null)
throw new NullPointerException("string");
if (string instanceof CharBuffer)
return copiedBuffer((CharBuffer) string, charset);
return copiedBuffer(CharBuffer.wrap(string), charset);
代码示例
public class NettyByteBuf62
public static void main(String[] args)
// 通过 Unpooled.copiedBuffer 创建 buf缓冲区
ByteBuf byteBuf = Unpooled.copiedBuffer("hello world", StandardCharsets.UTF_8);
// 1 使用相关方法-byteBuf.hasArray()
if (byteBuf.hasArray())
String content = new String(byteBuf.array(), StandardCharsets.UTF_8);
System.out.println(content);
// 查看ByteBuf的类型-UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 12, cap: 24)
System.out.println("bytebuf = " + byteBuf);
// 查看偏移量
System.out.println("byteBuf.arrayOffset() = " + byteBuf.arrayOffset()); // 0
// 查看 readerIndex
System.out.println("byteBuf.readerIndex() = " + byteBuf.readerIndex()); // 0
// 查看 writerIndex
System.out.println("byteBuf.writerIndex() = " + byteBuf.writerIndex()); // 12
// 查看 capacity
System.out.println("byteBuf.capacity() = " + byteBuf.capacity());
// 查看可读取的字节数量 12
System.out.println("byteBuf.readableBytes() = " + byteBuf.readableBytes());
// 使用for循环读取byteBuf
for (int i = 0; i < byteBuf.readableBytes(); i++)
System.out.print((char)byteBuf.getByte(i));
System.out.println();
// 读取 byteBuf 其中某一段,从下标4开始,读取6个字节
CharSequence charSequence = byteBuf.getCharSequence(4, 6, StandardCharsets.UTF_8);
System.out.println(charSequence);
运行结果:
hello world
bytebuf = UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 11, cap: 33)
byteBuf.arrayOffset() = 0
byteBuf.readerIndex() = 0
byteBuf.writerIndex() = 11
byteBuf.capacity() = 33
byteBuf.readableBytes() = 11
hello world
o worl
【2】netty群聊客户端与服务器
需求描述:
- 基于Netty 实现 多人群聊系统,实现服务器端和客户端之间的数据简单通讯(非阻塞);
- 服务器端:可以监测用户上线,离线,并实现消息转发功能;
- 客户端: 通过channel 可以无阻塞发送消息给其它所有用户,同时可以接受其它用户发送的消息(由服务器转发得到);
【2.1】netty服务器
1)群聊服务器代码
/**
* @Description netty群聊服务器
* @author xiao tang
* @version 1.0.0
* @createTime 2022年09月03日
*/
public class NettyGroupChatServer63
private int port;
public NettyGroupChatServer63(int port)
this.port = port;
public static void main(String[] args)
try
new NettyGroupChatServer63(8089).run();
catch (InterruptedException e)
e.printStackTrace();
public void run() throws InterruptedException
// 创建两个线程组
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try
// 服务器启动引导对象
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioserverSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>()
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception
// 获取pipeline
ChannelPipeline pipeline = socketChannel.pipeline();
// 添加解码处理器 编码器
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
// 添加业务处理handler
pipeline.addLast(new NettyGroupChatServerHandler());
);
ChannelFuture channelFuture = serverBootstrap.bind(this.port).sync();
System.out.println("netty服务器启动成功");
// 监听关闭
channelFuture.channel().closeFuture().sync();
finally
// 优雅关闭线程
bossGroup.shutdownGracefully().sync();
workerGroup.shutdownGracefully().sync();
2)群聊服务器处理器
/**
* @Description netty服务器处理器
* @author xiao tang
* @version 1.0.0
* @createTime 2022年09月03日
*/
public class NettyGroupChatServerHandler extends SimpleChannelInboundHandler<String>
// 定义一个 channel 组,用于管理channel
// GlobalEventExecutor.INSTANCE 是全局事件执行器,是一个单例
private static final ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
// 读取数据并转发
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception
// 获取当前channel
Channel channel = ctx.channel();
// 遍历 channelGroup, 根据不同情况 回送不同消息
channelGroup.forEach(otherChannel->
if (channel != otherChannel) // 非当前channel, 直接转发
otherChannel.writeAndFlush("["+ DateUtils.getNowTimestamp() +"]"+ "客户 " + channel.remoteAddress() + "说:" + msg + "\\n");
else // 回显自己发送的消息给自己
channel.writeAndFlush("["+ DateUtils.getNowTimestamp() +"]"+ "自己说:" + msg + "\\n");
);
// 一旦连接建立,第一个被执行
// 将当前channel 添加到channelGroup
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception
Channel channel = ctx.channel();
// 把客户端加入群组的信息发送到其他客户端
channelGroup.writeAndFlush("["+ DateUtils.getNowTimestamp() +"]"+ "客户端 " + channel.remoteAddress() + " 加入聊天");
// 把当前channel 添加到 channel 组
channelGroup.add(channel);
// 表示 channel 处于活动状态, 提示 xx 上线
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception
System.out.println("["+ DateUtils.getNowTimestamp() +"]"+ "客户端 " + ctx.channel().remoteAddress() + " 上线了");
// 表示 channel 处于离线状态, 提示 xx 离线
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception
System.out.println("["+ DateUtils.getNowTimestamp() +"]"+ "客户端 " + ctx.channel().remoteAddress() + " 离开了");
// 断开连接,把xx客户离开的信息推送给其他在线客户
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception
Channel channel = ctx.channel();
channelGroup.writeAndFlush("["+ DateUtils.getNowTimestamp() +"]"+ "客户端 " + channel.remoteAddress() + " 离开了");
System.out.println("channelGroup.size() = " + channelGroup.size());
// 发送异常如何处理
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
// 关闭通道
ctx.close();
【2.2】netty客户端
1)群聊客户端代码:
/**
* @Description netty群聊客户端
* @author xiao tang
* @version 1.0.0
* @createTime 2022年09月03日
*/
public class NettyGroupChatClient64
/** 主机和端口 */
private final String host;
private final int port;
/**
* @description 构造器
* @author xiao tang
* @date 2022/9/3
*/
public NettyGroupChatClient64(String host, int port)
this.host = host;
this.port = port;
public static void main(String[] args)
try
new NettyGroupChatClient64("127.0.0.1", 8089).run();
catch (InterruptedException e)
e.printStackTrace();
public void run() throws InterruptedException
// 事件运行的线程池
EventLoopGroup eventExecutors = new NioEventLoopGroup();
try
// 客户端启动引导对象
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventExecutors)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>()
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception
ChannelPipeline pipeline = socketChannel.pipeline();
// 添加解码器 编码器
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
// 添加业务逻辑的 handler
pipeline.addLast(new NettyGroupChatClietnHandler());
);
// 连接给定主机的端口,阻塞直到连接成功
ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
// 得到 channel
Channel channel = channelFuture.channel();
System.out.println("----------" + channel.localAddress() + "----------");
// 客户端需要输入信息,创建一个扫描器
Scanner scanner = new Scanner(System.in);
while (scanner.hasNextLine())
String msg = scanner.nextLine();
// 通过channel 发送到服务器
channel.writeAndFlush(msg);
finally
// 关闭线程池,释放所有资源,阻塞直到关闭成功
eventExecutors.shutdownGracefully().sync();
2)群聊客户端处理器代码:
/**
* @Description netty群聊客户端处理器
* @author xiao tang
* @version 1.0.0
* @createTime 2022年09月03日
*/
public class NettyGroupChatClietnHandler extends SimpleChannelInboundHandler<String>
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception
System.out.println(msg.trim());
【2.3】 运行结果:
1)服务器与客户端: 服务器1个,客户端3个;
2)客户端离线:
【3】netty心跳检测
【3.1】netty心跳检测概述
1)netty定义的空闲状态事件:
Triggers an @link IdleStateEvent when a @link Channel has not performed * read, write, or both operation for a while.当一个通道一段时间内没有执行 读,写,或读写操作时,就会触发 IdleStateEvent事件;
2)需求描述:
- 编写一个 Netty心跳检测机制案例, 当服务器超过3秒没有读时,就提示读空闲;
- 当服务器超过5秒没有写操作时,就提示写空闲;
- 实现当服务器超过7秒没有读或者写操作时,就提示读写空闲;
【3.2】netty心跳检测代码实现
1)netty心跳检测服务器
/**
* @Description netty心跳检测服务器
* @author xiao tang
* @version 1.0.0
* @createTime 2022年09月03日
*/
public class NettyHeartbeatCheckServer66
public static void main(String[] args)
try
new NettyHeartbeatCheckServer66().run();
catch (InterruptedException e)
e.printStackTrace();
public void run() throws InterruptedException
// 创建线程池执行器
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup(8);
try
// 服务器启动引导对象
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(new LoggingHandler(LogLevel.INFO)) // 为 bossGroup 添加 日志处理器
.childHandler(new ChannelInitializer<SocketChannel>()
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception
// 添加处理器
ChannelPipeline pipeline = socketChannel.pipeline();
// 1. 添加空闲状态处理器 :
// readerIdleTime: 表示多长时间没有读入io事件,就会发送一个心跳检测包,检测是否连接状态
// writerIdleTime: 表示多长时间没有写出io事件,就会发送一个心跳检测包,检测是否连接状态
// allIdleTime: 表示多长时间没有读入和写出io事件,就会发送一个心跳检测包,检测是否连接状态
// 2. 文档说明
// Triggers an @link IdleStateEvent when a @link Channel has not performed
// * read, write, or both operation for a while.
// 3. 当 IdleStateEvent 事件触发后, 就会传递给管道的 下一个处理器 去处理
// 通过调用下一个handler的 userEventTriggered 方法,即在该方法中处理IdleStateEvent 事件;
pipeline.addLast(new IdleStateHandler(4, 5, 7, TimeUnit.SECONDS));
// 添加一个对空闲检测 进一步处理的handler(自定义 )
pipeline.addLast(new NettyHeartbeatCheckServerHandler());
);
// 启动服务器,监听端口,阻塞直到启动成功
ChannelFuture channelFuture = serverBootstrap.bind(8089).sync();
// 阻塞直到channel关闭
channelFuture.channel().closeFuture().sync();
finally
bossGroup.shutdownGracefully().sync();
workerGroup.shutdownGracefully().sync();
2)netty心跳检测服务器处理器
/**
* @Description netty心跳检测服务器处理器
* @author xiao tang
* @version 1.0.0
* @createTime 2022年09月04日
*/
public class NettyHeartbeatCheckServerHandler extends ChannelInboundHandlerAdapter
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception
if (evt instanceof IdleStateEvent)
IdleStateEvent event2 = (IdleStateEvent) evt;
String eventType = ""; // 事件类型
switch (event2.state())
case READER_IDLE: eventType = "读空闲"; break;
case WRITER_IDLE: eventType = "写空闲"; break;
case ALL_IDLE: eventType = "读写空闲"; break;
System.out.println("客户端" + ctx.channel().remoteAddress() + "--超时事件--" + eventType);
System.out.println("服务器做相应处理");
// 如果发生空闲,马上关闭通道
// System.out.println("一旦发生超时事件,则关闭 channel");
// ctx.channel().close();
【3.3】运行结果:
1)以 NettyGroupChatClient64 作为客户端连接到 服务器 NettyHeartbeatCheckServer66;
2)打印结果如下:
// 控制台打印结果
客户端/127.0.0.1:61278--超时事件--读空闲
服务器做相应处理
客户端/127.0.0.1:61278--超时事件--写空闲
服务器做相应处理
客户端/127.0.0.1:61278--超时事件--读写空闲
服务器做相应处理
客户端/127.0.0.1:61278--超时事件--读空闲
服务器做相应处理
客户端/127.0.0.1:61278--超时事件--写空闲
服务器做相应处理
客户端/127.0.0.1:61278--超时事件--读空闲
服务器做相应处理
以上是关于8.基于netty实现群聊,心跳检测的主要内容,如果未能解决你的问题,请参考以下文章