netty之心跳机制
Posted better_hui
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了netty之心跳机制相关的知识,希望对你有一定的参考价值。
目录
一、前言
心跳机制就是定时的给对端发送特殊的数据包 , 对端收到后回复特殊的数据包 , 这一次往返的ping-pong过程 , 就是一次心跳,心跳的目的是为了让双方感知 ,对方还活着。
TCP协议层也是有心跳机制的 , 但是他的心跳是2个小时 ,且依赖底层操作系统 ,整体来讲不是很灵活 , 所以一般都是在应用层自由实现。
二、netty的心跳工具
netty 提供了IdleStateHandler , ReadTimeoutHandler , WriteTimeoutHandler 三个工具类来监测链接的存活性 , 当然 , 我们也可以自由实现。
序号 | 名称 | 作用 |
---|---|---|
1 | IdleStateHandler | 监测链接的读、写、读写空闲时间是否超过了配置的指定时间 , 如果超过了,则触发一个IdleStateEvent事件 ,我们可以通过重写 ChannelInboundHandler.userEventTrigged 方法来做处理。 |
2 | ReadTimeouthandler | 指定时间内,没有发生读事件 , 则抛出异常,并自动关闭链接 , 我们可以在execeptionCaught方法中处理这个异常 |
3 | WriteTimeoutHandler | 同上 , 只不过关心的是写事件 |
总结 : 这两类工具 , 整体来讲 , 各有千秋:
IdleStatehandler 是以事件的方式 ,方式上比较优雅, 让服务感知到对端的链接空闲了, 具体如何操作 , 全看我们自己
Read/WriteTimeoutHandler已异常的方式处理 , 简单有效 , 且直接关闭了链接
三、IdleStatehandler
当连接的空闲时间(读或者写)太长时,将会触发一个 IdleStateEvent 事件。然后,你可以通过你的 ChannelInboundHandler 中重写 userEventTrigged 方法来处理该事件
1、构造方法
private final boolean observeOutput;// 是否考虑出站时较慢的情况。默认值是false(不考虑)。
private final long readerIdleTimeNanos; // 读事件空闲时间,0 则禁用事件
private final long writerIdleTimeNanos;// 写事件空闲时间,0 则禁用事件
private final long allIdleTimeNanos; //读或写空闲时间,0 则禁用事件
2、handlerAdded
当该handler被添加到pipeline时 , 会调用initialize方法
给定的监测时间大于0 , 就会被创建周期调度任务 。 同时 ,将state状态设置为1, 防止重复初始化。
private void initialize(ChannelHandlerContext ctx)
//当channel被销毁时 或者 已经初始化过 , 不在初始化这个方法 , 直接返回
switch (state)
case 1:
case 2:
return;
state = 1;
//初始化 写状态的部分监测
initOutputChanged(ctx);
lastReadTime = lastWriteTime = ticksInNanos();
//添加了三个调度任务
if (readerIdleTimeNanos > 0)
// 这里的 schedule 方法会调用 eventLoop 的 schedule 方法,将定时任务添加进队列中
readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
readerIdleTimeNanos, TimeUnit.NANOSECONDS);
if (writerIdleTimeNanos > 0)
writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
writerIdleTimeNanos, TimeUnit.NANOSECONDS);
if (allIdleTimeNanos > 0)
allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
allIdleTimeNanos, TimeUnit.NANOSECONDS);
调用initOutputChanged方法 , 初始化 监控出栈数据属性
private void initOutputChanged(ChannelHandlerContext ctx)
if (observeOutput)
Channel channel = ctx.channel();
Unsafe unsafe = channel.unsafe();
ChannelOutboundBuffer buf = unsafe.outboundBuffer();
// 记录了出站缓冲区相关的数据,buf 对象的 hash 码,和 buf 的剩余缓冲字节数
if (buf != null)
lastMessageHashCode = System.identityHashCode(buf.current());
lastPendingWriteBytes = buf.totalPendingWriteBytes();
observeOutput 针对出栈较慢的情况监测 。记录最后一次输出消息的相关信息 , 并使用一个值firstXXXXIdleEvent标识是否再次活动过,每次读写活动都变更其为ture ,那么如果是false ,说明这段时间没有发生过读写 。
同时 , 如果第一次记录的出栈数据 和 第二次得到的不同 , 说明出栈缓慢 , 则不触发空闲时间。
3、定时任务
抽象父任务: AbstractIdleTask
读空闲任务:ReaderIdleTimeoutTask
写空闲任务:WriterIdleTimeoutTask
读写空闲任务:AllIdleTimeoutTask
private abstract static class AbstractIdleTask implements Runnable
public void run()
// 当通道不是打开状态 , 不再执行
if (!ctx.channel().isOpen())
return;
run(ctx);
4、读事件空闲
protected void run(ChannelHandlerContext ctx)
//重新计算下次任务调度的间隔时间
long nextDelay = readerIdleTimeNanos;
if (!reading)
//配置的时间 - (最后一次读取 距离目前的时间差)
nextDelay -= ticksInNanos() - lastReadTime;
//最新的调度时间 <=0 说明 , 空闲了 ,
if (nextDelay <= 0)
// 发生空闲了 , 添加下一次的调度任务
readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);
boolean first = firstReaderIdleEvent;
firstReaderIdleEvent = false;
try
// 再次提交任务
IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
// 触发用户 handler use
channelIdle(ctx, event);
catch (Throwable t)
ctx.fireExceptionCaught(t);
else
// Read occurred before the timeout - set a new timeout with shorter delay.
// 使用最新的调度时间 , 重新入调度队列
readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
5、写事件空闲
protected void run(ChannelHandlerContext ctx)
// 最后一次写的时间
long lastWriteTime = IdleStateHandler.this.lastWriteTime;
// 计算最新的任务调度时间
long nextDelay = writerIdleTimeNanos - (ticksInNanos() - lastWriteTime);
// 空闲了
if (nextDelay <= 0)
// Writer is idle - set a new timeout and notify the callback.
// 重新入调度队列
writerIdleTimeout = schedule(ctx, this, writerIdleTimeNanos, TimeUnit.NANOSECONDS);
boolean first = firstWriterIdleEvent;
firstWriterIdleEvent = false;
try
// 判断下是不是出栈慢的情况 ,如果是 不触发空闲事件
if (hasOutputChanged(ctx, first))
return;
//触发一个事件
IdleStateEvent event = newIdleStateEvent(IdleState.WRITER_IDLE, first);
channelIdle(ctx, event);
catch (Throwable t)
ctx.fireExceptionCaught(t);
else
// Write occurred before the timeout - set a new timeout with shorter delay.
// 以最新时间 重新入调度队列
writerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
以上是关于netty之心跳机制的主要内容,如果未能解决你的问题,请参考以下文章