Netty 主从多线程

Posted FoamValue

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Netty 主从多线程相关的知识,希望对你有一定的参考价值。

Netty is an asynchronous event-driven network application framework for rapid development of maintainable high performance protocol servers & clients.

https://netty.io

Netty 是一个异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。


Netty 是一个 NIO 客户端服务器框架,它支持快速轻松地开发网络应用程序,如协议服务器和客户端。它极大地简化了网络编程,如 TCP 和 UDP 套接字服务器。


Nonblocking I/O

NIO,非阻塞 IO。对比于BIO(Blocking I/O,阻塞IO),NIO 的并发性能得到了很大提高。


常见的五种 IO 模型对比

  • 同步阻塞 IO(BIO)阻塞整个步骤。适用于少连接且延迟低的场景。

  • 同步非阻塞 IO(NIO),阻塞业务处理但不阻塞数据接收。适用于高并发且处理简单的场景。

  • 多路复用 IO,数据请求和业务处理是两个分开进行处理。

  • 信号驱动 IO,主要用在嵌入式开发,不参与讨论。

  • 异步 IO,数据请求和业务处理都是异步的,数据请求一次返回一次,适用于长连接的业务场景。



主从多线程

Netty 是典型的 Reator 模型结构

Reactor 模式是基于事件驱动开发的,其核心组成部分包括 Reactor 和线程池。其中 Reactor 负责监听和分配事件,而线程池负责处理事件。

根据Reactor的数量和线程池的数量,又将Reactor分为三种模型:

  • 单线程模型 (单 Reactor 单线程)

  • 多线程模型 (单 Reactor 多线程)

  • 主从多线程模型 (多 Reactor 多线程)


什么是主从多线程

从一个主线程 NIO 线程池中选择一个线程(boss)作为 Acceptor 线程,绑定监听端口,接收客户端连接的连接,其他线程(worker)负责后续的业务处理工作。



示例代码

从开源项目中截取了一段 Netty 初始化代码片段。

private void start() { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup).channel(NioserverSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) // TCP默认开启了 Nagle 算法,该算法的作用是尽可能的发送大数据快,减少网络传输。TCP_NODELAY // 参数的作用就是控制是否启用 Nagle 算法。 .childOption(ChannelOption.TCP_NODELAY, true) .handler(new LoggingHandler(LogLevel.INFO)) // 当客户端第一次进行请求的时候才会进行初始化 .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) { // 30 秒之内没有收到客户端请求的话就关闭连接 ch.pipeline().addLast(new IdleStateHandler(30, 0, 0, TimeUnit.SECONDS)); } }) // TCP默认开启了 Nagle 算法,该算法的作用是尽可能的发送大数据快,减少网络传输。TCP_NODELAY // 参数的作用就是控制是否启用 Nagle 算法。 .childOption(ChannelOption.TCP_NODELAY, true) // 是否开启 TCP 底层心跳机制 .childOption(ChannelOption.SO_KEEPALIVE, true) // 表示系统用于临时存放已完成三次握手的请求的队列的最大长度,如果连接建立频繁,服务器处理创建新连接较慢,可以适当调大这个参数 .option(ChannelOption.SO_BACKLOG, 128); // 绑定端口,同步等待绑定成功    ChannelFuture f = b.bind(host, port).sync();    ChannelFuture f2 = b.bind(host, port2).sync(); // 等待服务端监听端口关闭 f.channel().closeFuture().sync(); } catch (InterruptedException e) { System.out.println(String.format("occur exception when start server:", e)); } finally { System.out.println(String.format("shutdown bossGroup and workerGroup")); bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); }}

b.bind(host, port).sync() 和 b.bind(host, port2).sync() 同时绑定了两个 ip 和 端口。


运行结果


源代码知识点

NioEventLoopGroup

特殊的 EventExecutorGroup 接口类,它允许注册已处理的通道,以便在事件循环期间进行后续选择。

Netty 主从多线程


ServerBootstrap

Bootstrap 子类,可轻松引导 ServerChannel。

Netty 主从多线程


NioServerSocketChannel

一个 ServerSocketChannel 接口的实现类,它使用基于NIO选择器的实现来接受新连接。

Netty 主从多线程


ChannelFuture

异步 Channel I / O操作的结果,未完成或已完成。

Netty 主从多线程



代码调试

new NioEventLoopGroup()

MultithreadEventExecutorGroup.java 初始化实例。

protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) { if (nThreads <= 0) { throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads)); }
if (executor == null) { executor = new ThreadPerTaskExecutor(newDefaultThreadFactory()); }
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) { boolean success = false; try { children[i] = newChild(executor, args); success = true; } catch (Exception e) { // TODO: Think about if this is a good exception type throw new IllegalStateException("failed to create a child event loop", e); } finally { if (!success) { for (int j = 0; j < i; j ++) { children[j].shutdownGracefully(); }
for (int j = 0; j < i; j ++) { EventExecutor e = children[j]; try { while (!e.isTerminated()) { e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); } } catch (InterruptedException interrupted) { // Let the caller handle the interruption. Thread.currentThread().interrupt(); break; } } } } }
chooser = chooserFactory.newChooser(children);
final FutureListener<Object> terminationListener = new FutureListener<Object>() { @Override public void operationComplete(Future<Object> future) throws Exception { if (terminatedChildren.incrementAndGet() == children.length) { terminationFuture.setSuccess(null); } } };
for (EventExecutor e: children) { e.terminationFuture().addListener(terminationListener); }
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length); Collections.addAll(childrenSet, children); readonlyChildren = Collections.unmodifiableSet(childrenSet);}

传入参数:

  1. nThreads 此实例将使用的线程数。

  2. executor 需要执行的 Runable 任务对象 。

  3. choicerFactory 创建 EventExecutorChooser 对象的工厂类。

  4. args 参数将传递给每个 newChild 调用。


new ServerBootstrap()

ServerBootstrap 是 Netty 服务端应用开发的入口。


ServerBootstrap 的配置:

  • group 方法,设置初始化的主从"线程池"。

  • channel 方法,设置通道类型。服务端:NioServerSocketChannel。
  • ...

b.bind(host, port).sync()

绑定并侦听某个端口
private ChannelFuture doBind(final SocketAddress localAddress) { final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.cause() != null) { return regFuture; }
if (regFuture.isDone()) { // At this point we know that the registration was complete and successful. ChannelPromise promise = channel.newPromise(); doBind0(regFuture, channel, localAddress, promise); return promise; } else { // Registration future is almost always fulfilled already, but just in case it's not. final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel); regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { Throwable cause = future.cause(); if (cause != null) { // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an // IllegalStateException once we try to access the EventLoop of the Channel. promise.setFailure(cause); } else { // Registration was successful, so set the correct executor to use. // See https://github.com/netty/netty/issues/2586 promise.registered();
doBind0(regFuture, channel, localAddress, promise); } } }); return promise; }}



小结

突如其来的三天小长假,彻底打乱了生活节奏 。

一天搬家、一天休息、一天加班。眼见着明天周日应该好好学习知识了,迎来的却是正常班。


周六熬夜写文章,然后明天早起上班去。就这样。


这个周末,又一次成功“强迫”自己学习。


感谢各位小伙伴的阅读,这里是一个技术人的学习与分享。


以上是关于Netty 主从多线程的主要内容,如果未能解决你的问题,请参考以下文章

Netty

彻底搞懂 netty 线程模型

BIO/NIO 线程模型以及高性能通讯框架 Netty Reactor 模型初探

分布式理论,架构设计 Netty

分布式理论,架构设计 Netty

Netty线程模型