Netty相关
Posted 买糖买板栗
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Netty相关相关的知识,希望对你有一定的参考价值。
netty版本:4.1.30.Final
四个基本事件
OP_READ = 1 00000001 读
OP_WRITE = 4 00000100 写
OP_CONNECT = 8 00001000 连接
OP_ACCEPT = 16 00010000 接受连接
如果一个SelectionKey感兴趣读, 那么其interestOps=00000001
如果一个SelectionKey感兴趣写, 那么其interestOps=00000100
如果一个SelectionKey感兴趣连接, 那么其interestOps=00001000
如果一个SelectionKey感兴趣接受连接,那么其interestOps=00010000
如果感兴趣多个事件:java运算符 与(&)、非(~)、或(|)、异或(^)
读\\写:00000001 | 00000100 = 00000101
读\\写\\连接\\接受连接:00000001 | 00000100 | 00001000 | 00010000 = 00011101
判断自己是否有关注某个事件:
int interestSet = selectionKey.interestOps();
boolean isInterestedAccept = (interestSet & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT;
boolean isInterestedConnect = (interestSet & SelectionKey.OP_CONNECT) == SelectionKey.OP_CONNECT;
boolean isInterestedInRead = (interestSet & SelectionKey.OP_READ) == SelectionKey.OP_READ;
boolean isInterestedInWrite = (interestSet & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE;
Selector是什么?
它是一个抽象类,看它一个子类:SelectorImpl,它维护了一个Set<SelectionKey>(可以在selector上面注册多个socketchannel,selector会给每个channel返回一个selectionKey,该selectionKey就是标识该channel注册在某一个selector上)
Selector几个重载的select()方法:
- select():阻塞到至少有一个通道在你注册的事件上就绪了。
- select(long timeout):和select()一样,但最长阻塞事件为timeout毫秒。
- selectNow():非阻塞,只要有通道就绪就立刻返回(如果没有也立马返回)。
Selector的wakeUp方法:
- 当发现任务队列里面没有任务要处理就会执行selector的阻塞方法select(timeout),如果这时候突然有外部线程调用channel去发送一个消息,那么netty会把改消息包装成一个任务放入队列,放入队列的时候netty会检测当前selector是否处于阻塞,如果是则调用wakeup方法让selector从阻塞中恢复过来进行loop的run方法执行。
SelectionKey是什么?
它是一个抽象类,看它一个子类:SelectionKeyImpl,看它的定义:
public class SelectionKeyImpl extends AbstractSelectionKey
final SelChImpl channel;
public final SelectorImpl selector;
private int index;
private volatile int interestOps;
private int readyOps;
......
可以看到,一个SelectionKey中包含一个channel、一个Selector、该channel感兴趣的事件集合interestOps,SelectionKey就是标识该channel注册在某一个selector上
NioServerSocketChannel源码
public class NioserverSocketChannel extends AbstractNioMessageChannel
implements io.netty.channel.socket.ServerSocketChannel
@Override
protected int doReadMessages(List<Object> buf) throws Exception
SocketChannel ch = javaChannel().accept();
try
if (ch != null)
buf.add(new NioSocketChannel(this, ch));
return 1;
catch (Throwable t)
logger.warn("Failed to create a new channel from an accepted socket.", t);
try
ch.close();
catch (Throwable t2)
logger.warn("Failed to close a socket.", t2);
return 0;
首先通过ServerSocketChannel的accept()方法接受新的客户端请求,如果SocketChannel不为空,则利用当前的NioServerSocketChannel、EventLoop和SocketChannel创建新的NioSocketChannel,并加入到List<Object> buf中,最后返回1,标识服务端读取消息成功,对于NioServerSocketChannel,它的读取操作就是接受客户端连接请求,创建NioSocketChannel对象。
NioSocketChannel源码
public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel
@Override
protected SocketChannel javaChannel()
return (SocketChannel) super.javaChannel();
@Override
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception
if (localAddress != null)
javaChannel().socket().bind(localAddress);
boolean success = false;
try
boolean connected = javaChannel().connect(remoteAddress);
if (!connected)
selectionKey().interestOps(SelectionKey.OP_CONNECT);
success = true;
return connected;
finally
if (!success)
doClose();
先调用javaChannel().socket().bind(localAddress)绑定本地地址,若成功则:调用javaChannel().connect(remoteAddress)发起TCP连接
NioEventLoop是什么?
nioEventloop其内部(父类)持有一个thread对象,而该nioEventloop的run方法正是由该线程执行的
nioEventloop还持有两个最重要的对象--selector和queue,前者就是我们的多路复用器,后者则是存放我们的任务队列
分析nioEventloop的run方法:
@Override
protected void run()
for (;;)
try
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks()))
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
if (wakenUp.get())
selector.wakeup();
default:
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100)
try
processSelectedKeys();
finally
// Ensure we always run tasks.
runAllTasks();
else
final long ioStartTime = System.nanoTime();
try
processSelectedKeys();
finally
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
catch (Throwable t)
handleLoopException(t);
// Always handle shutdown even if the loop processing threw an exception.
try
if (isShuttingDown())
closeAll();
if (confirmShutdown())
return;
catch (Throwable t)
handleLoopException(t);
- 第一步:先看switch里面的:selectStrategy.calculateStrategy(selectNowSupplier, hasTasks()):这是一个返回Int类型值的方法:calculateStrategy()方法里面的实现:return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;如果任务队列不为空,返回selectSupplier.get(),这个是get()是:return NioEventLoop.this.selectNow() --> this.selector.selectNow(),NioEventLoop是什么已经介绍过selector.selectNow()
- 若第一步返回的是SelectStrategy.SELECT,switch走到select(wakenUp.getAndSet(false));这代表当前没有可执行的任务,这个方法里面有一个非常著名的解决netty空轮询的判断:判断是否触发了jdk的selector的空轮旬bug,主要原理就是发现每次selector从阻塞方法中醒来既没有任务也没有io时间且阻塞的时间也没到我们设置的时间,如果一直重复这样到达一定的阈值,则我们就rebuildselector。详细分析见我的:Selector空轮询
- 若第一步返回的是:selector.selectNow()>0,继续看下面的代码再看看下面的processSelectedKeys();这是真正处理每个channel的Io事件的方法;跟进去看:
private void processSelectedKeysOptimized() for (int i = 0; i < selectedKeys.size; ++i) final SelectionKey k = selectedKeys.keys[i]; // null out entry in the array to allow to have it GC'ed once the Channel close // See https://github.com/netty/netty/issues/2363 selectedKeys.keys[i] = null; final Object a = k.attachment(); if (a instanceof AbstractNioChannel) processSelectedKey(k, (AbstractNioChannel) a); else @SuppressWarnings("unchecked") NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a; processSelectedKey(k, task); if (needsToSelectAgain) // null out entries in the array to allow to have it GC'ed once the Channel close // See https://github.com/netty/netty/issues/2363 selectedKeys.reset(i + 1); selectAgain(); i = -1;
遍历该EventLoop持有的selector的selectedKeys:最终调用的是:
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); if (!k.isValid()) final EventLoop eventLoop; try eventLoop = ch.eventLoop(); catch (Throwable ignored) return; if (eventLoop != this || eventLoop == null) return; unsafe.close(unsafe.voidPromise()); return; try int readyOps = k.readyOps(); if ((readyOps & SelectionKey.OP_CONNECT) != 0) int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); unsafe.finishConnect(); if ((readyOps & SelectionKey.OP_WRITE) != 0) ch.unsafe().forceFlush(); if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) unsafe.read(); catch (CancelledKeyException ignored) unsafe.close(unsafe.voidPromise());
这个就已经是我们熟悉的NIO中的类似操作读写连接事件的io
处理逻辑里面还有一个else:processSelectedKey(k, task);暂时不看了,详情见:task执行
netty服务端启动
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try
// 创建并初始化 Netty 服务端 Bootstrap 对象
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup);
bootstrap.channel(NioServerSocketChannel.class);
bootstrap.childHandler(new ChannelInitializer<SocketChannel>()
@Override
public void initChannel(SocketChannel channel) throws Exception
ChannelPipeline pipeline = channel.pipeline();
/**
* 收到客户端请求后,先执行解码RpcDecoder,在进入RpcServerHandler执行
* 执行完成后,由RpcEncoder编码后,返回给客户端
*/
//我是RpcDecoder,属于ChannelInboundHandlerAdapter,先add先执行
pipeline.addLast(new RpcDecoder(RpcRequest.class)); // 解码 RPC 请求
//我是RpcEncoder,属于ChannelOutboundHandlerAdapter,先add却要后执行
pipeline.addLast(new RpcEncoder(RpcResponse.class)); // 编码 RPC 响应
//我是RpcServerHandler,属于ChannelInboundHandlerAdapter,先add先执行,所以我在RpcDecoder之后执行
pipeline.addLast(new RpcServerHandler(handlerMap)); // 处理 RPC 请求
);
bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
// 获取 RPC 服务器的 IP 地址与端口号
String[] addressArray = StringUtil.split(serviceAddress, ":");
String ip = addressArray[0];
int port = Integer.parseInt(addressArray[1]);
// 启动 RPC 服务器
ChannelFuture future = bootstrap.bind(ip, port).sync();
// 注册 RPC 服务地址
if (serviceRegistry != null)
for (String interfaceName : handlerMap.keySet())
serviceRegistry.register(interfaceName, serviceAddress);
LOGGER.debug("register service: => ", interfaceName, serviceAddress);
LOGGER.debug("server started on port ", port);
// 关闭 RPC 服务器
future.channel().closeFuture().sync();
finally
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
从ChannelFuture future = bootstrap.bind(ip, port).sync()开始分析:调用的是:AbstractBootstrap.bind,最后调用AbstractBootstrap的这个方法:
private ChannelFuture doBind(final SocketAddress localAddress)
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null)
return regFuture;
if (regFuture.isDone())
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
else
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener()
@Override
public void operationComplete(ChannelFuture future) throws Exception
Throwable cause = future.cause();
if (cause != null)
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
else
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
);
return promise;
看这里方法面的两个核心方法:initAndRegister()
,以及doBind0(),先看initAndRegister():
final ChannelFuture initAndRegister()
Channel channel = null;
try
channel = channelFactory.newChannel();
init(channel);
catch (Throwable t)
ChannelFuture regFuture = config().group().register(channel);
return regFuture;
initAndRegister主要做三个事情:new channel、init channel、register channel
newChannel():
channelFactory是
这个方法是:bootstrap.channel(NioServerSocketChannel.class),即channelFactory.newChannel()最终创建channel相当于调用默认构造函数new出一个 NioServerSocketChannel对象。NioServerSocketChannel的初始化还会涉及到newChannelPipeline等各种参数的初始化,后面再单独分析。public B channel(Class<? extends C> channelClass) if (channelClass == null) throw new NullPointerException("channelClass"); return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
init(channel):由于是服务端的channel的init,具体方法实现我们找到ServerBootstrap.init
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> @Override void init(Channel channel) throws Exception final Map<ChannelOption<?>, Object> options = options0(); synchronized (options) setChannelOptions(channel, options, logger); final Map<AttributeKey<?>, Object> attrs = attrs0(); synchronized (attrs) for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) @SuppressWarnings("unchecked") AttributeKey<Object> key = (AttributeKey<Object>) e.getKey(); channel.attr(key).set(e.getValue()); ChannelPipeline p = channel.pipeline(); final EventLoopGroup currentChildGroup = childGroup; final ChannelHandler currentChildHandler = childHandler; final Entry<ChannelOption<?>, Object>[] currentChildOptions; final Entry<AttributeKey<?>, Object>[] currentChildAttrs; synchronized (childOptions) currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0)); synchronized (childAttrs) currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0)); p.addLast(new ChannelInitializer<Channel>() @Override public void initChannel(final Channel ch) throws Exception final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null) pipeline.addLast(handler); ch.eventLoop().execute(new Runnable() @Override public void run() pipeline.addLast(new ServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); ); );
重点看最后的p.addLast()向serverChannel的流水线处理器中加入了一个
ServerBootstrapAcceptor
,从名字上就可以看出来,这是一个接入器,专门接受新请求,把新的请求扔给某个事件循环器。register channel:
NioEventLoop 中的register(实际是其父类SingleThreadEventLoop的对应方法):
这个ChannelFuture regFuture = config().group().register(channel)实际调用的是public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop @Override public ChannelFuture register(Channel channel) return register(new DefaultChannelPromise(channel, this)); @Override public ChannelFuture register(final ChannelPromise promise) ObjectUtil.checkNotNull(promise, "promise"); promise.channel().unsafe().register(this, promise); return promise;
这个promise.channel().unsafe().register(this, promise)实际是:AbstractUnsafe.register,后面代码没法理解了,后面再学吧
服务端整体逻辑架构
服务端 Netty Reactor 工作架构图
Server 端包含 1 个 Boss NioEventLoopGroup 和 1 个 Worker NioEventLoopGroup。
NioEventLoopGroup 相当于 1 个事件循环组,这个组里包含多个事件循环 NioEventLoop,每个 NioEventLoop 包含 1 个 Selector 和 1 个事件循环线程。
每个 Boss NioEventLoop 循环执行的任务包含 3 步:
- 轮询 Accept 事件。
- 处理 Accept I/O 事件,与 Client 建立连接,生成 NioSocketChannel,并将 NioSocketChannel 注册到某个 Worker NioEventLoop 的 Selector 上。
- 处理任务队列中的任务,runAllTasks。任务队列中的任务包括用户调用 eventloop.execute 或 schedule 执行的任务,或者其他线程提交到该 eventloop 的任务。
每个 Worker NioEventLoop 循环执行的任务包含 3 步:
- 轮询 Read、Write 事件。
- 处理 I/O 事件,即 Read、Write 事件,在 NioSocketChannel 可读、可写事件发生时进行处理。
- 处理任务队列中的任务,runAllTasks。
服务端是:ServerBootstrap.childHandler()
客户端是:Bootstrap.handler()
那么childHandler()与handler()方法有什么区别?
handler():是客户端新接入的连接SocketChannel对应的ChannelPipeline的Handler,一般都是Bootstrap端用
childHandler():是NioServerSocketChannel对应的ChannelPipeline的Handler,一般都是ServerBootstrap用
参考文档:
1 channel结构图
通过上图我们可以看到, 一个 Channel 包含了一个 ChannelPipeline, 而 ChannelPipeline 中又维护了一个由 ChannelHandlerContext 组成的双向链表. 这个链表的头是 HeadContext, 链表的尾是 TailContext, 并且每个 ChannelHandlerContext 中又关联着一个 ChannelHandler。
2 传播机制
Inbound 事件传播方法有:(head -> customContext -> tail)
ChannelHandlerContext.fireChannelRegistered()
ChannelHandlerContext.fireChannelActive()
ChannelHandlerContext.fireChannelRead(Object)
ChannelHandlerContext.fireChannelReadComplete()
ChannelHandlerContext.fireExceptionCaught(Throwable)
ChannelHandlerContext.fireUserEventTriggered(Object)
ChannelHandlerContext.fireChannelWritabilityChanged()
ChannelHandlerContext.fireChannelInactive()
ChannelHandlerContext.fireChannelUnregistered()
Oubound 事件传输方法有:(tail -> customContext -> head)
ChannelHandlerContext.bind(SocketAddress, ChannelPromise)
ChannelHandlerContext.connect(SocketAddress, SocketAddress, ChannelPromise)
ChannelHandlerContext.write(Object, ChannelPromise)
ChannelHandlerContext.flush()
ChannelHandlerContext.read()
ChannelHandlerContext.disconnect(ChannelPromise)
ChannelHandlerContext.close(ChannelPromise)
数据从head节点流入,先拆包,然后解码成业务对象,最后经过业务Handler处理,调用write,将结果对象写出去。而写的过程先通过tail节点,然后通过encoder节点将对象编码成ByteBuf,最后将该ByteBuf对象传递到head节点,调用底层的Unsafe写到jdk底层管道。
3 pipeline
final class DefaultChannelPipeline implements ChannelPipeline
final AbstractChannel channel;
final AbstractChannelHandlerContext head;
final AbstractChannelHandlerContext tail;
DefaultChannelPipeline(AbstractChannel channel)
if (channel == null)
throw new NullPointerException("channel");
this.channel = channel;
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
static final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler
实现Inbound接口,且方法全部空实现
static final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler
实现Outbound接口,且方法全部空实现
TailContext是outbound事件的起点,inbound的终点,他的所有inbound方法都是空实现,而outbound方法则是简单的把控制权交给前面的handler。有了tail,我们能够保证inbound处理流最终能够优雅的结束掉(空实现,事件不会再往后传播),而不用在进行各种空判断;而对于outbound事件,它直接唤起下一个handler处理,充当了一个隐形的老好人。可以这么说,虽然它确实没什么太大的卵用,但是没有它,你会发现很多地方的代码将变得很难看。
HeadContext是inbound的起点,outbound的终点。作为inbound的起点,他也只是默默的把控制权交给后面的handler。而作为outbound的终点,他承担的责任则非常重要,他负责对outbound事件进行最终的底层调用(这里的底层是netty底层,而非jdk底层或操作系统底层),因此如果你暂时不关心编解码,而想了解write方法之类的最终实现,可以直接在HeadContext的对应方法上加断点,然后就可以直接了解这一部分知识了。
4 TCP粘包/拆包
在TCP传输中,一个完整的消息包可能会被TCP拆分成多个包进行发送,也有可能把多个小的包封装成一个大的数据包发送,因而数据接收方无法区分消息包的头尾,在接收方来看,可能一次接收的数据内容只是一次完整请求的部分数据,或包含多次请求的数据等情况。基于此,常见有三种解决方式:
- 消息包定长,固定每个消息包长度,不够的话用空格补,缺点是长度不能灵活控制,字节不够的情况下补空格会造成浪费,字节过多还要处理分包传输的逻辑
- 使用定长消息头和变长消息体,其中消息体的长度必须在消息头指出,在接收方每次接收数据时,先确保获取到定长的消息头,再从消息头中解析出消息体的长度,以此来界定获取以此完整请求的消息包数据。
- 在消息包尾部指定分隔符,缺点是需要遍历数据,判断是否为分隔符,影响效率。
公司内部RPC框架采用第二种方式,如何进行粘包/拆包处理细节:FrameDecoder
实现原理:不断读取接收到的字节流,并累加到cumulation变量,通过调用callDecode来尝试对当前累加的字节Buffer cumulation进行解码,直到解析出一个完整请求的feame对象,最后会调用Channels#fireMessageReceived触发Handler的pipeline调用来完成一次完整请求。
个人图解,读者忽略:
5 Netty设置高低水位
Server端:
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);
bootstrap.childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8 * 1024);
Client端:
Bootstrap bootstrap = new Bootstrap();
bootstrap.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);
bootstrap.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8 * 1024)
设置高低水位的意义:
英文:For instance, imagine you have a queue of tasks on server side that is filled by clients and processed by backend. In case clients send tasks too quick the length of the queue grows. One needs to introduce so named high watermark and low watermark. If queue length is greater than high watermark stop reading from sockets and queue length will decrease. When queue length becomes less than low watermark start reading tasks from sockets again.Note, to make it possible for clients to adapt to speed you process tasks (actually to adapt window size) one shouldn't make a big gap between high and low watermarks. From the other side small gap means you'll be too often add/remove sockets from the event loop.
中文:例如,假设您在服务器端有一个队列,由客户端填充,并由后端处理。 如果客户端发送任务的速度过快,则队列的长度会增加。 需要引入所谓的高水位和低水位。 如果队列长度大于高水位标记,请停止从套接字读取数据,队列长度将减少。 当队列长度小于低水位标记时,再次开始从套接字读取任务。请注意,为了使客户能够适应您的处理任务(实际上是适应窗口大小)的速度,不应在高水位线和低水位线之间留出较大的差距。 另一方面,小间隙意味着您经常会从事件循环中添加/删除套接字。自己根据业务特点找平衡。
以上是关于Netty相关的主要内容,如果未能解决你的问题,请参考以下文章
翻翻git之---自定义View实现水位上涨效果 WaveProgressView