Netty 主从多线程
Posted FoamValue
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Netty 主从多线程相关的知识,希望对你有一定的参考价值。
Netty is an asynchronous event-driven network application framework for rapid development of maintainable high performance protocol servers & clients.
https://netty.io
Netty 是一个异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。
Netty 是一个 NIO 客户端服务器框架,它支持快速轻松地开发网络应用程序,如协议服务器和客户端。它极大地简化了网络编程,如 TCP 和 UDP 套接字服务器。
Nonblocking I/O
NIO,非阻塞 IO。对比于BIO(Blocking I/O,阻塞IO),NIO 的并发性能得到了很大提高。
常见的五种 IO 模型对比
同步阻塞 IO(BIO)阻塞整个步骤。适用于少连接且延迟低的场景。
同步非阻塞 IO(NIO),阻塞业务处理但不阻塞数据接收。适用于高并发且处理简单的场景。
多路复用 IO,数据请求和业务处理是两个分开进行处理。
信号驱动 IO,主要用在嵌入式开发,不参与讨论。
异步 IO,数据请求和业务处理都是异步的,数据请求一次返回一次,适用于长连接的业务场景。
Netty 是典型的 Reator 模型结构。
Reactor 模式是基于事件驱动开发的,其核心组成部分包括 Reactor 和线程池。其中 Reactor 负责监听和分配事件,而线程池负责处理事件。
根据Reactor的数量和线程池的数量,又将Reactor分为三种模型:
单线程模型 (单 Reactor 单线程)
多线程模型 (单 Reactor 多线程)
主从多线程模型 (多 Reactor 多线程)
什么是主从多线程
从一个主线程 NIO 线程池中选择一个线程(boss)作为 Acceptor 线程,绑定监听端口,接收客户端连接的连接,其他线程(worker)负责后续的业务处理工作。
示例代码
从开源项目中截取了一段 Netty 初始化代码片段。
private void start() {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup).channel(NioserverSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
// TCP默认开启了 Nagle 算法,该算法的作用是尽可能的发送大数据快,减少网络传输。TCP_NODELAY
// 参数的作用就是控制是否启用 Nagle 算法。
.childOption(ChannelOption.TCP_NODELAY, true)
.handler(new LoggingHandler(LogLevel.INFO))
// 当客户端第一次进行请求的时候才会进行初始化
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
// 30 秒之内没有收到客户端请求的话就关闭连接
ch.pipeline().addLast(new IdleStateHandler(30, 0, 0, TimeUnit.SECONDS));
}
})
// TCP默认开启了 Nagle 算法,该算法的作用是尽可能的发送大数据快,减少网络传输。TCP_NODELAY
// 参数的作用就是控制是否启用 Nagle 算法。
.childOption(ChannelOption.TCP_NODELAY, true)
// 是否开启 TCP 底层心跳机制
.childOption(ChannelOption.SO_KEEPALIVE, true)
// 表示系统用于临时存放已完成三次握手的请求的队列的最大长度,如果连接建立频繁,服务器处理创建新连接较慢,可以适当调大这个参数
.option(ChannelOption.SO_BACKLOG, 128);
// 绑定端口,同步等待绑定成功
ChannelFuture f = b.bind(host, port).sync();
ChannelFuture f2 = b.bind(host, port2).sync();
// 等待服务端监听端口关闭
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
System.out.println(String.format("occur exception when start server:", e));
} finally {
System.out.println(String.format("shutdown bossGroup and workerGroup"));
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
b.bind(host, port).sync() 和 b.bind(host, port2).sync() 同时绑定了两个 ip 和 端口。
运行结果
源代码知识点
NioEventLoopGroup
特殊的 EventExecutorGroup 接口类,它允许注册已处理的通道,以便在事件循环期间进行后续选择。
ServerBootstrap
Bootstrap 子类,可轻松引导 ServerChannel。
NioServerSocketChannel
一个 ServerSocketChannel 接口的实现类,它使用基于NIO选择器的实现来接受新连接。
ChannelFuture
异步 Channel I / O操作的结果,未完成或已完成。
代码调试
new NioEventLoopGroup()
MultithreadEventExecutorGroup.java 初始化实例。
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
if (!success) {
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
// Let the caller handle the interruption.
Thread.currentThread().interrupt();
break;
}
}
}
}
}
chooser = chooserFactory.newChooser(children);
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
传入参数:
nThreads 此实例将使用的线程数。
executor 需要执行的 Runable 任务对象 。
choicerFactory 创建 EventExecutorChooser 对象的工厂类。
args 参数将传递给每个 newChild 调用。
new ServerBootstrap()
ServerBootstrap 是 Netty 服务端应用开发的入口。
ServerBootstrap 的配置:
group 方法,设置初始化的主从"线程池"。
channel 方法,设置通道类型。服务端:NioServerSocketChannel。
...
b.bind(host, port).sync()
绑定并侦听某个端口
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
小结
突如其来的三天小长假,彻底打乱了生活节奏 。
一天搬家、一天休息、一天加班。眼见着明天周日应该好好学习知识了,迎来的却是正常班。
周六熬夜写文章,然后明天早起上班去。就这样。
这个周末,又一次成功“强迫”自己学习。
感谢各位小伙伴的阅读,这里是一个技术人的学习与分享。
以上是关于Netty 主从多线程的主要内容,如果未能解决你的问题,请参考以下文章