Netty组件源码分析
Posted 疾风先生
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Netty组件源码分析相关的知识,希望对你有一定的参考价值。
深入理解Netty原理之前,先来了解下Netty的核心组件工作原理,通过一个简单的Netty服务程序示例我们可以了解到Netty的核心组件主要有启动类与事件轮询类,Channel组件,通过多线程编程异步返回事件结果的ChannelFuture,同时可以看到被回调执行的Handler执行链是如何工作的,最后分析Netty自定义实现的一个高性能数据缓冲对象原理使用.
Netty组件使用示例
讲述netty之前,先看下netty的一个整体结构如下:
netty一个简单示例
public class NettyServer {
private static final String IP = "127.0.0.1";
private static final int port = 8080;
private static final int BIZGROUPSIZE = Runtime.getRuntime().availableProcessors() * 2;
private static final int BIZTHREADSIZE = 100;
private static final EventLoopGroup bossGroup = new NioEventLoopGroup(BIZGROUPSIZE);
private static final EventLoopGroup workGroup = new NioEventLoopGroup(BIZTHREADSIZE);
public static void main(String[] args) throws Exception {
NettyServer.start();
}
public static void start() throws Exception {
ServerBootstrap serverBootstrap = initServerBootstrap();
ChannelFuture channelFuture = serverBootstrap.bind(IP, port).sync();
channelFuture.channel().closeFuture().sync();
}
private static ServerBootstrap initServerBootstrap() {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workGroup)
.channel(NioserverSocketChannel.class)
.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
pipeline.addLast(new TcpServerHandler());
}
});
return serverBootstrap;
}
}
public class TcpServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) {
System.out.println("get new client connection ");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// 使用之后需要进行释放回到内存池中方便回收,以便于下次使用的时候可重复利用
// 如果不释放将会产生新的ByteBuf
((ByteBuf) msg).release();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
ctx.close();
}
}
-
启动类ServerBoostrap -
事件轮询类EventLoop以及EventLoopGroup -
Netty自身实现的Channel,如上述的NioServerSocketChannel,作为数据传输的载体 -
Netty自身实现异步操作的ChannelFuture -
事件(入站与出站事件)与ChannelHandler -
责任链ChannelPipeline与上下文ChannelHandlerContext -
ByteBuff组件
启动类与事件轮询简述
启动类ServerBootstrap分析
-
Bootstrap类图设计结构
// ServerBootstrap.java
@Override
@SuppressWarnings("CloneDoesntCallSuperClone")
public ServerBootstrap clone() {
// 创建一个新的Boostrap,但是新的Boostrap对象属性沿用先前的bootsrtap属性,属于对象属性浅拷贝
return new ServerBootstrap(this);
}
private ServerBootstrap(ServerBootstrap bootstrap) {
super(bootstrap);
childGroup = bootstrap.childGroup;
childHandler = bootstrap.childHandler;
synchronized (bootstrap.childOptions) {
childOptions.putAll(bootstrap.childOptions);
}
synchronized (bootstrap.childAttrs) {
childAttrs.putAll(bootstrap.childAttrs);
}
}
-
服务端Channel启动
// 1. 当启动调用`bind()`方法时将会创建一个ServerChannel并监听客户端连接
// 2. 接收到客户端连接时将会新创建一个Channel并监听Channel读写事件
EventLoop组件分析
-
EventLoop类图结构设计
可以看到Netty的EventLoop组件使用JDK的并发库以及基于网络编程库的自定义Channel增强来优化web服务性能,同时对于EventLoop的源码定义如下:
// EventLoop是具备有序性,是一个有序事件执行器
public interface EventLoop extends OrderedEventExecutor, EventLoopGroup {
@Override
EventLoopGroup parent();
}
// EventLoopGroup 是一个事件执行器组,在原有的基础上增强事件功能方法
public interface EventLoopGroup extends EventExecutorGroup{}
// 对于OrderedEventExecutor,通过查看开发工具查看父类最终也是一个事件执行器组,在原有的基础上增强功能,即EventExecutor
// OrderedEventExecutor -> EventExecutor -> EventExecutorGroup
// 事件执行器具备两个功能:一个是迭代器功能,一个是计划任务的线程池
interface EventExecutorGroup extends ScheduledExecutorService, Iterable<EventExecutor>{}
EventLoop工作原理
结合Reactor模式以及Netty的源码示例,我们从宏观上看Channel,EventLoop,EventLoop、Group以及Thread之间的关系如下:
EventLoop
以及
EventLoopGroup
的理解如下:
-
EventLoopGroup包含多个EventLoop,每个EventLoop的生命周期绑定一个线程,即可以将EventLoopGroup类比为客户端负载均衡服务的集群,通过选举策略选择其中一个可用的正常服务应用在特定的线程Thread中处理IO事件,也就是不论是注册还是事件IO完成响应都将会在专有的线程上进行处理. -
每个 Channel
进行注册操作都只能注册于一个EventLoop
,但是由于一个EventLoopGroup
存在多个EventLoop
,因而一个EventLoop
可以分配给一个或者多个Channel
进行IO相关的事件处理.
Channel组件分析
Netty自定义的Channel
-
对于用户而言具备以下三方面
-
提供了channel的当前状态,即连接是open还是connected状态 -
channel相关的属性配置ChannelConfig,比如channel接收的buffer大小配置 -
channel支持读取,写出,连接以及绑定的IO操作,与ChannelPipeline一同协作完成所有和Channel相关的IO事件操作,其中ChannelPipeline以责任链的方式连接所有IO事件与Channel相关的请求事件的完成处理器.
-
Channel是异步的
ChannelFuture
组件来通知异步操作的状态(成功/失败/取消).
-
Channel是划分层次的
SocketChannel
是通过服务端的
ServerSocketChannel
创建的,因此对于SocketChannel的上一个层级父类是
ServerSocketChannel
,也就是说通过
ScoektChannel
的
parent()
方法能够获取到
ServerSocketChannel
,这个是属于语义上的层次划分
-
支持向下转换以便于于满足特殊传输协议的一些特有的方法
DatagramChannel
来调用对应的join或者leave方法
-
释放资源
-
Channel的生命周期(摘录Netty实战)
ChannelUnregistered: Channel已将被创建,但是还未被注册到EventLoop组件上
ChannelRegistered: Channel已经被注册到EventLoop组件上
ChannelActive: Channel已经与服务端建立连接状态,处于活跃状态,可以进行发送和写出数据
ChannelInActive: Channel与服务端没有建立连接状态
ChannelFuture组件分析
Netty自定义的ChannelFuture
future.get()
,此时如果操作完成就会返回结果,如果未完成将会同步阻塞于线程等待完成,Netty为了解决这个繁琐的操作,自定义实现了ChannelFuture类.
// 与ChannelFuture相关的
interface ChannelFuture extends Future<Void>{}
// 特殊的ChannelFuture,表示具备可写特征,即客户端获取当前类之后是具备可更改属性操作的.
interface Promise<V> extends Future<V>{} // Future为java并发库下
interface ChannelPromise extends ChannelFuture, Promise<Void>{}
-
ChannelFuture使用示例
Channel channel = new NioSocketChannel();
// 以异步的方式与远程服务建立连接
ChannelFuture future = channel.connect(new InetSocketAddress("192.168.10.110", 8080));
future.addListener(new ChannelFutureListener(){
void operationComplete(ChannelFuture future) throws Exception{
if(future.future.isSuccess()){
// 建立连接成功
}else{
//建立连接失败
}
}
});
ChannelHandler组件分析
事件与ChannelHandler
Netty事件
-
Netty入站事件: 主要是入站数据或者是相关状态发生更改而触发的事件,就是socket有新连接或者新请求 -
已经建立连接或者连接失效/超时 -
数据读取 -
用户事件,即应用程序给予事件响应完成的处理程序 -
异常错误事件 -
Netty出站事件: 未来将会触发某个动作的结果,即程序主动向socket底层发起操作 -
打开或者关闭socket远程节点的连接 -
将数据写出或者刷新到socket缓冲区中
ChannelHandler
-
交由具体子接口定义出入站事件处理方法
// 1. 交由具体子接口定义出入站事件处理方法
// ChannelHandler并没有提供很多的方法声明,同时通过上述的入站和出站事件处理,我们也很容易想到ChannelHandler存在处理入站事件的ChannelInboundHandler以及出站事件的ChannelOutboundHandler,同时在netty中默认有对应实现方式
// ChannelInboundHandlerAdapter实现入站事件的处理
// ChannelOutboundHandlerAdapter实现出站事件的处理
// ChannelDuplexHandler能够同时处理入站和出站事件
-
上下文对象ChannelHandlerContext
// 2. 上下文对象,ChannelHandlerContext
// 要想在上述ChannelHandler的链式事件处理流程,就必须满足两个条件,一个是如何在每个单独ChannelHandler的处理器传递事件,二是每个ChannelHandler是如何通过链式绑定关联的
// ChannelHandler通过ChannelHandlerContext为每个对应的Handler传递事件,因此ChannelHandler必然存在一个上下文对象负责事件传递,类似于EDA的事件通道
// netty事件触发就会创建响应事件的ChannelHandler,并添加到ChannelPipeline中,通过链表的数据结构来维护每个Handler之间关联,同时将ChannelPipeline存储在上下文中,可以通过上下文对象获取管道对象
-
能够存储并管理有状态的信息
// 现定义一个需求:需要进行登录才能够获取数据,于是就有了以下的定义
interface Message{
// methods
}
class DataServerHandler extends SimpleChannelInboundHandler<Message> {
// 存储有状态数据
private boolean isLogined;
public void channelRead0(ChannelHandlerContext ctx, Message msg){
if(message instanceof LoginMessage){
auth((LoginMessage)message);
isLogined = true;
}else if(message instanceof GetDataMessage){
if(isLogined){
ctx.writeAndFlush(fetchSecret((GetDataMessage) message));
}else{
fail();
}
}
}
// 为了避免数据竞争产生数据不一致问题,避免上述需求中的非法登录用户获取到登录数据,必须为每个请求处理连接在提交给handler处理前保证handler是针对当前的连接是1:1的处理方式,即一个连接对应一个channel处理器,对此,需要在添加channel时通过以下的方式进行添加
serverBootstrap.childHandler(new ChannelInitializer<Channel>(){
public void initChannel(Channel channel){
channel.pipeline().addLast(new DataServerHandler());
// 如果一个连接需要用到多个handler协同处理,则只需要调用addLast添加即可
// 这样每次有事件发生的时候,对应的一个请求处理连接的事件响应处理都会重新创建handler,即保证每个请求连接的事件响应handler都是新创建的,避免了数据竞争产生的数据不一致问题
}
});
// 如果要保证handler只创建一次,那么就只需要进行调用childHandler添加对应实现的具体Handler实例
-
使用AttributeKey来存储信息
尽管对于存储有状态的信息需要新创建channel去处理它,但是并不是所有情况都是需要创建一个新的handler去处理不同的连接,比如对于通用的共享数据,不存在于不同连接的状态变化,但是为了能够保证共享数据是安全的,为此可以使用AttribuiteKey存储这类数据信息,同时在每个handler中都会有一个上下文对象,而当前的AttributeKey能够通过上下文对象获取到,因此对于AttributeKey的获取在不同handler中可以通过上下文对象来获取,并且为对应的handler添加注解@Sharable能够保证线程是安全的,比如下面例子.
interface Message{
// methods
}
@Sharable
class DataServerHandler extends SimpleChannelInboundHandler<Message> {
// 存储有状态数据
private boolean sharedObject;
public void channelRead0(ChannelHandlerContext ctx, Message msg){
// ...
}
}
serverBootstrap.childHandler(new ChannelInitializer<Channel>(){
private static final DataServerHandler SHARED = new DataServerHandler();
public void initChannel(Channel channel){
// 这样保证在处理不同连接的处理链pipeline中存储的handler都是相同的,并且是线程安全的
channel.pipeline().addLast(SHARED);
}
});
-
注解@Sharable
-
如果定义的Handler添加注解为@Sharable,则表示Handler在整个处理链pipeline中都是属于无竞争的环境,数据不存在线程安全问题,同时如果是无状态的数据可以通过只创建一次handler实例来完成整个事件处理链pipeline的handler处理. -
如果没有添加注解说明,那么为了保证每个handler的共享数据是属于线程安全的,就必须为每次请求连接的操作创建新的handler,也就是说不同的连接的事件处理链pipeline存储的handler都是连接特有的,属于连接与handler的1:1模式,这样才能保证数据是线程安全.
-
ChannelHandler的方法回调机制
ChannelDuplexHandler
,因此在实际使用中可以继承上述相关类来重写部分方法以实现目标业务逻辑的处理程序.
ChannelHanlder的生命周期
// handlerAdded: 当ChannelHandler添加到pipeline被调用
// handlerRemoved: 当ChannelHandler从pipeline中移除时被调用
// exceptionCaught: 在处理过程中ChannelPipeline发生异常时被调用
责任链设计
-
伪代码实现
/**
* 责任链设计:
* channel1 -> channel2 -> channel3 -> ...
*/
// channel1处理完之后的结果将会传递给channel2进行处理,然后将channel2的处理结果传递给channel3再进行处理,那么什么时候结束呢?
//要么遍历到没有下一个channel节点为止就结束
//要么就是直接在当前节点中断不进行往下传播事件
// 根据netty的channel可知,pipeline需要依赖到一个上下文对象,通过上下文对象来实现责任链的数据传输,于是就有以下的定义.
class Main{
public static void main(String[] args){
// 类比netty添加方式
HandlerPipeline pipeline = new HandlerPipeline();
pipeline.addLast(new HandlerTest());
pipeline.addLast(new HandlerTest());
}
}
// 基于链表结构存储handler
class HandlerPipeline {
private HandlerContext head = new HandlerContext(new Handler(){
void doHandler(HandlerContext context, Object val){
context.nextRun(val);
}
});
public void addLast(Handler handler){
HandlerContext ctx = head;
while(ctx.next != null){
ctx = ctx.next;
}
ctx.next = new HandlerContext(handler);
}
}
// 通过上下文保存当前handler并传递数据到下一个context进行处理
class HandlerContext {
HandlerContext next;
Handler handler;
HandlerContext(Handler handler){
this.handler = handler;
}
void nextRun(Object val){
if(this.next != null){
this.next.handler(val);
}
}
void handler(Object val){
this.handler.doHandler(this, val);
}
}
// handler处理器
interface Handler{
void doHandler(HandlerContext context, Object val);
}
class HandlerTest implements Handler{
public void doHandler(HandlerContext context, Object val){
// 传播给下一个handler
context.nextRun(val);
// 如果不传播,在当前handler停止,就无需调用nextRun方法
}
}
Netty责任链运作流程
责任链流程(摘录源码)
* +---------------------------------------------------+---------------+
* | ChannelPipeline | |
* | |/ |
* | +---------------------+ +-----------+----------+ |
* | | Inbound Handler N | | Outbound Handler 1 | |
* | +----------+----------+ +-----------+----------+ |
* | /| | |
* | | |/ |
* | +----------+----------+ +-----------+----------+ |
* | | Inbound Handler N-1 | | Outbound Handler 2 | |
* | +----------+----------+ +-----------+----------+ |
* | /| . |
* | . . |
* | ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
* | [ method call] [method call] |
* | . . |
* | . |/ |
* | +----------+----------+ +-----------+----------+ |
* | | Inbound Handler 2 | | Outbound Handler M-1 | |
* | +----------+----------+ +-----------+----------+ |
* | /| | |
* | | |/ |
* | +----------+----------+ +-----------+----------+ |
* | | Inbound Handler 1 | | Outbound Handler M | |
* | +----------+----------+ +-----------+----------+ |
* | /| | |
* +---------------+-----------------------------------+---------------+
* | |/
* +---------------+-----------------------------------+---------------+
* | | | |
* | [ Socket.read() ] [ Socket.write() ] |
* | |
* | Netty Internal I/O Threads (Transport Implementation) |
* +-------------------------------------------------------------------+
连接与事件处理链协作流程简要流程图
Netty之ChannelPipeline源码分析
// 1.pipeline的创建
// 每个Channel都有自己的pipeline,并且当一个新的channel被创建的时候会自动创建
// 2. 责任链流程 -- 见上述责任链流程
// 3. pipeline在上下文进行事件传播的方法
// 入站事件与出站事件
/* <ul>
* <li>Inbound event propagation methods:
* <ul>
* <li>{@link ChannelHandlerContext#fireChannelRegistered()}</li>
* <li>{@link ChannelHandlerContext#fireChannelActive()}</li>
* <li>{@link ChannelHandlerContext#fireChannelRead(Object)}</li>
* <li>{@link ChannelHandlerContext#fireChannelReadComplete()}</li>
* <li>{@link ChannelHandlerContext#fireExceptionCaught(Throwable)}</li>
* <li>{@link ChannelHandlerContext#fireUserEventTriggered(Object)}</li>
* <li>{@link ChannelHandlerContext#fireChannelWritabilityChanged()}</li>
* <li>{@link ChannelHandlerContext#fireChannelInactive()}</li>
* <li>{@link ChannelHandlerContext#fireChannelUnregistered()}</li>
* </ul>
* </li>
* <li>Outbound event propagation methods:
* <ul>
* <li>{@link ChannelHandlerContext#bind(SocketAddress, ChannelPromise)}</li>
* <li>{@link ChannelHandlerContext#connect(SocketAddress, SocketAddress, ChannelPromise)}</li>
* <li>{@link ChannelHandlerContext#write(Object, ChannelPromise)}</li>
* <li>{@link ChannelHandlerContext#flush()}</li>
* <li>{@link ChannelHandlerContext#read()}</li>
* <li>{@link ChannelHandlerContext#disconnect(ChannelPromise)}</li>
* <li>{@link ChannelHandlerContext#close(ChannelPromise)}</li>
* <li>{@link ChannelHandlerContext#deregister(ChannelPromise)}</li>
* </ul>
* </li>
* </ul>
*/
// 4. 构建一个ChannelPipeline
// 当我们的web服务为每个请求处理对应的decode-process-encode时,对于执行比较耗时的操作需要将线程隔离处理,也就是需要有针对Group对process进行处理,而其他线程仍然可以处理decode以及encode非耗时逻辑,可以通过以下的方式:
static final EventExecutorGroup group = new DefaultEventExecutorGroup();
pipeline = ch.pipeline();
pipeline.addLast("decoder", new MyProtocolDecoder());
pipeline.addLast("encoder", new MyProtocolEncoder());
// 这个时候的process处理会单独放在以下的group进行处理
pipeline.addLast(group, "handler", new MyBusinessLogicHandler());
// 5. ChannelPipeline是属于线程安全类
-
Netty之ChannelHandlerContext源码分析
// 1. 通过唤醒回调后在pipeline流程链中向不同的handler传递信息
// 2. 上下文存储的数据可以实现事件触发执行传递到不同的handler方法中,甚至可以是在不同线程中实现数据的共享,比如以下代码:
public class MyHandler extends ChannelDuplexHandler{
private ChannelHandlerContext ctx;
public void beforeAdd(ChannelHandlerContext ctx){
// 可以在添加到pipeline之前保存ctx信息
this.ctx = ctx;
}
public void login(String username, password) {
// 将保存的ctx存储登录信息并将登录信息传递到责任链pipeline下后续的handler获取使用
ctx.write(new LoginMessage(username, password));
}
}
// 3. 存储有状态的信息,详细可以查看上述的ChannelHandler使用
// 4. 一个handler可以拥有多个context信息,因为一个handler可以添加到一个或者多个pipeline中,而每个pipeline都会对应着一个context,因而一个handler是可以拥有一个或者多个context,比如计算handler被添加到pipeline的次数
public class FactorialHandler extends ChannelInboundHandlerAdapter{
private final AttributeKey<Integer> counter = AttributeKey.valueOf("counter");
public void channelRead(ChannelHandlerContext ctx, Object msg){
Integer a = ctx.attr(counter).get();
if (a == null) {
a = 1;
}
attr.set(a * (Integer)msg);
}
}
// 下面将会进行4次计数器的计算,也就是一个handler实例添加到不同或者相同的active的pipeline中,其上下文对象是不一样的
ChannelPipeline p1 = channel.pipeline();
p1.addLast("f1", fh);
p1.addLast("f2", fh);
ChannelPipeline p2 = channel.pipeline();
p1.addLast("f3", fh);
p1.addLast("f4", fh);
ByteBuf组件分析
ByteBuf组件
工作原理
readIndex==writeIndex
也就意味着读取数据达到数组的末尾,再次进行读取时会发生IndexOutOfBoundsException,而对于
writeIndex==Capacity
即ByteBuf的容量大小时也会发生下标越界异常,名称以read或者write将会推进对应的索引数值,而名称以set或者get的方法调用时将不会对读取或者写入索引进行递增操作.
字节级源码分析
-
随机访问: ByteBuf
类似于一个字节数组,于数组索引具备相同的特征,即下标从0开始,以capacity - 1
的下标为末尾索引,可以按照数组的方式对其通过下标随机访问,此时对于拥有readerIndex以及writeIndex值是不变,但可以通过调用readerIndex(index)
或者writeIndex(index)
来更改对应的索引值.
for(int i=0; i<buffer.capacity(); i++){
byte b = buffer.getByte(i);
logger.info("char s is " + (char)b);
}
-
顺序访问: 主要是通过 readerIndex
以及writeIndex
两个索引值指针实现顺序访问,对于ByteBuf
的顺序访问存在丢弃字节,可读字节以及可写字节的概念,对此摘录源码分析如下:
/*
* +-------------------+------------------+------------------+
* | discardable bytes | readable bytes | writable bytes |
* | | (CONTENT) | |
* +-------------------+------------------+------------------+
* | | | |
* 0 <= readerIndex <= writerIndex <= capacity
*.
*/
// 可读字节区域: 代表ByteBuf还未读取到ByteBuf的数据区域,在netty的ByteBuf中以read或者skip开头的读取数据的方法都会在指针readerIndex实现计数的自增加操作,对于可读指针取值范围: 0<=readerIndex<=writeIndex
// 可写字节区域: 代表从writeIndex - capacity之间的区域为空闲区域,能够继续存储数据来填充区域,如果满足writeIndex < capacity代表该ByteBuf拥有足够的区域进行写入数据.
// 丢弃的字节区域: 表示已经被读取过的ByteBuf片段区域,初始化状态的时候,区域大小为0,如果数据一直在被读取,那么对应的区域大小会增加到writeIndex,也就是对于该区域满足: 0<= size <= writeIndex.
// 调用discardReadBytes()方法的区域变化:
/*
* BEFORE discardReadBytes()
*
* +-------------------+------------------+------------------+
* | discardable bytes | readable bytes | writable bytes |
* +-------------------+------------------+------------------+
* | | | |
* 0 <= readerIndex <= writerIndex <= capacity
*
*
* AFTER discardReadBytes()
*
* +------------------+--------------------------------------+
* | readable bytes | writable bytes (got more space) |
* +------------------+--------------------------------------+
* | | |
* readerIndex (0) <= writerIndex (decreased) <= capacity
*/
-
清除方法 clear
// 关于清除方法直接通过查看区域的变化即可
/*
* <pre>
* BEFORE clear()
*
* +-------------------+------------------+------------------+
* | discardable bytes | readable bytes | writable bytes |
* +-------------------+------------------+------------------+
* | | | |
* 0 <= readerIndex <= writerIndex <= capacity
*
*
* AFTER clear()
*
* +---------------------------------------------------------+
* | writable bytes (got more space) |
* +---------------------------------------------------------+
* | |
* 0 = readerIndex = writerIndex <= capacity
* </pre>
*/
-
搜索操作
// 对于一些普通字节的查询,可以调用indexOf(int, int, byte)或者是bytesBefore(int, int, byte)完成,bytesBefore这个方法尤其对于以一些特殊字符结尾的字符串尤其有用.
// 对于更为复杂的操作,比较存在不同系统的特殊符号查询,可以通过ByteProcessor接口指定查询特定的符号内容,调用forEachByte(int, int, ByteProcessor)方法完成字节的搜索
-
Derived Buffers(派生缓冲区)
// 可以通过以下的方法创建一个新的ByteBuf缓冲区,每个派生出来的缓冲区都拥有自己的readerIndex,writeIndex以及标记索引,与NIO的buffer议案具备数据共享,因而当派生的缓冲区数据发生变化的时候,对应的源缓冲区也会发生变化.
// 如果要让数据不具备共享,可以通过使用copy()方法来实现一个新的数据副本,这个时候与派生的数据缓冲区区分在于copy()拥有独立的数据副本信息,可以通过以下的图示来分析,假设现在申请的一块源bytebuf是使用堆外内存存储数据的方式(堆内内存也是同理),这个时候派生的缓冲与copy的缓冲内存区域分布如下:
-
Netty的ByteBuf转换为JDK存在的类型
// 1. 转换为byte[]
if(byteBuff.hasArray()){
byte[] byteArr = byteBuff.array();
}
// 2. 转换为NIO的Buffers
if(byteBuff.nioBufferCount() > 0){
ByteBuffer nioByteBuffer = byteBuff.nioBuffer();
}
// 3. 转换为String
String str = byteBuff.toString(Charset.forName("utf8"));
// 4. 转换为IO的字节流
ByteBufInputStream in = new ByteBufInputStream(byteBuff);
ByteBufOutputStream out = new ByteBufOutputStream(byteBuff);
ByteBuff使用模式
-
使用堆缓冲区
-
使用堆外缓冲区
// 现在有一数组数据
byte[] arr = [1,2,3,4,5];
// 这个时候要发送数据出去,可考虑使用堆外内存数据缓冲,避免数据缓冲多一次内存复制,将数据发送到网卡中
// 接收网卡数据的时候,如果这个时候知道接收的数据为数组且需要堆数组数据进行遍历,那么这个时候直接使用堆外内存存储,还需要再进行一次数据内容的复制,而如果使用堆内内存存储,直接可以通过直接获取到数据转换为数组进行操作
-
复合缓冲区(零拷贝机制)
ByteBuff核心类图
-
存储方面: 使用堆内与堆外内存存储 -
资源利用技术: 使用非池化与池化技术,池化技术与内存管理将在讲netty高性能会详细说明 -
直接通过底层操作内存: Unsafe操作
使用ByteBufHolder接口
在一个web服务程序中,如果能够将一个http请求(请求头/请求体/状态码/cookie等信息)都封装一起以包的形式进行接收或者发送,那么对程序开发者而言将会带来很多的便利,对此,在netty框架提供了ByteBufHolder接口来存储ByteBuf之外的属性信息,同样提供了缓冲池化,底层访问数据以及引用计数的方法.对于网络编程中实现自定义的消息协议,可以采用ByteBufHolder接口来有效承载消息的信息.
ByteBuf内存分配策略
-
按需分配:ByteBufAllocator接口
// 默认创建池化且为堆外内存存储的ByteBuf,如果当前环境支持Unsafe底层操作,那么默认就会使用Unsafe+堆外存储+池化技术方式来创建ByteBuf
// 如果显示使用PreferHeapByteBufAllocator方式进行分配,则会创建堆内数据存储的ByteBuf
-
Unpooled分配ByteBuf工具类
ByteBufUtil操作ByteBuf工具类
// 核心方法
// 1. hexDump: 以16进制形式打印ByteBuf内容
// ByteBuf内容为: [0, 0, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
// 16进制打印出来的数据为: 00 00 00 01 02 03 04 05 06 07 08 09 0a 0b
// (空格是手动加的,实际为: 0000000102030405060708090a0b)
// 2. 比较两个ByteBuf的存储的数据是否一致(内存 + 数据大小)
感谢花时间阅读,如果有用欢迎转发或者点个好看,谢谢~
老铁们关注走一走,不迷路
以上是关于Netty组件源码分析的主要内容,如果未能解决你的问题,请参考以下文章
Netty-源码分析ByteBuf-slice和retainedSlice使用细节