Netty 从成神到升仙系列 三Netty 凭什么成为国内最流行的网络通信框架?
Posted 爱敲代码的小黄
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Netty 从成神到升仙系列 三Netty 凭什么成为国内最流行的网络通信框架?相关的知识,希望对你有一定的参考价值。
- 👏作者简介:大家好,我是爱敲代码的小黄,独角兽企业的Java开发工程师,Java领域新星创作者。
- 📝个人公众号:爱敲代码的小黄(回复 “技术书籍” 可获千本电子书籍)
- 📕系列专栏:Java设计模式、数据结构和算法、Kafka从入门到成神、Kafka从成神到升仙、操作系统从入门到成神
- 📧如果文章知识点有错误的地方,请指正!和大家一起学习,一起进步👀
- 🔥如果感觉博主的文章还不错的话,请👍三连支持👍一下博主哦
- 🍂博主正在努力完成2022计划中:以梦为马,扬帆起航,2022追梦人
文章目录
Netty 源码
一、源码构成
二、Netty对三种I/O的支持
首先,我们看下这几种 I/O 的实现。
BIO(阻塞IO):食堂排队打饭模式,持队在窗口,打好才走
NIO(非阻塞IO):点单、等待被叫模式,等待被叫,好了自己去端
AIO(异步IO):包厢模式,点单后菜直接被端上桌。
BIO的实现:
// 如果是BIO的话
EventLoopGroup bossGroup = new OioEventLoopGroup();
EventLoopGroup workerGroup = new OioEventLoopGroup();
b.group(bossGroup, workerGroup).channel(OioserverSocketChannel.class)
其余实现也如上所示,我们 Linux 下的环境基本都是 NIO 的 I/O 实现
1、三种模式如何切换
我们可以看到,上面我们 Netty 实际支持三种 I/O 模式,那么这三种 I/O 是怎么样进行的切换呢?
方法的执行是在:
b.group(bossGroup, workerGroup)
// 主要在下面这一行!!!
.channel(NioServerSocketChannel.class)
我们观察一下,.channel
方法做了什么
public B channel(Class<? extends C> channelClass)
// 反射工厂的实现
return channelFactory(new ReflectiveChannelFactory<C>(
ObjectUtil.checkNotNull(channelClass, "channelClass")
));
public ReflectiveChannelFactory(Class<? extends T> clazz)
// 校验当前传入的clazz是否为空
ObjectUtil.checkNotNull(clazz, "clazz");
try
// 通过反射获取对应类的无参构造器
this.constructor = clazz.getConstructor();
// 上面获取完无参构造器之后,这里会通过无参构造器调用 newInstance 得到一个类的实例
public T newChannel()
try
//反射创建channel
return constructor.newInstance();
简单描述一下,通过 泛型+反射+工厂 实现 IO 模式切换
三、Netty 如何支持 Reactor 模型
Reactor 模型通过注册监听事件的方式,将阻塞模式修改为非阻塞模式,大大的提升了效率。
真正去执行的是 EventLoop
,而我们的 EventLoopGroup
则是多个 EventLoop
的集合
1. EventLoop 是线程嘛
首先,我们看一下 EventLoop
的继承关系:
可以明显的看出,我们的 EventLoop
最终继承 ScheduledExecutorService
方法,也就是我们经常经常使用的线程池中的定时线程。
2. 如何实现三种模式的 Reactor 模型
我们知道,一般 Reactor
模型,有三种:
2.1 单线程 Reactor 模式
EventLoopGroup bossGroup = new NioEventLoopGroup(1); //有参的构造方法,创建1个线程
使用上述代码即可创建单线程的 Reactor 模型。
但具体如何实现,我们去看下源码:
// nThreads 传递的入参
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args)
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
在源码中明确表明了,如果当前你传入的入参为 **0(不传入参)**时,当前线程为 DEFAULT_EVENT_LOOP_THREADS
,如果你传递了入参,则按照你入参的个数申请线程。
PS:
DEFAULT_EVENT_LOOP_THREADS
的大小在源码中定义如下:
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
这里的 NettyRuntime.availableProcessors()
最终调用的方法为:Runtime.getRuntime().availableProcessors())
有兴趣的读者可以自己去看下自己电脑的线程,博主的线程如下:
- IDEA的线程
- 鲁大师的面板
2.2 多线程 Reactor 模式
如果我们传递无入参的话,创建的就是多线程的 Reactor 模式
这种相对于单线程来说,其实提升并不是很大,简单而言就是多个线程的堆积。
比如
- 原来一个线程完成接受、读取、解码、计算、加码、发送整个流程
- 现在多个线程一起完成接受、读取、解码、计算、加码、发送整个流程
相当于复制了几份而已
2.3 主从 Reactor 模式
而我们的主从 Reactor 模式则打破了原有的架构,采用了一个新的架构进行数据的接受和发送
我们创建两个线程组,一个线程组负责接受客户端的消息,另外一个线程组负责读取、解码、计算、加码、发送整个流程。
这样,我们每个线程都有自己要做的事情并且由于接受客户端的消息很快,我们的 mainReactor
线程组会比原来接受更多的客户端消息
2.3.1 主从模式如何实现
Netty 中只需要指定两个 EventLoopGroup
即可,如下:
EventLoopGroup bossGroup = new NioEventLoopGroup(); //无参的构造方法: 线程组,多个线程
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
我们的 bossGroup
就是负责接受数据的线程组,而我们的 workerGroup
就是负责处理数据的线程组
2.3.2 主从模式源码实现
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup)
// 这里会设置父线程组(接受数据的)
super.group(parentGroup);
ObjectUtil.checkNotNull(childGroup, "childGroup");
this.childGroup = childGroup;
return this;
// 类的全路径:io.netty.bootstrap.AbstractBootstrap
volatile EventLoopGroup group;
public B group(EventLoopGroup group)
ObjectUtil.checkNotNull(group, "group");
this.group = group;
return self();
经过上述操作,我们传入的 bossGroup
已经被赋值到 io.netty.bootstrap.AbstractBootstrap.group
了。
我们看上述的图片,可以看出来,这个 bossGroup
主要做的是接受客户端的数据连接,但口说无凭,源码中在哪实现了呢
主要在于 bind()
这个方法:
private ChannelFuture doBind(final SocketAddress localAddress)
final ChannelFuture regFuture = initAndRegister();//这个方法比较重要
final ChannelFuture initAndRegister()
// 在这里的config().group()拿出我们的 EventLoopGroup 进行注册
ChannelFuture regFuture = config().group().register(channel);
return regFuture;
而这里的 EventLoopGroup
我们可以看到,他正是我们之前设置的 bossGroup
,也就是 io.netty.bootstrap.AbstractBootstrap.group
这个变量
另外,我们的从模式解决读事件的方法,可以直接去 io.netty.bootstrap.ServerBootstrap.childGroup
这个变量去找
3. Netty 给 Channel 分配 NIOEventLoop 的规则
我们上面可以看到,一个 EventLoopGroup
包括多个 EventLoop
,那么我们处理数据,Netty 是如何分配这些 EventLoop
的呢?
在源码 io.netty.util.concurrent.MultithreadEventExecutorGroup.next()
方法中
@Override
public EventExecutor next()
return chooser.next(); //chooser是一个选择器,一般的实现会使用策略模式
利用 chooser
选择器,选择不同的策略执行不同的 EventLoop
// 选择器的选择
public EventExecutorChooser newChooser(EventExecutor[] executors)
//根据待绑定的executor是否是2的幂次方,做出不同的选择
// 这里的二次方后面会讲到
if (isPowerOfTwo(executors.length))
return new PowerOfTwoEventExecutorChooser(executors);
else
return new GenericEventExecutorChooser(executors);
我们点进去发现,这里的策略一共有两种
- GenericEventExecutorChooser:取模轮询
- PowerOfTwoEventExecutorChooser:幂等运算(数组长度必须是2的幂次方)
3.1 GenericEventExecutorChooser
这里其实没什么好说的,就简单的取模、
private static final class GenericEventExecutorChooser implements EventExecutorChooser
// 原子递增
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;
GenericEventExecutorChooser(EventExecutor[] executors)
this.executors = executors;
@Override
public EventExecutor next()
//递增、取模,取正值,不然可能是负数,另外:有个非常小的缺点,当idx累加成最大值后,有短暂的不公平:
//1,2,3,4,5,6,7,0,7 7 7
// 当我们的 idx.getAndIncrement() 到达最大值 Integer.MAX_VALUE 时,会不再增加,也就一直是
return executors[Math.abs(idx.getAndIncrement() % executors.length)];
3.2 PowerOfTwoEventExecutorChooser
这里利用的是 & 的知识点
private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;
PowerOfTwoEventExecutorChooser(EventExecutor[] executors)
this.executors = executors;
@Override
//executors总数必须是2的幂次方(2,4,8...等)才会用,&运算效率更高,同时当idx累加成最大值之后,相比较通用的方式(GenericEventExecutorChooser),更公平
public EventExecutor next()
return executors[idx.getAndIncrement() & executors.length - 1];
读者如果阅读过 HashMap
的源码,在 put
的过程中,采用了 &
的方式去进行数组的确定。
public V put(K key, V value)
return putVal(hash(key), key, value, false, true);
final V putVal(int hash, K key, V value, boolean onlyIfAbsent, boolean evict)
Node<K,V>[] tab; Node<K,V> p; int n, i;
if ((tab = table) == null || (n = tab.length) == 0)
n = (tab = resize()).length;
if ((p = tab[i = (n - 1) & hash]) == null)
tab[i] = newNode(hash, key, value, null)
其中,tab[i = (n - 1) & hash]
利用 (n - 1) & hash
将当前的数组均匀的分配到每个数组上,其实效果和我们上面的取模是一样的。
主要是性能上的提高
- 对于取模运算来说,我们计算机需要去进行不断的取余计算,如果数据过大,其实会耗费一些性能。
- 但对于 & 运算,直接操作二进制,这种效率较高
这里就不介绍为什么 (n - 1) & hash
等价于 hash % n
了,有兴趣的读者可以翻一下以前介绍 HashMap
的文章
4. Netty 如何保证跨平台性
我们知道,对于不同的平台,Netty 具有不同的实现,如下:
那这种是如何实现的呢,毕竟我们使用的都是同一套代码
当我们创建 EventLoopGroup
时,我们会有这么一个实现:
public NioEventLoopGroup(int nThreads, Executor executor)
//默认selector,最终实现类似:https://github.com/frohoff/jdk8u-jdk/blob/master/src/macosx/classes/sun/nio/ch/DefaultSelectorProvider.java
//basic flow: 1 java.nio.channels.spi.SelectorProvider 2 META-INF/services 3 default
this(nThreads, executor, SelectorProvider.provider());
这里面的 SelectorProvider.provider()
就是我们所选择的实现
- 最终的实现是在我们下载的JDK里面,所以不同平台的JDK默认有不同的实现
public static SelectorProvider provider()
// 防止多个线程并发访问
synchronized (lock)
if (provider != null)
return provider;
return AccessController.doPrivileged(
new PrivilegedAction<SelectorProvider>()
public SelectorProvider run()
// 这里是通过SPI去查询文件是否配置
// String cn = System.getProperty("java.nio.channels.spi.SelectorProvider");
if (loadProviderFromProperty())
return provider;
// META-INF/services
// ServiceLoader<SelectorProvider> sl =
ServiceLoader.load(SelectorProvider.class,
ClassLoader.getSystemClassLoader());
if (loadProviderAsService())
return provider;
// 默认会用下面这个
provider = sun.nio.ch.DefaultSelectorProvider.create();
return provider;
);
// 类的路径:java.nio.channels.spi.SelectorProvider.DefaultSelectorProvider
public static SelectorProvider create()
return new WindowsSelectorProvider();
// 博主安装的是 Window 的JDK版本,如果是MAC的,下面的将会是这个样子
public static SelectorProvider create()
return new sun.nio.ch.KQueueSelectorProvider();
四、Netty 对粘包和半包的支持
1. 什么是粘包、半包
我们的客户端向服务端发送 ABC DED,但我们的服务端收到的消息却不一样,具体情况如下:
- 没有问题的情况下:服务端收到 ABC DEF
- 一次收到了:服务端收到 ABCDEF (TCP粘包)
- 多次收到:第一次 AB,第二次 CD,第三次 EF(TCP拆包)
我们这里实际测试一下,有的读者可能还是不太清楚:
首先,我们的服务端核心代码如下:
public class EchoServerHandler extends ChannelInboundHandlerAdapter
private AtomicInteger counter = new AtomicInteger(0);
/*** 服务端读取到网络数据后的处理*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
ByteBuf in = (ByteBuf)msg;
String request = in.toString(CharsetUtil.UTF_8);
System.out.println("Server Accept["+request
+"] and the counter is:"+counter.incrementAndGet());
String resp = "Hello,"+request+". Welcome to Netty World!"
+ System.getProperty("line.separator");
ctx.writeAndFlush(Unpooled.copiedBuffer(resp.getBytes()));
当服务端收到客户端的消息时,会执行 channelRead0
这个方法,我使用了一个 counter
原子变量来记录服务端当前收到的消息数量。
客户端核心代码:
/**
* 类说明:粘包/半包问题展示
*/
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf>
private AtomicInteger counter = new AtomicInteger(0);
/*** 客户端读取到网络数据后的处理*/
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception
System.out.println("client Accept["+msg.toString(CharsetUtil.UTF_8)
+"] and the counter is:"+counter.incrementAndGet());
/*** 客户端被通知channel活跃后,做事*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception
ByteBuf msg = null;
String request = "ABC,DEF,GHI,JKL,MNO"
+ System.getProperty("line.separator");
//发送100次
for(int i=0;i<100;i++)
msg = Unpooled.buffer(request.length());
msg.writeBytes(request.getBytes());
ctx.writeAndFlush(msg);
我们可以看到,我们的客户端会发送 100
条 ABC,DEF,GHI,JKL,MNO
的字符串
正常来说,我们的服务端会收到 100
条消息,我们运行一下看看实际情况
服务端的截图:
- 收到的第一条消息:
- 收到的第二条消息
这正是出现的粘包、半包问题
2. 为什么会出现这个问题?
当我们的客户端向服务端发送数据时,并不是立即发送
主要在于:socket会有缓存区,当缓存区达到一定的条件时,才会进行发送至服务端。
当然,本质还是在与:对于TCP来说,TCP是一个流式的协议,消息无边界
3. 如何避免粘包、半包
既然出现问题的原因在于消息无边界,那么只要我们找到当前客户端发送消息的边界就可以了。
3.1 短连接(不推荐)
当我们的客户端和服务端建立连接后,客户端发送一次消息就立即断开,这样我们的消息仅仅只发一次,服务端也只会收到一次。
这种一般不太实用,总不能我客户端每一次都要和服务端重新建立连接吧。
3.2 固定长度(不推荐)
我们客户端和服务端发送消息规定一个长度,比如我现在想发送 ABCDEF
100 次,那么我规定的长度就是 6
,一次6个字符为一次调用。
Netty 对长度的支持为:FixedLengthFrameDecoder
,自动为你分割服务端当前收到的消息
但这种比较浪费空间,比如你当前的长度设置成 100
,但最后一次消息分割只有 10
,浪费掉了 90
的空间。
3.3 分隔符(推荐)
我们客户端和服务端发送消息规定一个换行符
,比如我现在想发送 ABCDEF
100 次,那么我们每一次发送后面都加一个 换行符
。
Netty 对换行符的支持为:LineBasedFrameDecoder
,自动为你分割服务端当前收到的消息。
当然,如果你不想局限于 换行符
,也可以使用自定义的:DelimiterBasedFrameDecoder
但这种的缺点在于有的分隔符需要进行转义,代码写起来较为复杂。
3.4 消息头和消息体(推荐)
参考我们的 HTTP 请求
分别以下几方面:
- 魔数:4 个字节的魔数
- 版本:当前的版本号
- 序列化方式:JDK或者JSON
- 指令类型:
- 请求序号:唯一性
- 对齐填充
- 获取内容的字节数组
- 正文长度
protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception
// 1. 4 个字节的魔数
out.writeBytes(new byte[]1, 2, 3, 4);
// 2. 1 字节的版本
out.writeByte(1);
// 3. 1 字节的序列化方式:0 JDK 1 JSON
out.writeByte(0);
// 4. 1 指令类型
out.writeByte(msg.getMessageType(以上是关于Netty 从成神到升仙系列 三Netty 凭什么成为国内最流行的网络通信框架?的主要内容,如果未能解决你的问题,请参考以下文章
Spring从成神到升仙系列 一2023年再不会动态代理,就要被淘汰了
Spring从成神到升仙系列 四从源码分析 Spring 事务的来龙去脉
Kafka从成神到升仙系列 四你真的了解 Kafka 的缓存池机制嘛