Netty实现心跳机制demo

Posted 徐敬哲

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Netty实现心跳机制demo相关的知识,希望对你有一定的参考价值。

Netty实现心跳机制demo

1.springboot项目,首先引入Netty的pom:

        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.25.Final</version>
        </dependency>

2.服务端代码:

package com.netty.heartbeat;

import com.netty.basic.NettyServerHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.TimeUnit;

/**
 * @author: jingzhe  @createTime: 2022-02-25 11:13
 * Description: 服务端
 */
@Slf4j
public class NettyServer 
    public static void main(String[] args) 
        // 1.创建两个线程组
        // 负责处理客户端的连接请求
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        // 负责处理客户端的读写请求
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        // 2.创建一个服务端的启动对象
        ServerBootstrap bootstrap = new ServerBootstrap();
        // 3.为启动对象设置相关参数
        // 设置线程池,采用主从线程的Reactor模式
        bootstrap.group(bossGroup, workerGroup)
                // 设置通信类型为NIO类型
                .channel(NioServerSocketChannel.class)
                // 设置从线程的处理逻辑
                .childHandler(new ChannelInitializer<SocketChannel>() 
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception 
                        ChannelPipeline pipeline = socketChannel.pipeline();
                        // 处理粘包/拆包
                        pipeline.addLast(new LineBasedFrameDecoder(18));
                        pipeline.addLast(new StringEncoder());
                        pipeline.addLast(new StringDecoder());
                        // 添加一个监测心跳机制的Handler,当3秒未接收到客户端的心跳包时,则出现超时
                        pipeline.addLast(new IdleStateHandler(3, 0, 0, TimeUnit.SECONDS));
                        pipeline.addLast(new HeartBeatServerHandler());
                    
                );

        // 4.绑定监听端口
        try 
            ChannelFuture future = bootstrap.bind(8888);
            log.info("服务器已经启动,在8888端口进行监听...");
            // 在channel关闭的时候,才去关闭future
            future.channel().closeFuture().sync();
         catch (InterruptedException e) 
            e.printStackTrace();
         finally 
            // 优雅关闭
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        
    

3:。服务端心跳机制处理器:

package com.netty.heartbeat;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleStateEvent;
import lombok.extern.slf4j.Slf4j;

import java.util.Date;

/**
 * @author: jingzhe  @createTime: 2022-03-02 16:36
 * Description: 服务端处理器
 */
@Slf4j
public class HeartBeatServerHandler extends SimpleChannelInboundHandler<String> 
    /** 记录超时的次数 */
    private int timeCount;

    /** 记录上次超时时间 */
    private long lastIdleTime = 0;

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception 
        if ("heartbeat".equals(msg)) 
            log.info("收到的心跳包:", channelHandlerContext.channel(), new Date());
            /** 超过3秒后,客户端的心跳包是正常的,超时累加的次数清零 */
            if (System.currentTimeMillis() - lastIdleTime >= 3000) 
                log.info("已经有一段时间表现非常稳定,超时次数清零", channelHandlerContext.channel());
                timeCount = 0;
            
         else 
            log.info("非心跳信息,不做处理...");
        
    

    /**
     * 心跳出现超时的时候,会触发该方法的执行
     * @param ctx
     * @param evt
     * @throws Exception
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception 
        IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
        Channel channel = ctx.channel();
        Date now = new Date();
        switch (idleStateEvent.state()) 
            case READER_IDLE:
                log.info(":出现了一次心跳超时", channel, now);
                // 记录超时时间
                lastIdleTime = now.getTime();
                timeCount++;
                break;
            default:
                break;
        
        if (timeCount >= 3) 
            log.info(":心跳超时已经达到3次,将关闭连接", channel);
            channel.close();
        

    

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception 
        log.info(":上线了", ctx.channel());
    

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception 
        log.info(":下线了", ctx.channel());
    

4.客户端代码:

package com.netty.heartbeat;

import com.netty.basic.NettyClientHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import lombok.extern.slf4j.Slf4j;

import java.util.Random;

/**
 * @author: jingzhe  @createTime: 2022-02-25 11:13
 * Description: 客户端
 */
@Slf4j
public class NettyClient 
    public static void main(String[] args) 
        // 1.创建一个线程池,用于读写交互
        EventLoopGroup group = new NioEventLoopGroup();
        // 2.创建一个客户端启动对象
        Bootstrap bootstrap = new Bootstrap();
        // 3.设置相关参数
        bootstrap.group(group)
                // 设置通道为NIO通道
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() 
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception 
                        // 添加处理逻辑
                        ChannelPipeline pipeline = socketChannel.pipeline();
                        // 字符串信息的编码与解码
                        pipeline.addLast(new StringEncoder());
                        pipeline.addLast(new StringDecoder());
                    
                );

        // 4.连接服务端
        try 
            ChannelFuture future = bootstrap.connect("localhost", 8888).sync();
            log.info("连接服务端成功");
            Channel channel = future.channel();
            String msg = "heartbeat\\n";
            Random random = new Random();
            while (channel.isActive()) 
                int num = random.nextInt(6);
                Thread.sleep(num * 1000);
                channel.writeAndFlush(msg);
            
            // 对通道关闭进行监听
            future.channel().closeFuture().sync();
         catch (InterruptedException e) 
            e.printStackTrace();
         finally 
            group.shutdownGracefully();
        

    

---

引入pom后,即可运行,代码下载地址:

https://url83.ctfile.com/f/18819283-551085507-0364b5
(访问密码:9595)

以上是关于Netty实现心跳机制demo的主要内容,如果未能解决你的问题,请参考以下文章

Netty 如何实现心跳机制与断线重连?

Netty 超时机制及心跳程序实现

Netty4服务端心跳机制

Netty 实现心跳机制与断线重连

即时通讯开发Netty实现心跳机制断线重连机制

IM即时通讯开发用Netty实现心跳机制断线重连机制