netty之心跳机制

Posted better_hui

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了netty之心跳机制相关的知识,希望对你有一定的参考价值。

目录

一、前言

二、netty的心跳工具

三、IdleStatehandler

1、构造方法

2、handlerAdded

3、定时任务

4、读事件空闲

5、写事件空闲


一、前言

心跳机制就是定时的给对端发送特殊的数据包 , 对端收到后回复特殊的数据包 , 这一次往返的ping-pong过程 , 就是一次心跳,心跳的目的是为了让双方感知 ,对方还活着。

TCP协议层也是有心跳机制的 , 但是他的心跳是2个小时 ,且依赖底层操作系统 ,整体来讲不是很灵活 , 所以一般都是在应用层自由实现。

二、netty的心跳工具

netty 提供了IdleStateHandler , ReadTimeoutHandler , WriteTimeoutHandler 三个工具类来监测链接的存活性 , 当然 , 我们也可以自由实现。

序号名称作用
1IdleStateHandler监测链接的读、写、读写空闲时间是否超过了配置的指定时间 , 如果超过了,则触发一个IdleStateEvent事件 ,我们可以通过重写 ChannelInboundHandler.userEventTrigged 方法来做处理。
2ReadTimeouthandler指定时间内,没有发生读事件 , 则抛出异常,并自动关闭链接 , 我们可以在execeptionCaught方法中处理这个异常
3WriteTimeoutHandler同上 , 只不过关心的是写事件

总结 : 这两类工具 , 整体来讲 , 各有千秋:

IdleStatehandler 是以事件的方式 ,方式上比较优雅, 让服务感知到对端的链接空闲了, 具体如何操作 , 全看我们自己

Read/WriteTimeoutHandler已异常的方式处理 , 简单有效 , 且直接关闭了链接

三、IdleStatehandler

当连接的空闲时间(读或者写)太长时,将会触发一个 IdleStateEvent 事件。然后,你可以通过你的 ChannelInboundHandler 中重写 userEventTrigged 方法来处理该事件

1、构造方法

private final boolean observeOutput;// 是否考虑出站时较慢的情况。默认值是false(不考虑)。 
private final long readerIdleTimeNanos; // 读事件空闲时间,0 则禁用事件
private final long writerIdleTimeNanos;// 写事件空闲时间,0 则禁用事件
private final long allIdleTimeNanos; //读或写空闲时间,0 则禁用事件

2、handlerAdded

当该handler被添加到pipeline时 , 会调用initialize方法

给定的监测时间大于0 , 就会被创建周期调度任务 。 同时 ,将state状态设置为1, 防止重复初始化。

private void initialize(ChannelHandlerContext ctx) {
    //当channel被销毁时 或者 已经初始化过 , 不在初始化这个方法 , 直接返回
    switch (state) {
    case 1:
    case 2:
        return;
    }
    state = 1;
    //初始化 写状态的部分监测
    initOutputChanged(ctx);
​
    lastReadTime = lastWriteTime = ticksInNanos();
    //添加了三个调度任务 
    if (readerIdleTimeNanos > 0) {
      // 这里的 schedule 方法会调用 eventLoop 的 schedule 方法,将定时任务添加进队列中
        readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
                readerIdleTimeNanos, TimeUnit.NANOSECONDS);
    }
    if (writerIdleTimeNanos > 0) {
        writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
                writerIdleTimeNanos, TimeUnit.NANOSECONDS);
    }
    if (allIdleTimeNanos > 0) {
        allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
                allIdleTimeNanos, TimeUnit.NANOSECONDS);
    }
}

调用initOutputChanged方法 , 初始化 监控出栈数据属性

private void initOutputChanged(ChannelHandlerContext ctx) {
    if (observeOutput) {
        Channel channel = ctx.channel();
        Unsafe unsafe = channel.unsafe();
        ChannelOutboundBuffer buf = unsafe.outboundBuffer();
        // 记录了出站缓冲区相关的数据,buf 对象的 hash 码,和 buf 的剩余缓冲字节数
        if (buf != null) {
            lastMessageHashCode = System.identityHashCode(buf.current());
            lastPendingWriteBytes = buf.totalPendingWriteBytes();
        }
    }
}

observeOutput 针对出栈较慢的情况监测 。记录最后一次输出消息的相关信息 , 并使用一个值firstXXXXIdleEvent标识是否再次活动过,每次读写活动都变更其为ture ,那么如果是false ,说明这段时间没有发生过读写 。

同时 , 如果第一次记录的出栈数据 和 第二次得到的不同 , 说明出栈缓慢 , 则不触发空闲时间。

3、定时任务

抽象父任务: AbstractIdleTask

读空闲任务:ReaderIdleTimeoutTask

写空闲任务:WriterIdleTimeoutTask

读写空闲任务:AllIdleTimeoutTask

private abstract static class AbstractIdleTask implements Runnable {
    public void run() {
        // 当通道不是打开状态 , 不再执行
        if (!ctx.channel().isOpen()) {
            return;
        }
​
        run(ctx);
    }    
}

4、读事件空闲

protected void run(ChannelHandlerContext ctx) {
    //重新计算下次任务调度的间隔时间
    long nextDelay = readerIdleTimeNanos;
    if (!reading) {
        //配置的时间 - (最后一次读取 距离目前的时间差)
        nextDelay -= ticksInNanos() - lastReadTime;
    }
    //最新的调度时间 <=0 说明 , 空闲了 , 
    if (nextDelay <= 0) {
        // 发生空闲了 , 添加下一次的调度任务
        readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);
​
        boolean first = firstReaderIdleEvent;
        firstReaderIdleEvent = false;
​
        try {
            // 再次提交任务
            IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
            // 触发用户 handler use
            channelIdle(ctx, event);
        } catch (Throwable t) {
            ctx.fireExceptionCaught(t);
        }
    } else {
        // Read occurred before the timeout - set a new timeout with shorter delay.
        // 使用最新的调度时间 , 重新入调度队列
        readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
    }
}

5、写事件空闲

protected void run(ChannelHandlerContext ctx) {
    // 最后一次写的时间
    long lastWriteTime = IdleStateHandler.this.lastWriteTime;
    // 计算最新的任务调度时间
    long nextDelay = writerIdleTimeNanos - (ticksInNanos() - lastWriteTime);
    // 空闲了
    if (nextDelay <= 0) {
        // Writer is idle - set a new timeout and notify the callback.
        // 重新入调度队列
        writerIdleTimeout = schedule(ctx, this, writerIdleTimeNanos, TimeUnit.NANOSECONDS);
​
        boolean first = firstWriterIdleEvent;
        firstWriterIdleEvent = false;
​
        try {
            // 判断下是不是出栈慢的情况 ,如果是 不触发空闲事件
            if (hasOutputChanged(ctx, first)) {
                return;
            }
            //触发一个事件
            IdleStateEvent event = newIdleStateEvent(IdleState.WRITER_IDLE, first);
            channelIdle(ctx, event);
        } catch (Throwable t) {
            ctx.fireExceptionCaught(t);
        }
    } else {
        // Write occurred before the timeout - set a new timeout with shorter delay.
        // 以最新时间 重新入调度队列
        writerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
    }
}

以上是关于netty之心跳机制的主要内容,如果未能解决你的问题,请参考以下文章

netty之心跳机制

dubbo之心跳机制

Dubbo分析之心跳设计

Dubbo分析之心跳设计

Dubbo分析之心跳设计

网络I/o编程模型17 netty框架实现心跳机制