netty入门(二十四)Netty心跳机制源码剖析

Posted zhufei463738313

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了netty入门(二十四)Netty心跳机制源码剖析相关的知识,希望对你有一定的参考价值。

1.源码剖析的目的

Netty作为一个网络框架,提供了诸多功能,比如编码解码等,Netty 还提供了非常重要的一个服务 ---- 心跳机制 heartbeat。通过心跳检查对方是否有效,这在 RPC 框架中是必不可少的功能。下面我们分析一下 Netty 内部 心跳服务源码的实现。

2.源码解析使用案例

2.1 源码案例

public class MyServer 
    public static void main(String[] args) throws InterruptedException 
        // 创建两个线程组
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup workerGroup = new NioEventLoopGroup(8);

        try 
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioserverSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.INFO)) // 在bossgroup增加一个日志处理器
                    .childHandler(new ChannelInitializer<SocketChannel>() 
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception 
                            ChannelPipeline pipeline = ch.pipeline();
                            // 加入 netty 提供的 IdleStateHandler
                            /**
                             * 说明
                             * 1. IdleStateHandler 是 netty 提供的处理空闲状态的处理器
                             * 2. readerIdleTime:表示多长时间没有读,就会发送一个心跳检测包,检测是否还是连接状态
                             * 3. writerIdleTime:表示多长时间没有写,就会发送一个心跳检测包,检测是否还是连接状态
                             * 4. allIdleTime:表示多长时间没有读写,就会发送一个心跳检测包,检测是否还是连接状态
                             * 5. 当 IdleStateEvent 触发后,就会传递给管道的下一个handler进行处理,通过调用(触发)下一个 handler 的userEventTrigged 方法,
                             * 在该方法中去处理 IdleStateEvent 事件
                             */
                            pipeline.addLast(new IdleStateHandler(3,5,7, TimeUnit.SECONDS));
                            // 加入一个对空闲检测进一步处理的handler(自定义)
                            pipeline.addLast(new MyServerHandler());
                        
                    );

            ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
            channelFuture.channel().closeFuture().sync();

        finally 
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        
    


/**
 * 心跳处理器
 */
public class MyServerHandler extends ChannelInboundHandlerAdapter 
    /**
     *
     * @param ctx 上下文
     * @param evt 事件
     * @throws Exception 异常
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception 
        if(evt instanceof IdleStateEvent)
            // 将 evt 向下转型
            IdleStateEvent event = (IdleStateEvent) evt;
            String eventType = null;
            switch (event.state())
                case READER_IDLE:
                    eventType = "读空闲";
                    break;
                case WRITER_IDLE:
                    eventType = "写空闲";
                    break;
                case ALL_IDLE:
                    eventType = "读写空闲";
                    break;
            

            System.out.println(ctx.channel().remoteAddress() + "--超时事件发生-->" + eventType);
            System.out.println("服务器做相应处理......");
        
    

如上源码中,我们对 Netty 服务器的(读/写)活动情况进行监听,利用 IdleStateHandler 来完成,其中三个参数分别读、写、读/写 空闲时间。

2.2介绍

Netty 提供了 IdleStateHandler ,ReadTimeoutHandler,WriteTimeoutHandler 三个 Handler 检测连接的有效性。

ReadTimeout 事件和 WriteTimeout 事件都会自动关闭连接,而且,属于异常处理,所以,重点分析IdleStateHandler。

序号

名称

作用

1

IdleStateHandler

当连接的空闲时间(读或者写)太长时,将会触发一个 IdleStateEvent 事件。然后你可以通过你的 ChannelInboundHandler 中重写 userEventTrigged 方法来处理该事件。

2

ReadTimeoutHandler

如果在指定的时间没有发生读事件,就会抛出这个异常,并自动关闭这个连接。你可以在 exceptionCaught 方法中处理这个异常。

3

WriteTimeoutHandler

当一个写操作不能在一定的时间内完成时,抛出此异常,并关闭连接。你同样可以在 exceptionCaught 方法中处理这个异常。

ReadTimeout 事件和 WriteTimeout 事件都会自动关闭连接,而且,属于异常处理,所以,这里只重点介绍IdleStateHandler。

3.IdleStateHandler分析

首先,当调用 addLast 添加 IdleStateHandler 到 pipeline 中时,会触发 handlerAdd 方法

// 添加操作
pipeline.addLast(new IdleStateHandler(3,5,7, TimeUnit.SECONDS));

// 该方法
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception 
    if (ctx.channel().isActive() && ctx.channel().isRegistered()) 
        // channelActive() event has been fired already, which means this.channelActive() will
        // not be invoked. We have to initialize here instead.
        initialize(ctx);
     else 
        // channelActive() event has not been fired yet.  this.channelActive() will be invoked
        // and initialization will occur there.
    

关注 initialize 方法,如下代码中,对 IdleStateHandler 的属性进行初始化过程,涉及到以下几个。

private void initialize(ChannelHandlerContext ctx) 
    // Avoid the case where destroy() is called before scheduling timeouts.
    // See: https://github.com/netty/netty/issues/143
    switch (state) 
    case 1:
    case 2:
        return;
    

    state = 1;
    initOutputChanged(ctx);

    lastReadTime = lastWriteTime = ticksInNanos();
    if (readerIdleTimeNanos > 0) 
        readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
                readerIdleTimeNanos, TimeUnit.NANOSECONDS);
    
    if (writerIdleTimeNanos > 0) 
        writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
                writerIdleTimeNanos, TimeUnit.NANOSECONDS);
    
    if (allIdleTimeNanos > 0) 
        allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
                allIdleTimeNanos, TimeUnit.NANOSECONDS);
    
  • private final boolean observeOutput; : 是否考虑出站时候较慢的情况,默认是false

  • private final long readerIdleTimeNanos; : 读事件空闲时间,0 则禁用事件

  • private final long writerIdleTimeNanos; : 写事件空闲时间,0 则禁用事件

  • private final long allIdleTimeNanos; : 读/写事件空闲时间,0 则禁用事件

源码中,当参数 > 0 时,就创建一个定时任务,每个事件都创建,同时将 state 设置为1,避免重复初始化,调用 initOutputChanged 方法初始化监控出站数据属性。

创建定时任务源码如下,利用 channel 中的 EventLoop 来添加一个 Scheduler,看源码并 debug,观察 EventLoop 中 Scheduler 线程池中是否有新增任务,如下:

ScheduledFuture<?> schedule(ChannelHandlerContext ctx, Runnable task, long delay, TimeUnit unit) 
    return ctx.executor().schedule(task, delay, unit);

在 ScheduledTaskQueue 中的确有3个Task,因为 addLast 的方法中有三个参数都被初始化了,那么我们来看下对应的三个 Task 实现方法。

三个 Task 都是 ChannelDuplexHandler 中的内部类,并且有一个共同父类, AbstractIdleTask

  private abstract static class AbstractIdleTask implements Runnable 

        private final ChannelHandlerContext ctx;

        AbstractIdleTask(ChannelHandlerContext ctx) 
            this.ctx = ctx;
        

        @Override
        public void run() 
            if (!ctx.channel().isOpen()) 
                return;
            

            run(ctx);
        

        protected abstract void run(ChannelHandlerContext ctx);
    
  • AbstractIdleTask内部类设计用来单例模式,模板模式UML如下

  • 模板方法的通用流程:当通道关闭,就不执行任务,反之则执行抽象方法 run 方法,

3.1 读事件 task 分析

  • 先看 ReaderIdelTimeoutTask 方法中的 run 方法的实现

private final class ReaderIdleTimeoutTask extends AbstractIdleTask 

    ReaderIdleTimeoutTask(ChannelHandlerContext ctx) 
        super(ctx);
    

    @Override
    protected void run(ChannelHandlerContext ctx) 
        long nextDelay = readerIdleTimeNanos;
        if (!reading) 
            nextDelay -= ticksInNanos() - lastReadTime;
        

        if (nextDelay <= 0) 
            // Reader is idle - set a new timeout and notify the callback.
            // 用于取消任务 promise
            readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);

            boolean first = firstReaderIdleEvent;
            firstReaderIdleEvent = false;

            try 
                // 再次提交任务
                IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
                // 触发用户 handler use
                channelIdle(ctx, event);
             catch (Throwable t) 
                ctx.fireExceptionCaught(t);
            
         else 
            // Read occurred before the timeout - set a new timeout with shorter delay.
            readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
        
    
说明:
(1)得到用户设置的超时时间。
(2)如果读取操作结束了(执行了 channelReadComplete 方法设置),就用当前时间减去给定时间和最后一次读(执行了操作的时间 channelReadComplete 方法设置),如果小于 0 ,就触发事件。反之,继续放入队列。间隔时间是新的计算时间。
(3)触发的逻辑是:首先将任务再次放到队列,时间是刚开始设置的时间,返回一个 promise 对象,用于做取消操作。然后,设置 first 属性为 false,表示下一次读取不是第一次了。这个属性在 channelRead 方法会被改成 true。
(4)创建一个 IdleStateEvent 类型的写事件对象,将此对象传递给用户的 UserEventTriggered 方法。完成触发事件的操作。
(5)总的来说,每次读取操作都会记录一个时间,定时任务时间到了,会计算当前时间和最后一次读的时间的间隔,如果间隔超过了设置的时间,就出发 UserEventTriggered 方法。

3.2 写事件 task 分析

  • 写事件的run方法WriterIdleTimeoutTask, 和读事件的差不多,如下有一点区别

private final class WriterIdleTimeoutTask extends AbstractIdleTask 

    WriterIdleTimeoutTask(ChannelHandlerContext ctx) 
        super(ctx);
    

    @Override
    protected void run(ChannelHandlerContext ctx) 

        long lastWriteTime = IdleStateHandler.this.lastWriteTime;
        long nextDelay = writerIdleTimeNanos - (ticksInNanos() - lastWriteTime);
        if (nextDelay <= 0) 
            // Writer is idle - set a new timeout and notify the callback.
            writerIdleTimeout = schedule(ctx, this, writerIdleTimeNanos, TimeUnit.NANOSECONDS);

            boolean first = firstWriterIdleEvent;
            firstWriterIdleEvent = false;

            try 
                if (hasOutputChanged(ctx, first)) 
                    return;
                

                IdleStateEvent event = newIdleStateEvent(IdleState.WRITER_IDLE, first);
                channelIdle(ctx, event);
             catch (Throwable t) 
                ctx.fireExceptionCaught(t);
            
         else 
            // Write occurred before the timeout - set a new timeout with shorter delay.
            writerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
        
    

  • 不通地方在于,run代码中多了一个判断hasOutputChanged 用来判断出站事件处理是否很慢,例如,我一个出站需要写的数据量大,需要处理20秒,但是我们执行写事件监测的时间是 5秒,此时我们只有第一次会触发,因为 hasOutputChanged 中有判断first 的时候还是会执行异常抛出

3.3 所有事件(读/写)事件 task 分析

private final class AllIdleTimeoutTask extends AbstractIdleTask 

    AllIdleTimeoutTask(ChannelHandlerContext ctx) 
        super(ctx);
    

    @Override
    protected void run(ChannelHandlerContext ctx) 

        long nextDelay = allIdleTimeNanos;
        if (!reading) 
            nextDelay -= ticksInNanos() - Math.max(lastReadTime, lastWriteTime);
        
        if (nextDelay <= 0) 
            // Both reader and writer are idle - set a new timeout and
            // notify the callback.
            allIdleTimeout = schedule(ctx, this, allIdleTimeNanos, TimeUnit.NANOSECONDS);

            boolean first = firstAllIdleEvent;
            firstAllIdleEvent = false;

            try 
                if (hasOutputChanged(ctx, first)) 
                    return;
                

                IdleStateEvent event = newIdleStateEvent(IdleState.ALL_IDLE, first);
                channelIdle(ctx, event);
             catch (Throwable t) 
                ctx.fireExceptionCaught(t);
            
         else 
            // Either read or write occurred before the timeout - set a new
            // timeout with shorter delay.
            allIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
        
    
  • 与写事件基本一致,不同地方在于事件判断

  • nextDelay -= tickInNanos() - Math.max(lastReadTime, lastWriteTime);当前时间-最后一次写/读的时间,如果>0说明超过了

  • 此处的时间判断是取读,写事件中的最大值,然后像写事件一样,判断是否发生了写很慢的情况

4.Netty心跳机制总结

(1)IdleStateHandler 实现心跳检测功能,当服务器和客户端没有任务读写,并且超过设置事件,会触发 handler 的 userEventTriggered 方法,用户可以在这个方法中实现自己的逻辑。

(2)IdleStateHandler 的实现基于 EventLoop 的定时任务,每次读写都会记录一个值,在定时任务运行的时候,通过计算当前时间和设置时间和上次事件发生时间的结果,来判断是否空闲。

(3)内部有 3 个定时任务,分别对应读事件、写事件、读写事件。通常用户监听读写事件就足够了。

(4)同时,IdleStateHandler 内部也考虑了一些极端情况:客户端接收缓慢,一次接收数据的速度超过了设置的空闲时间。Netty 通过构造方法中的 observeOutput 属性来决定是否对出站缓冲区的情况进行判断。

(5)如果出站缓慢,Netty 不任务这是空闲,也就不触发空闲事件。但第一次无论如何也是要触发的。因为第一次无法判断是出站缓慢还是空闲。当然,出站缓慢的话,可能造成 OOM,OOM比空闲的问题更大。

(6)所以,当你的应用出现了内存溢出,OOM之类,并且写空闲极少发生(使用了 observeOutput 为 true),那么就需要注意是不是数据出站速度过慢。

(7)还有一个注意的地方:就是 ReadTimeoutHandler,它继承自 IdelStateHandler,当触发读空闲事件的时候,就触发 ctx.fireExceptionCaught 方法,并传入一个 ReadTimeoutException,然后关闭 Socket。

(8)而 WriteTimeoutHandler 的实现不是基于 IdleStateHandler 的,他的原理是,当调用 write方法的时候,会创建一个定时任务,任务内容是根据传入的 promise 的完成情况来判断是否超出了写的时间。当定时任务根据指定时间开始运行,发现 promise 的 isDone 方法返回 false,表名还没有写完,说明超时了,则抛出异常。当 write 方法完成后,会打断定时任务。

一起学Netty(十四)之 Netty生产级的心跳和重连机制

sigh,写这篇博客的时候老脸还是红了一下,心里还是有些唏嘘的,应该算是剽窃吧,每个人的代码功力的确是有差距的,好在文章的标题是“一起学”,而不是开涛大神的“跟我学”系列的文章,我们还是多花点时间学习吧,感叹无用~


最近工作比较忙,但闲暇之余还是看了阿里的冯家春(fengjiachun)的github上的开源代码Jupiter,写的RPC框架让我感叹人外有人,废话不多说,下面的代码全部截取自Jupiter,写了一个比较完整的例子,供大家一起学习分享,再次对@Luca抱拳,Jupiter的Github地址:


https://github.com/fengjiachun/Jupiter


今天研究的是,心跳和重连,虽然这次是大神写的代码,但是万变不离其宗,我们先回顾一下Netty应用心跳和重连的整个过程:

1)客户端连接服务端

2)在客户端的的ChannelPipeline中加入一个比较特殊的IdleStateHandler,设置一下客户端的写空闲时间,例如5s

3)当客户端的所有ChannelHandler中4s内没有write事件,则会触发userEventTriggered方法(上文介绍过)

4)我们在客户端的userEventTriggered中对应的触发事件下发送一个心跳包给服务端,检测服务端是否还存活,防止服务端已经宕机,客户端还不知道

5)同样,服务端要对心跳包做出响应,其实给客户端最好的回复就是“不回复”,这样可以服务端的压力,假如有10w个空闲Idle的连接,那么服务端光发送心跳回复,则也是费事的事情,那么怎么才能告诉客户端它还活着呢,其实很简单,因为5s服务端都会收到来自客户端的心跳信息,那么如果10秒内收不到,服务端可以认为客户端挂了,可以close链路

6)加入服务端因为什么因素导致宕机的话,就会关闭所有的链路链接,所以作为客户端要做的事情就是短线重连


以上描述的就是整个心跳和重连的整个过程,虽然很简单,上一篇blog也写了一个Demo,简单地做了一下上述功能


要写工业级的Netty心跳重连的代码,需要解决一下几个问题:

1)ChannelPipeline中的ChannelHandlers的维护,首次连接和重连都需要对ChannelHandlers进行管理

2)重连对象的管理,也就是bootstrap对象的管理

3)重连机制编写


完整的代码:https://github.com/BazingaLyn/netty-study/tree/master/src/main/java/com/lyncc/netty/idle


下面我们就看大神是如何解决这些问题的,首先先定义一个接口ChannelHandlerHolder,用来保管ChannelPipeline中的Handlers的

package com.lyncc.netty.idle;

import io.netty.channel.ChannelHandler;

/**
 * 
 * 客户端的ChannelHandler集合,由子类实现,这样做的好处:
 * 继承这个接口的所有子类可以很方便地获取ChannelPipeline中的Handlers
 * 获取到handlers之后方便ChannelPipeline中的handler的初始化和在重连的时候也能很方便
 * 地获取所有的handlers
 */
public interface ChannelHandlerHolder {

    ChannelHandler[] handlers();
}
我们再来编写我们熟悉的服务端的ServerBootstrap的编写:

HeartBeatServer.java

package com.lyncc.netty.idle;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.IdleStateHandler;

import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;

public class HeartBeatServer {
    
    private final AcceptorIdleStateTrigger idleStateTrigger = new AcceptorIdleStateTrigger();
    
    private int port;

    public HeartBeatServer(int port) {
        this.port = port;
    }

    public void start() {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap sbs = new ServerBootstrap().group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class).handler(new LoggingHandler(LogLevel.INFO))
                    .localAddress(new InetSocketAddress(port)).childHandler(new ChannelInitializer<SocketChannel>() {
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS));
                            ch.pipeline().addLast(idleStateTrigger);
                            ch.pipeline().addLast("decoder", new StringDecoder());
                            ch.pipeline().addLast("encoder", new StringEncoder());
                            ch.pipeline().addLast(new HeartBeatServerHandler());
                        };

                    }).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true);
            // 绑定端口,开始接收进来的连接
            ChannelFuture future = sbs.bind(port).sync();

            System.out.println("Server start listen at " + port);
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        } else {
            port = 8080;
        }
        new HeartBeatServer(port).start();
    }

}
单独写一个AcceptorIdleStateTrigger,其实也是继承ChannelInboundHandlerAdapter,重写userEventTriggered方法,因为客户端是write,那么服务端自然是read,设置的状态就是IdleState.READER_IDLE,源码如下:

package com.lyncc.netty.idle;

import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;


@ChannelHandler.Sharable
public class AcceptorIdleStateTrigger extends ChannelInboundHandlerAdapter {

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleState state = ((IdleStateEvent) evt).state();
            if (state == IdleState.READER_IDLE) {
                throw new Exception("idle exception");
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }
}
HeartBeatServerHandler就是一个很简单的自定义的Handler,不是重点:

package com.lyncc.netty.idle;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class HeartBeatServerHandler extends ChannelInboundHandlerAdapter {


    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("server channelRead..");
        System.out.println(ctx.channel().remoteAddress() + "->Server :" + msg.toString());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

}

接下来就是重点,我们需要写一个类,这个类可以去观察链路是否断了,如果断了,进行循环的断线重连操作,ConnectionWatchdog,顾名思义,链路检测狗,我们先看完整代码:

package com.lyncc.netty.idle;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.util.Timeout;
import io.netty.util.Timer;
import io.netty.util.TimerTask;

import java.util.concurrent.TimeUnit;

/**
 * 
 * 重连检测狗,当发现当前的链路不稳定关闭之后,进行12次重连
 */
@Sharable
public abstract class ConnectionWatchdog extends ChannelInboundHandlerAdapter implements TimerTask ,ChannelHandlerHolder{
    
    
    
    private final Bootstrap bootstrap;
    private final Timer timer;
    private final int port;
    
    private final String host;

    private volatile boolean reconnect = true;
    private int attempts;
    
    
    public ConnectionWatchdog(Bootstrap bootstrap, Timer timer, int port,String host, boolean reconnect) {
        this.bootstrap = bootstrap;
        this.timer = timer;
        this.port = port;
        this.host = host;
        this.reconnect = reconnect;
    }
    
    /**
     * channel链路每次active的时候,将其连接的次数重新☞ 0
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        
        System.out.println("当前链路已经激活了,重连尝试次数重新置为0");
        
        attempts = 0;
        ctx.fireChannelActive();
    }
    
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("链接关闭");
        if(reconnect){
            System.out.println("链接关闭,将进行重连");
            if (attempts < 12) {
                attempts++;
                //重连的间隔时间会越来越长
                int timeout = 2 << attempts;
                timer.newTimeout(this, timeout, TimeUnit.MILLISECONDS);
            }
        }
        ctx.fireChannelInactive();
    }
    

    public void run(Timeout timeout) throws Exception {
        
        ChannelFuture future;
        //bootstrap已经初始化好了,只需要将handler填入就可以了
        synchronized (bootstrap) {
            bootstrap.handler(new ChannelInitializer<Channel>() {

                @Override
                protected void initChannel(Channel ch) throws Exception {
                    
                    ch.pipeline().addLast(handlers());
                }
            });
            future = bootstrap.connect(host,port);
        }
        //future对象
        future.addListener(new ChannelFutureListener() {

            public void operationComplete(ChannelFuture f) throws Exception {
                boolean succeed = f.isSuccess();

                //如果重连失败,则调用ChannelInactive方法,再次出发重连事件,一直尝试12次,如果失败则不再重连
                if (!succeed) {
                    System.out.println("重连失败");
                    f.channel().pipeline().fireChannelInactive();
                }else{
                    System.out.println("重连成功");
                }
            }
        });
        
    }

}


 稍微分析一下: 

1)继承了ChannelInboundHandlerAdapter,说明它也是Handler,也对,作为一个检测对象,肯定会放在链路中,否则怎么检测

2)实现了2个接口,TimeTask,ChannelHandlerHolder

   ①TimeTask,我们就要写run方法,这应该是一个定时任务,这个定时任务做的事情应该是重连的工作

   ②ChannelHandlerHolder的接口,这个接口我们刚才说过是维护的所有的Handlers,因为在重连的时候需要获取Handlers

3)bootstrap对象,重连的时候依旧需要这个对象

4)当链路断开的时候会触发channelInactive这个方法,也就说触发重连的导火索是从这边开始的

好了,我们这边再写次核心的HeartBeatsClient的代码:

package com.lyncc.netty.idle;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.HashedWheelTimer;

import java.util.concurrent.TimeUnit;

public class HeartBeatsClient {
    
    protected final HashedWheelTimer timer = new HashedWheelTimer();
    
    private Bootstrap boot;
    
    private final ConnectorIdleStateTrigger idleStateTrigger = new ConnectorIdleStateTrigger();

    public void connect(int port, String host) throws Exception {
        
        EventLoopGroup group = new NioEventLoopGroup();  
        
        boot = new Bootstrap();
        boot.group(group).channel(NioSocketChannel.class).handler(new LoggingHandler(LogLevel.INFO));
            
        final ConnectionWatchdog watchdog = new ConnectionWatchdog(boot, timer, port,host, true) {

                public ChannelHandler[] handlers() {
                    return new ChannelHandler[] {
                            this,
                            new IdleStateHandler(0, 4, 0, TimeUnit.SECONDS),
                            idleStateTrigger,
                            new StringDecoder(),
                            new StringEncoder(),
                            new HeartBeatClientHandler()
                    };
                }
            };
            
            ChannelFuture future;
            //进行连接
            try {
                synchronized (boot) {
                    boot.handler(new ChannelInitializer<Channel>() {

                        //初始化channel
                        @Override
                        protected void initChannel(Channel ch) throws Exception {
                            ch.pipeline().addLast(watchdog.handlers());
                        }
                    });

                    future = boot.connect(host,port);
                }

                // 以下代码在synchronized同步块外面是安全的
                future.sync();
            } catch (Throwable t) {
                throw new Exception("connects to  fails", t);
            }
    }

    /**
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        int port = 8080;
        if (args != null && args.length > 0) {
            try {
                port = Integer.valueOf(args[0]);
            } catch (NumberFormatException e) {
                // 采用默认值
            }
        }
        new HeartBeatsClient().connect(port, "127.0.0.1");
    }

}
也稍微说明一下:

1)创建了ConnectionWatchdog对象,自然要实现handlers方法

2)初始化好bootstrap对象

3)4秒内没有写操作,进行心跳触发,也就是IdleStateHandler这个方法


最后ConnectorIdleStateTrigger这个类

package com.lyncc.netty.idle;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.CharsetUtil;

@Sharable
public class ConnectorIdleStateTrigger extends ChannelInboundHandlerAdapter {
    
    private static final ByteBuf HEARTBEAT_SEQUENCE = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Heartbeat",
            CharsetUtil.UTF_8));

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleState state = ((IdleStateEvent) evt).state();
            if (state == IdleState.WRITER_IDLE) {
                // write heartbeat to server
                ctx.writeAndFlush(HEARTBEAT_SEQUENCE.duplicate());
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }
}
HeartBeatClientHandler.java(不是重点)

package com.lyncc.netty.idle;

import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;

import java.util.Date;

@Sharable
public class HeartBeatClientHandler extends ChannelInboundHandlerAdapter {

    
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("激活时间是:"+new Date());
        System.out.println("HeartBeatClientHandler channelActive");
        ctx.fireChannelActive();
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("停止时间是:"+new Date());
        System.out.println("HeartBeatClientHandler channelInactive");
    }


    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String message = (String) msg;
        System.out.println(message);
        if (message.equals("Heartbeat")) {
            ctx.write("has read message from server");
            ctx.flush();
        }
        ReferenceCountUtil.release(msg);
    }
}


好了,到此为止,所有的代码都贴完了,我们做一个简单的测试,按照常理,如果不出任何状况的话,客户端4秒发送心跳,服务端5秒才验证是不会断连的,所以我们在启动之后,关闭服务端,然后再次重启服务端

首先启动服务端,控制台如下:

启动客户端,控制台如下:

客户端启动之后,服务端的控制台:

关闭服务端后,客户端控制台:

重启启动服务端:

重连成功~




以上是关于netty入门(二十四)Netty心跳机制源码剖析的主要内容,如果未能解决你的问题,请参考以下文章

Netty核心技术及源码剖析-Netty入站与出站机制

Netty网络编程实战4,使用Netty实现心跳检测机制

Netty实现心跳机制demo

Netty4服务端心跳机制

netty 心跳包和断线重连机制

Netty 超时机制及心跳程序实现