netty channelhandleradapter 是否去了channelread方法
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了netty channelhandleradapter 是否去了channelread方法相关的知识,希望对你有一定的参考价值。
参考技术A if (如果是登录报文)//处理登录信息
else // 其它报文
// 这句话就是调用下一个channelRead
ctx.fireChannelRead(msg);
本回答被提问者采纳
netty笔记
【1】Reactor模式
【2】NIO
【3】Netty组件
【4】Netty 线程模型
【5】异步处理
【6】ChannelOption
【7】常见Handler
【8】使用SSL/TLS创建安全的Netty程序
【9】Netty中的ByteBuf
【10】Netty中TCP拆包、粘包问题
【11】Pipeline
【12】为什么Netty会发生内存泄漏问题
【13】Netty SSL性能调优
【14】netty编解码器
【15】自定义编解码器
===========================================================================================================================================================
【1】Reactor模式
在处理web请求时通常有两种体系结构,分别为:thread-based architecture(基于线程)、event-driven architecture(事件驱动)。
基于线程的体系结构通常会使用多线程来处理客户端的请求,每当接收到一个请求,便开启一个独立的线程来处理。
事件驱动体系结构会定义一系列的事件处理器来响应事件的发生,并且将服务端接受连接与对事件的处理分离。其中,事件是一种状态的改变。比如,tcp中socket的new incoming connection、ready for read、ready for write。
reactor设计模式是event-driven architecture的一种实现方式,处理多个客户端并发的向服务端请求服务的场景。每种服务在服务端可能由多个方法组成。reactor会解耦并发请求的服务并分发给对应的事件处理器来处理。目前,许多流行的开源框架都用到了reactor模式,如:netty、node.js等,包括java的nio。
reactor主要由以下几个角色构成:handle、Synchronous Event Demultiplexer、Initiation Dispatcher、Event Handler、Concrete Event Handler
取决于 Reactor 的数量和 Hanndler 线程数量的不同,Reactor 模型有 3 个变种:
单 Reactor 单线程。
单 Reactor 多线程。
主从 Reactor 多线程。
【2】NIO
Java NIO 的核心组成部分:Channel、Buffer、Selector。其它组件:Pipe、FileLock 等。
Channel
Channel用于在字节缓冲区和位于通道另一侧的实体(通常是一个文件或套接字)之间有效地传输数据。
通道类似流,但又有些不同:既可以从通道中读取数据,又可以写数据到通道。但流的读写通常是单向的。通道可以异步地读写。通道中的数据总是要先读到一个 Buffer,或者总是要从一个 Buffer 中写入。
java.nio.channels包中的Channel接口的已知实现类:
FileChannel:从文件中读写数据。FileChannel对象是线程安全的、总是阻塞式的。
DatagramChannel:能通过UDP读写网络中的数据。
SocketChannel:能通过TCP读写网络中的数据。
ServerSocketChannel:可以监听新进来的TCP连接,像Web服务器那样。对每一个新进来的连接都会创建一个SocketChannel。
Buffer
缓冲区本质上是一块可以写入数据,然后可以从中读取数据的内存。这块内存被包装成NIO Buffer对象,并提供了一组方法,用来方便的访问该块内存。
使用Buffer读写数据一般遵循以下四个步骤:写入数据到Buffer、调用flip()方法、从Buffer中读取数据、调用clear()方法或者compact()方法。
当向buffer写入数据时,buffer会记录下写了多少数据。一旦要读取数据,需要通过flip()方法将Buffer从写模式切换到读模式。在读模式下,可以读取之前写入到buffer的所有数据。一旦读完了所有的数据,就需要清空缓冲区,让它可以再次被写入。有两种方式能清空缓冲区:调用clear()或compact()方法。clear()方法会清空整个缓冲区。compact()方法只会清除已经读过的数据。任何未读的数据都被移到缓冲区的起始处,新写入的数据将放到缓冲区未读数据的后面。
+Buffer的三个属性:
capacity,作为一个内存块,Buffer有一个固定的大小值,也叫“capacity”.你只能往里写capacity个byte、long,char等类型。一旦Buffer满了,需要将其清空(通过读数据或者清除数据)才能继续写数据往里写数据。
position,当你写数据到Buffer中时,position表示当前的位置。初始的position值为0.当一个byte、long等数据写到Buffer后, position会向前移动到下一个可插入数据的Buffer单元。position最大可为capacity – 1;当读取数据时,也是从某个特定位置读。当将Buffer从写模式切换到读模式,position会被重置为0. 当从Buffer的position处读取数据时,position向前移动到下一个可读的位置。
limit,在写模式下,Buffer的limit表示你最多能往Buffer里写多少数据。 写模式下,limit等于Buffer的capacity;当切换Buffer到读模式时, limit表示你最多能读到多少数据。因此,当切换Buffer到读模式时,limit会被设置成写模式下的position值。换句话说,你能读到之前写入的所有数据(limit被设置成已写数据的数量,这个值在写模式下就是position)。
Java NIO 有以下Buffer类型
ByteBuffer
MappedByteBuffer
CharBuffer
DoubleBuffer
FloatBuffer
IntBuffer
LongBuffer
ShortBuffer
使用示例:
ByteBuffer buf = ByteBuffer.allocate(48); //分配48字节capacity的ByteBuffer
int bytesRead = inChannel.read(buf); //read into buffer.从Channel写到Buffer
buf.put(127); //通过Buffer的put()方法写到Buffer里
buf.flip(); //flip方法将Buffer从写模式切换到读模式。调用flip()方法会将position设回0,并将limit设置成之前position的值
int bytesWritten = inChannel.write(buf); //从Buffer读取数据到Channel
byte aByte = buf.get(); //使用get()方法从Buffer中读取数据
Buffer.rewind() //将position设回0,所以你可以重读Buffer中的所有数据。limit保持不变,仍然表示能从Buffer中读取多少个元素(byte、char等)。
Buffer.mark() //可以标记Buffer中的一个特定position。之后可以通过调用Buffer.reset()方法恢复到这个position.
Selector
Selector的创建,通过调用Selector.open()方法创建一个Selector,如下:
Selector selector = Selector.open();
向Selector注册通道,为了将Channel和Selector配合使用,必须将channel注册到selector上。通过SelectableChannel.register()方法来实现,如下:
channel.configureBlocking(false);
selectionKey key = channel.register(selector,Selectionkey.OP_READ);
与Selector一起使用时,Channel必须处于非阻塞模式下。这意味着不能将FileChannel与Selector一起使用,因为FileChannel不能切换到非阻塞模式。而套接字通道都可以。注意register()方法的第二个参数。这是一个“interest集合”,意思是在通过Selector监听Channel时对什么事件感兴趣。可以监听四种不同类型的事件:Connect/Accept/Read/Write
通过Selector选择通道,一旦向Selector注册了一或多个通道,就可以调用几个重载的select()方法。这些方法返回你所感兴趣的事件(如连接、接受、读或写)已经准备就绪的那些通道。
select()方法返回的int值表示有多少通道已经就绪。亦即,自上次调用select()方法后有多少通道变成就绪状态。如果调用select()方法,因为有一个通道变成就绪状态,返回了1,若再次调用select()方法,如果另一个通道就绪了,它会再次返回1。一旦调用了select()方法,并且返回值表明有一个或更多个通道就绪了,然后可以通过调用selector的selectedKeys()方法,访问“已选择键集(selected key set)”中的就绪通道。
使用示例:
Selector selector = Selector.open();
channel.configureBlocking(false);
SelectionKey key = channel.register(selector, SelectionKey.OP_READ);
while(true) {
int readyChannels = selector.select();
if(readyChannels == 0) continue;
Set selectedKeys = selector.selectedKeys();
Iterator keyIterator = selectedKeys.iterator();
while(keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
if(key.isAcceptable()) {
// a connection was accepted by a ServerSocketChannel.
} else if (key.isConnectable()) {
// a connection was established with a remote server.
} else if (key.isReadable()) {
// a channel is ready for reading
} else if (key.isWritable()) {
// a channel is ready for writing
}
keyIterator.remove();
}
}
SelectionKey,这个对象包含了一些你感兴趣的属性:interest集合/ready集合/Channel/Selector/附加的对象(可选)
int interestSet = selectionKey.interestOps();
boolean isInterestedInAccept = (interestSet & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT;
int readySet = selectionKey.readyOps();
selectionKey.isAcceptable();
Channel channel = selectionKey.channel();
Selector selector = selectionKey.selector();
selectionKey.attach(theObject);
Object attachedObj = selectionKey.attachment();
SelectionKey key = channel.register(selector, SelectionKey.OP_READ, theObject);
【3】Netty组件
+ 核心组件:Channel、回调、Future、事件和ChannelHandler。
Channel:基本的I/O 操作(bind()、connect()、read()和write())依赖于底层网络传输所提供的原语。 EmbeddedChannel、LocalServerChannel、NioDatagramChannel、NioSctpChannel、NioSocketChannel。
EventLoop:定义了Netty 的核心抽象,用于处理连接的生命周期中所发生的事件。
一个EventLoopGroup 包含一个或者多个EventLoop;
一个EventLoop 在它的生命周期内只和一个Thread 绑定;
所有由EventLoop 处理的I/O 事件都将在它专有的Thread 上被处理;
一个Channel 在它的生命周期内只注册于一个EventLoop;
一个EventLoop 可能会被分配给一个或多个Channel。
+ 常用类:
ChannelHandler -> ChannelInboundHandler -> ChannelInboundHandlerAdapter :
一个ChannelHandler可以属于多个ChannelPipeline,它也可以绑定多个ChannelHandlerContext实例,如果一个ChannelHandler想要有这样的功能,就必须以 注解注释这个ChannelHandler,否则,尝试将其加入到不止一个ChannelPipeline中去的时候,会报出异常.
channelRead()—对于每个传入的消息都要调用;
channelReadComplete()—通知ChannelInboundHandler 最后一次对channelRead()的调用是当前批量读取中的最后一条消息;
exceptionCaught()—在读取操作期间,有异常抛出时会调用
ChannelHandler -> ChannelInboundHandler -> ChannelInboundHandlerAdapter -> SimpleChannelInboundHandler :
channelActive()—在到服务器的连接已经建立之后将被调用;
channelRead0()—当从服务器接收到一条消息时被调用;
exceptionCaught()—在处理过程中引发异常时被调用。
ChannelHandler -> ChannelInboundHandler -> ChannelInboundHandlerAdapter -> ChannelInitializer<C extends Channel>
+ 示例:
##服务端
final EchoServerHandler serverHandler = new EchoServerHandler();
EventLoopGroup group = new NioEventLoopGroup();
try{
ServerBootstrap b = new ServerBootstrap();
b.group(group).channel(NioServerSocketChannel.class).localAddress(new InetSocketAddress(port)).childHandler(new ChannelInitializer<SocketChannel>(){
public void initChannel(SocketChannel ch)throws Exception {
ch.pipeline().addLast(serverHandler);
}
});
ChannelFuture f = b.bind().sync();
f.channel().closeFuture().sync();
} finally {
group.shutdownGracefully().sync();
}
##客户端
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class).remoteAddress(new InetSocketAddress(host, port)).handler(new ChannelInitializer<SocketChannel>() {
public void initChannel(SocketChannel ch)throws Exception {
ch.pipeline().addLast(new EchoClientHandler());
}
});
ChannelFuture f = b.connect().sync();
f.channel().closeFuture().sync();
} finally {
group.shutdownGracefully().sync();
}
【4】Netty 线程模型
Netty 主要基于主从 Reactors 多线程模型,做了一定的修改,其中主从 Reactor 多线程模型有多个 Reactor:
MainReactor 负责客户端的连接请求,并将请求转交给 SubReactor。
SubReactor 负责相应通道的 IO 读写请求。
非 IO 请求(具体逻辑处理)的任务则会直接写入队列,等待 worker threads 进行处理。
Reactor 多线程模型的图
虽然 Netty 的线程模型基于主从 Reactor 多线程,借用了 MainReactor 和 SubReactor 的结构。但是实际实现上 SubReactor 和 Worker 线程在同一个线程池中:
EventLoopGroup bossGroup = newNioEventLoopGroup();
EventLoopGroup workerGroup = newNioEventLoopGroup();
ServerBootstrap server= newServerBootstrap();
server.group(bossGroup, workerGroup).channel(NioServerSocketChannel. class);
上面代码中的 bossGroup 和 workerGroup 是 Bootstrap 构造方法中传入的两个对象,这两个 group 均是线程池:
bossGroup 线程池则只是在 Bind 某个端口后,获得其中一个线程作为 MainReactor,专门处理端口的 Accept 事件,每个端口对应一个 Boss 线程。
workerGroup 线程池会被各个 SubReactor 和 Worker 线程充分利用。
【5】异步处理
异步的概念和同步相对。当一个异步过程调用发出后,调用者不能立刻得到结果。实际处理这个调用的部件在完成后,通过状态、通知和回调来通知调用者。
Netty 中的 I/O 操作是异步的,包括 Bind、Write、Connect 等操作会简单的返回一个 ChannelFuture。
调用者并不能立刻获得结果,而是通过 Future-Listener 机制,用户可以方便的主动获取或者通过通知机制获得 IO 操作结果。
当 Future 对象刚刚创建时,处于非完成状态,调用者可以通过返回的 ChannelFuture 来获取操作执行的状态,注册监听函数来执行完成后的操作。
常见有如下操作:
通过 isDone 方法来判断当前操作是否完成。
通过 isSuccess 方法来判断已完成的当前操作是否成功。
通过 isCancelled 方法来判断已完成的当前操作是否被取消。
通过 addListener 方法来注册监听器,当操作已完成(isDone 方法返回完成),将会通知指定的监听器;如果 Future 对象已完成,则理解通知指定的监听器。
例如下面的代码中绑定端口是异步操作,当绑定操作处理完,将会调用相应的监听器处理逻辑。
serverBootstrap.bind(port).addListener(future -> {
if(future.isSuccess()) {
System.out. println( newDate() + ": 端口["+ port + "]绑定成功!");
} else{
System.err. println( "端口["+ port + "]绑定失败!");
}
});
【6】ChannelOption
ChannelOption.SO_BACKLOG, 1024
BACKLOG用于构造服务端套接字ServerSocket对象,标识当服务器请求处理线程全满时,用于临时存放已完成三次握手的请求的队列的最大长度。如果未设置或所设置的值小于1,Java将使用默认值50。
ChannelOption.SO_KEEPALIVE, true
是否启用心跳保活机制。在双方TCP套接字建立连接后(即都进入ESTABLISHED状态)并且在两个小时左右上层没有任何数据传输的情况下,这套机制才会被激活。
ChannelOption.TCP_NODELAY, true
在TCP/IP协议中,无论发送多少数据,总是要在数据前面加上协议头,同时,对方接收到数据,也需要发送ACK表示确认。为了尽可能的利用网络带宽,TCP总是希望尽可能的发送足够大的数据。这里就涉及到一个名为Nagle的算法,该算法的目的就是为了尽可能发送大块数据,避免网络中充斥着许多小数据块。
TCP_NODELAY就是用于启用或关于Nagle算法。如果要求高实时性,有数据发送时就马上发送,就将该选项设置为true关闭Nagle算法;如果要减少发送次数减少网络交互,就设置为false等累积一定大小后再发送。默认为false。
ChannelOption.SO_REUSEADDR, true
SO_REUSEADDR允许启动一个监听服务器并捆绑其众所周知端口,即使以前建立的将此端口用做他们的本地端口的连接仍存在。这通常是重启监听服务器时出现,若不设置此选项,则bind时将出错。
SO_REUSEADDR允许在同一端口上启动同一服务器的多个实例,只要每个实例捆绑一个不同的本地IP地址即可。对于TCP,我们根本不可能启动捆绑相同IP地址和相同端口号的多个服务器。
SO_REUSEADDR允许单个进程捆绑同一端口到多个套接口上,只要每个捆绑指定不同的本地IP地址即可。这一般不用于TCP服务器。
SO_REUSEADDR允许完全重复的捆绑:当一个IP地址和端口绑定到某个套接口上时,还允许此IP地址和端口捆绑到另一个套接口上。一般来说,这个特性仅在支持多播的系统上才有,而且只对UDP套接口而言(TCP不支持多播)
ChannelOption.SO_RCVBUF AND ChannelOption.SO_SNDBUF
定义接收或者传输的系统缓冲区buf的大小,
ChannelOption.ALLOCATOR
Netty4使用对象池,重用缓冲区
bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
【7】常见Handler
编码解码HTTP协议消息的Handler:
HttpServerCodec -> CombinedChannelDuplexHandler -> ChannelDuplexHandler ...
HttpRequestDecode 将ByteBuf解码成HttpRequest和HttpContent
HttpResponseEncoder 将HttpResponse或HttpContent编码成ByteBuf
HttpClientCodec
处理HTTP时可能接收HTTP消息片段,Netty需要缓冲直到接收完整个消息。要完成的处理HTTP消息,并且内存开销也不会很大,Netty为此提供了HttpObjectAggregator。通过HttpObjectAggregator,Netty可以聚合HTTP消息,使用FullHttpResponse和FullHttpRequest到ChannelPipeline中的下一个ChannelHandler,这就消除了断裂消息,保证了消息的完整。
HttpObjectAggregator
使用HTTP时建议压缩数据以减少传输流量,压缩数据会增加CPU负载,现在的硬件设施都很强大,大多数时候压缩数据时一个好主意。Netty支持“gzip”和“deflate”,为此提供了两个ChannelHandler实现分别用于压缩和解压。
HttpContentDecompressor
使用HTTPS
SslHandler
使用WebSocket
BinaryWebSocketFrame 包含二进制数据
TextWebSocketFrame 包含文本数据
ContinuationWebSocketFrame 包含二进制数据或文本数据,BinaryWebSocketFrame和TextWebSocketFrame的结合体
CloseWebSocketFrame WebSocketFrame代表一个关闭请求,包含关闭状态码和短语
PingWebSocketFrame WebSocketFrame要求PongWebSocketFrame发送数据
PongWebSocketFrame WebSocketFrame要求PingWebSocketFrame响应
处理空闲连接和超时
IdleStateHandler 当一个通道没有进行读写或运行了一段时间后出发IdleStateEvent
ReadTimeoutHandler 在指定时间内没有接收到任何数据将抛出ReadTimeoutException
WriteTimeoutHandler 在指定时间内有写入数据将抛出WriteTimeoutException
解码分隔符和基于长度的协议
DelimiterBasedFrameDecoder 解码器,接收ByteBuf由一个或多个分隔符拆分,如NUL或换行符
LineBasedFrameDecoder 解码器,接收ByteBuf以分割线结束,如"\n"和"\r\n"
FixedLengthFrameDecoder
LengthFieldBasedFrameDecoder
打印日志
LoggingHandler
【8】使用SSL/TLS创建安全的Netty程序
//服务端
KeyManagerFactory keyManagerFactory = null;
KeyStore keyStore = KeyStore.getInstance("JKS");
keyStore.load(new FileInputStream("D:\\security\\server\\sChat.jks"), "sNetty".toCharArray());
keyManagerFactory = KeyManagerFactory.getInstance("SunX509");
keyManagerFactory.init(keyStore,"sNetty".toCharArray());
SslContext sslContext = SslContextBuilder.forServer(keyManagerFactory).build();
b.childHandler(new SslChannelInitializer(sslContext));
public class SslChannelInitializer extends ChannelInitializer<Channel> {
private final SslContext context;
public SslChannelInitializer(SslContext context) {
this.context = context;
}
protected void initChannel(Channel ch) throws Exception {
SSLEngine engine = context.newEngine(ch.alloc());
engine.setUseClientMode(false);
ch.pipeline().addFirst("ssl", new SslHandler(engine));
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
pipeline.addLast("frameEncoder", new LengthFieldPrepender(4)); //最大16M
pipeline.addLast("decoder", new StringDecoder(Charset.forName("UTF-8")));
pipeline.addLast("encoder", new StringEncoder(Charset.forName("UTF-8")));
pipeline.addLast("spiderServerBusiHandler", new SpiderServerBusiHandler());
}
}
//客户端
//SslContext sslCtx = SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build(); //InsecureTrustManagerFactory.INSTANCE 不安全的信任管理器,信任所有证书
//p.addLast(NettyConstant.SSL, sslCtx.newHandler(sc.alloc(), uri.getHost(), uri.getPort()));
需要注意一点,SslHandler必须要添加到ChannelPipeline的第一个位置,可能有一些例外,但是最好这样来做.
public class SslChannelInitializer extends ChannelInitializer<Channel> {
private final SSLContext context;
private final boolean client;
private final boolean startTls;
public SslChannelInitializer(SSLContext context, boolean client, boolean startTls) {
this.context = context;
this.client = client;
this.startTls = startTls;
}
protected void initChannel(Channel ch) throws Exception {
SSLEngine engine = context.createSSLEngine();
engine.setUseClientMode(client);
ch.pipeline().addFirst("ssl", new SslHandler(engine, startTls));//startTls 如果为true,第一个写入的消息将不会被加密(客户端应该设置为true)
}
}
【9】Netty中的ByteBuf
ByteBuf 与JDK中的 ByteBuffer 的最大区别就是:
(1)netty的ByteBuf采用了读/写索引分离,一个初始化的ByteBuf的readerIndex和writerIndex都处于0位置。
(2)当读索引和写索引处于同一位置时,如果我们继续读取,就会抛出异常IndexOutOfBoundsException。
(3)对于ByteBuf的任何读写操作都会分别单独的维护读索引和写索引。maxCapacity最大容量默认的限制就是Integer.MAX_VALUE。
ByteBuf 的使用模式
JDK中的Buffer的类型 有heapBuffer和directBuffer两种类型,但是在netty中除了heap和direct类型外,还有composite Buffer(复合缓冲区类型)。
创建ByteBuf的方法
(1)通过ByteBufAllocator这个接口来创建ByteBuf,这个接口可以创建上面的三种Buffer,一般都是通过channel的alloc()接口获取。
(2)通过Unpooled类里面的静态方法,创建Buffer
CompositeByteBuf compBuf = Unpooled.compositeBuffer();
ByteBuf heapBuf = Unpooled.buffer(8);
ByteBuf directBuf = Unpooled.directBuffer(16);
// 在堆上分配一个ByteBuf,并指定初始容量和最大容量
public static ByteBuf buffer(int initialCapacity, int maxCapacity) {
return ALLOC.heapBuffer(initialCapacity, maxCapacity);
}
// 在堆外分配一个ByteBuf,并指定初始容量和最大容量
public static ByteBuf directBuffer(int initialCapacity, int maxCapacity) {
return ALLOC.directBuffer(initialCapacity, maxCapacity);
}
// 使用包装的方式,将一个byte[]包装成一个ByteBuf后返回
public static ByteBuf wrappedBuffer(byte[] array) {
if (array.length == 0) {
return EMPTY_BUFFER;
}
return new UnpooledHeapByteBuf(ALLOC, array, array.length);
}
// 返回一个组合ByteBuf,并指定组合的个数
public static CompositeByteBuf compositeBuffer(int maxNumComponents){
return new CompositeByteBuf(ALLOC, false, maxNumComponents);
}
使用示例
private static void simpleUse(){
// 1.创建一个非池化的ByteBuf,大小为10个字节
ByteBuf buf = Unpooled.buffer(10);
System.out.println("原始ByteBuf为====================>"+buf.toString());
System.out.println("1.ByteBuf中的内容为===============>"+Arrays.toString(buf.array())+"\n");
// 2.写入一段内容
byte[] bytes = {1,2,3,4,5};
buf.writeBytes(bytes);
System.out.println("写入的bytes为====================>"+Arrays.toString(bytes));
System.out.println("写入一段内容后ByteBuf为===========>"+buf.toString());
System.out.println("2.ByteBuf中的内容为===============>"+Arrays.toString(buf.array())+"\n");
// 3.读取一段内容
byte b1 = buf.readByte();
byte b2 = buf.readByte();
System.out.println("读取的bytes为====================>"+Arrays.toString(new byte[]{b1,b2}));
System.out.println("读取一段内容后ByteBuf为===========>"+buf.toString());
System.out.println("3.ByteBuf中的内容为===============>"+Arrays.toString(buf.array())+"\n");
// 4.将读取的内容丢弃
buf.discardReadBytes();
System.out.println("将读取的内容丢弃后ByteBuf为========>"+buf.toString());
System.out.println("4.ByteBuf中的内容为===============>"+Arrays.toString(buf.array())+"\n");
// 5.清空读写指针
buf.clear();
System.out.println("将读写指针清空后ByteBuf为==========>"+buf.toString());
System.out.println("5.ByteBuf中的内容为===============>"+Arrays.toString(buf.array())+"\n");
// 6.再次写入一段内容,比第一段内容少
byte[] bytes2 = {1,2,3};
buf.writeBytes(bytes2);
System.out.println("写入的bytes为====================>"+Arrays.toString(bytes2));
System.out.println("写入一段内容后ByteBuf为===========>"+buf.toString());
System.out.println("6.ByteBuf中的内容为===============>"+Arrays.toString(buf.array())+"\n");
// 7.将ByteBuf清零
buf.setZero(0,buf.capacity());
System.out.println("将内容清零后ByteBuf为==============>"+buf.toString());
System.out.println("7.ByteBuf中的内容为================>"+Arrays.toString(buf.array())+"\n");
// 8.再次写入一段超过容量的内容
byte[] bytes3 = {1,2,3,4,5,6,7,8,9,10,11};
buf.writeBytes(bytes3);
System.out.println("写入的bytes为====================>"+Arrays.toString(bytes3));
System.out.println("写入一段内容后ByteBuf为===========>"+buf.toString());
System.out.println("8.ByteBuf中的内容为===============>"+Arrays.toString(buf.array())+"\n");
}
执行结果:
原始ByteBuf为====================>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 0, cap: 10)
1.ByteBuf中的内容为===============>[0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
写入的bytes为====================>[1, 2, 3, 4, 5]
写入一段内容后ByteBuf为===========>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 5, cap: 10)
2.ByteBuf中的内容为===============>[1, 2, 3, 4, 5, 0, 0, 0, 0, 0]
读取的bytes为====================>[1, 2]
读取一段内容后ByteBuf为===========>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 2, widx: 5, cap: 10)
3.ByteBuf中的内容为===============>[1, 2, 3, 4, 5, 0, 0, 0, 0, 0]
将读取的内容丢弃后ByteBuf为========>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 3, cap: 10)
4.ByteBuf中的内容为===============>[3, 4, 5, 4, 5, 0, 0, 0, 0, 0]
将读写指针清空后ByteBuf为==========>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 0, cap: 10)
5.ByteBuf中的内容为===============>[3, 4, 5, 4, 5, 0, 0, 0, 0, 0]
写入的bytes为====================>[1, 2, 3]
写入一段内容后ByteBuf为===========>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 3, cap: 10)
6.ByteBuf中的内容为===============>[1, 2, 3, 4, 5, 0, 0, 0, 0, 0]
将内容清零后ByteBuf为==============>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 3, cap: 10)
7.ByteBuf中的内容为================>[0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
写入的bytes为====================>[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
写入一段内容后ByteBuf为===========>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 14, cap: 64)
8.ByteBuf中的内容为===============>[0, 0, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
写操作
ByteBuf中定义了两类方法可以往ByteBuf中写入内容:writeXX() 和 setXX()。setXX是替换指定位置的值,而writeXX是想当前写指针写入数据后递增指针。
读操作
跟写操作一样,ByteBuf的读操作也有两种方法,分别是getXX()和readXX()。注意:get、set之类方法不会改变读写索引的位置。
ByteBuf buf = (ByteBuf)msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req,"UTF-8");
Direct Buffer( 直接缓冲区)
直接缓冲区,在堆之外直接分配内存。与堆buf区别:
劣势:创建和释放Direct Buffer的代价比Heap Buffer得要高;
优势:当我们把一个Direct Buffer写入Channel的时候,就好比是“内核缓冲区”的内容直接写入了Channel,这样显然快了,减少了数据拷贝(因为我们平时的read/write都是需要在I/O设备与应用程序空间之间的“内核缓冲区”中转一下的)。而当我们把一个Heap Buffer写入Channel的时候,实际上底层实现会先构建一个临时的Direct Buffer,
然后把Heap Buffer的内容复制到这个临时的Direct Buffer上,再把这个Direct Buffer写出去。当然,如果我们多次调用write方法,把一个Heap Buffer写入Channel,底层实现可以重复使用临时的Direct Buffer,这样不至于因为频繁地创建和销毁Direct Buffer影响性能。
结论:Direct Buffer创建和销毁的代价很高,所以要用在尽可能重用的地方。 比如周期长传输文件大采用direct buffer,不然一般情况下就直接用heap buffer 就好。
直接缓冲区不支持数组访问数据,但是我们可以间接的访问数据数组,如下面代码:
ByteBuf directBuf = Unpooled.directBuffer(16);
if(!directBuf.hasArray()){
int len = directBuf.readableBytes();
byte[] arr = new byte[len];
directBuf.getBytes(0, arr);
}
Composite Buffer(复合缓冲区)
复合缓冲区,我们可以创建多个不同的 ByteBuf,然后提供一个这些 ByteBuf组合的视图。
CompositeByteBuf compBuf =Unpooled.compositeBuffer();
ByteBuf heapBuf = Unpooled.buffer(8);
ByteBuf directBuf = Unpooled.directBuffer(16);
//添加ByteBuf到 CompositeByteBuf
compBuf.addComponents(heapBuf,directBuf);
//删除第一个ByteBuf
compBuf.removeComponent(0);
Iterator<ByteBuf> iter = compBuf.iterator();
while(iter.hasNext()){
System.out.println(iter.next().toString());
}
//使用数组访问数据 ,compBuf的hasArray只有当componentCount==1时才有意义,大于1时返回false
if(!compBuf.hasArray()){
int len = compBuf.readableBytes();
byte[] arr = new byte[len];
compBuf.getBytes(0, arr);
}
### CompositeByteBuf
写入的数据当作一个component。
readBytes(arr)超出可读范围抛异常。
getBytes(0,arr) 时超出cap时异常。
readBytes(arr) 时读取的是自身写入的数据,component的读不到,即使add Component在中间添加,对读无影响,超出可读范围抛异常。
除非当compBuf.addComponents(true,heapBuf,directBuf);时改变了写索引,这时可用readBytes(arr)读取,但从component中读到的字节不正确,自身写的数据可正常读。
经测试,最好不要写入数据。
getBytes(0,arr) 时先读取自身写入的,再读取component的,可以跨component读取。当compBuf.addComponents(true,heapBuf,directBuf)时,或者在addComponents前写入时才增加cap,但由于此时自身写的当作了component,此component的cap大于实际已写字节。
派生缓冲区
public void byteBufOp() {
Charset utf8 = Charset.forName("UTF-8");
// 创建一个用于保存给定字符串的字节的ByteBuf
ByteBuf buf = Unpooled.copiedBuffer("Netty in Action rocks!", utf8);
ByteBuf duplicate = buf.duplicate();
ByteBuf sliced = buf.slice(0, 15);
ByteBuf readSlice=buf.readSlice(10);
// 将打印“Netty in Action”
System.out.println(sliced.toString(utf8));
// 更新索引0 处的字节
buf.setByte(0, (byte) 'J');
// 将会成功,因为数据是共享的,对其中一个所做的更改对另外一个也是可见的
assert buf.getByte(0) == sliced.getByte(0);
}
真实拷贝
public void byteBufCopy() {
Charset utf8 = Charset.forName("UTF-8");
// 创建一个用于保存给定字符串的字节的ByteBuf
ByteBuf buf = Unpooled.copiedBuffer("Netty in Action rocks!", utf8);
// 创建该ByteBuf 从索引0 开始到索引15结束的分段的副本
ByteBuf copy = buf.copy(0, 15);
// 将打印“Netty in Action”
System.out.println(copy.toString(utf8));
// 更新索引0 处的字节
buf.setByte(0, (byte) 'J');
// 将会成功,因为数据不是共享的
assert buf.getByte(0) != copy.getByte(0);
}
索引访问
注意通过索引访问时不会推进读索引和写索引,我们可以通过 ByteBuf 的readerIndex()或writerIndex()来分别推进读索引或写索引。
/create a ByteBuf of capacity is 16
ByteBuf buf = Unpooled.buffer(16);
//write data to buf
for(int i=0;i<16;i++){
buf.writeByte(i+1);
}
//read data from buf
for(int i=0;i<buf.capacity();i++){
System.out.println(buf.getByte(i));
}
通过 CompositeByteBuf 实现零拷贝
假设我们有一份协议数据, 它由头部和消息体组成, 而头部和消息体是分别存放在两个 ByteBuf 中的, 即:
ByteBuf header = ...
ByteBuf body = ...
我们在代码处理中, 通常希望将 header 和 body 合并为一个 ByteBuf, 方便处理, 那么通常的做法是:
ByteBuf allBuf = Unpooled.buffer(header.readableBytes() + body.readableBytes());
allBuf.writeBytes(header);
allBuf.writeBytes(body);
可以看到, 我们将 header 和 body 都拷贝到了新的 allBuf 中了, 这无形中增加了两次额外的数据拷贝操作了.
那么有没有更加高效优雅的方式实现相同的目的呢? 我们来看一下 CompositeByteBuf 是如何实现这样的需求的吧.
CompositeByteBuf compositeByteBuf = Unpooled.compositeBuffer();
compositeByteBuf.addComponents(true, header, body);
上面 CompositeByteBuf 代码一个地方值得注意的是, 我们调用 addComponents(boolean increaseWriterIndex, ByteBuf... buffers) 来添加两个 ByteBuf, 其中第一个参数是 true, 表示当添加新的 ByteBuf 时, 自动递增 CompositeByteBuf 的 writeIndex.
如果我们调用的是
compositeByteBuf.addComponents(header, body);
那么其实 compositeByteBuf 的 writeIndex 仍然是0, 因此此时我们就不可能从 compositeByteBuf 中读取到数据.
通过 wrap 操作实现零拷贝
例如我们有一个 byte 数组, 我们希望将它转换为一个 ByteBuf 对象, 以便于后续的操作, 那么传统的做法是将此 byte 数组拷贝到 ByteBuf 中, 即:
byte[] bytes = ...
ByteBuf byteBuf = Unpooled.buffer();
byteBuf.writeBytes(bytes);
显然这样的方式也是有一个额外的拷贝操作的, 我们可以使用 Unpooled 的相关方法, 包装这个 byte 数组, 生成一个新的 ByteBuf 实例, 而不需要进行拷贝操作. 上面的代码可以改为:
byte[] bytes = ...
ByteBuf byteBuf = Unpooled.wrappedBuffer(bytes);
可以看到, 我们通过 Unpooled.wrappedBuffer 方法来将 bytes 包装成为一个 UnpooledHeapByteBuf 对象, 而在包装的过程中, 是不会有拷贝操作的. 即最后我们生成的生成的 ByteBuf 对象是和 bytes 数组共用了同一个存储空间, 对 bytes 的修改也会反映到 ByteBuf 对象中.
通过 slice 操作实现零拷贝
slice 操作和 wrap 操作刚好相反, Unpooled.wrappedBuffer 可以将多个 ByteBuf 合并为一个, 而 slice 操作可以将一个 ByteBuf 切片 为多个共享一个存储区域的 ByteBuf 对象.ByteBuf 提供了两个 slice 操作方法:
public ByteBuf slice();
public ByteBuf slice(int index, int length);
不带参数的 slice 方法等同于 buf.slice(buf.readerIndex(), buf.readableBytes()) 调用, 即返回 buf 中可读部分的切片. 而 slice(int index, int length) 方法相对就比较灵活了, 我们可以设置不同的参数来获取到 buf 的不同区域的切片.
【10】Netty中TCP拆包、粘包问题
在基于流的传输里比如TCP/IP,接收到的数据会先被存储到一个socket接收缓冲里。不幸的是,基于流的传输并不是一个数据包队列,而是一个字节队列。即使你发送了2个独立的数据包,操作系统也不会作为2个消息处理而仅仅是作为一连串的字节。
TCP是一个基于流的协议,TCP作为传输层协议并不知道应用层协议的具体含义,它会根据TCP缓冲区的实际情况进行数据包的划分,所以在应用层上认为是一个完整的包,可能会被TCP拆分成多个包进行发送,也有可能把多个小的包封装成一个大的数据包发送,这就是所谓的TCP粘包和半包问题。
有如下三种方案:
1.消息定长,例如每个报文的大小固定为多少字节,如果不够,就空格补充
2.在包尾部增加特殊的字符进行分隔,例如$
3.将消息分为消息头和消息体,在消息头中包含表示消息总长度的字段,然后就行业务逻辑的处理
采用定长
new FixedLengthFrameDecoder(10)
采用分隔符
//以$为分隔符
ByteBuf buf = Unpooled.copiedBuffer("$".getBytes());
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buf))
// f.channel().writeAndFlush(Unpooled.copiedBuffer("Hello Server$".getBytes()));
Netty自带拆包类
FixedLengthFrameDecoder 按照特定长度组包
DelimiterBasedFrameDecoder 按照指定分隔符组包, 例如本文中的$$$
LineBasedFrameDecoder 按照换行符进行组包, \r \n等等
LengthFieldBasedFrameDecoder:跟据包头部定义的长度来区分包
【11】Pipeline
详情参考
Pipeline初始化的过程中增加了head和tail这两个Handler。tail的inbound属性为true, outbound属性为false;head的inbound属性为false,outbound属性为true。
head节点的作用主要是将各种事件传播到后续节点,并且其携带unSafe属性用于数据的读写;tail节点的作用是作为处理链的最后一个节点,进行一些收尾操作。
调用addLast()往Pipeline中添加ChannelHandler:
1.判断是否重复添加(是否有添加 注解并且是否已经添加过),添加了 注解后,就表示这个ChannelHandler可以同时被多个ChannelPipeline添加,而如果没有这个注解,则只能被一个ChannelPipeline添加.
2.创建节点并添加至链表,将当前创建的节点添加为ChannelPipeline这个双向链表的最后一个tail节点的前面一个节点。
3.回调添加完成事件
inBound事件的传播( ChannelInboundHandler接口定义的方法):
channelRegistered和channelUnregistered方法是Channel注册到NioEventLoop线程中对应的selector选择器上或是从选择器上取消注册的时候触发的回调函数;
channelActive和channelInactive方法是通道激活和失效触发的回调函数;
channelRead方法是Channel读到数据或者建立连接后的回调函数,channelReadComplete是Channel读取完毕后的回调函数;
userEventTriggered方法表示用户事件被触发后的回调函数,channelWritabilityChanged表示Channel读写能力发生变化后的回调函数;
对比ChannelHandler和ChannelInboundHandler这两个接口,可知ChannelInboundHandler添加了一些Channel相关的回调函数。下面就来从channelRead这个事件来作为示例,看下事件是如何在ChannelPipeline中各个节点之间进行传播的。
channelRead事件的传播:
执行ctx.channel().pipeline().fireChannelRead("hello world");语句时事件和消息是从pipeline的头部开始传递的,
而执行ctx.fireChannelRead()语句时事件和消息是从当前节点开始向后传递的。
如果read事件最后传递到了tail节点,则会调用ReferenceCountUtil.release(msg)这个方法将保存消息的内存进行释放;如果没有调用fireChannelRead()方法将事件向后传递,也没有处理消息,那么这个内存就永远得不到释放了,这就会导致内存泄露。
在编写inbound类型的ChannalHandler实现类的时候,只要继承这个SimpleChannelInboundHandler类就行,复写channelRead0()方法,并且在这个方法中并不需要显示调用ReferenceCountUtil.release(msg);方法来释放内存,因为在SimpleChannelInboundHandler类的channelRead()方法内部最终都会自己执行释放内存的操作。
write事件传播:
ctx.channel().write("hello,world"),执行的pipeline对象的write方法,执行的pipeline链表的tail节点的write方法;
ctx.write(msg,promise) 向前一个节点传递。
【12】为什么Netty会发生内存泄漏问题
Netty之有效规避内存泄漏
如果应用程序在使用ByteBuf后,没有调用方法release()(这个方法将其放回对象池中),又没有任何进一步的引用,则会发生泄漏。
在这种情况下,缓冲区最终将被GC(垃圾回收器)清除,但Netty的对象池不会知道这种情况。
然后,对象池将逐渐相信程序正在使用越来越多的永不返回池中的ByteBuf。
这可能会产生内存泄漏,因为ByteBuf被垃圾回收器回收,对象池回收不到它。导致对象池创建越来越多的新的引用计数对象。
ByteBuf 是Netty中主要用来数据byte[]的封装类,主要分为Heap ByteBuf 和 Direct ByteBuf。为了减少内存的分配回收以及产生的内存碎片,Netty提供了PooledByteBufAllocator 用来分配可回收的ByteBuf,
可以把PooledByteBufAllocator看做一个池子,需要的时候从里面获取ByteBuf,用完了放回去,以此提高性能。当然与之对应的还有 UnpooledByteBufAllocator,顾名思义Unpooled就是不会放到池子里,
所以根据该分配器分配的ByteBuf,不需要放回池子有JVM自己GC回收。
在netty中,根据ChannelHandlerContext 和 Channel获取的Allocator默认都是Pooled,所以需要再合适的时机对其进行释放,避免造成内存泄漏。
Netty默认会在ChannelPipline的最后添加一个tail handler帮你完成ByteBuf的release。其释放的是channelRead传入的ByteBuf,如果在handlers传递过程中,传递了新值,老值需要你自己手动释放。
另外如果中途没有使用fireChannelRead传递下去也要自己释放。在传递过程中自己通过Channel或ChannelHandlerContext创建的但是没有传递下去的ByteBuf也要手动释放。
为了帮助你诊断潜在的泄漏问题,netty提供了ResourceLeakDetector,该类会采样应用程序中%1的buffer分配,并进行跟踪。不用担心这个开销很小。
Netty目前定义了四中检测级别:DISABLE, SIMPLE(默认),ADVANCED, PARANOID。
禁用(DISABLED) - 完全禁止泄露检测,省点消耗。
简单(SIMPLE) - 默认等级,告诉我们取样的1%的ByteBuf是否发生了泄露,但总共一次只打印一次,看不到就没有了。
高级(ADVANCED) - 告诉我们取样的1%的ByteBuf发生泄露的地方。每种类型的泄漏(创建的地方与访问路径一致)只打印一次。对性能有影响。
偏执(PARANOID) - 跟高级选项类似,但此选项检测所有ByteBuf,而不仅仅是取样的那1%。对性能有绝大的影响。
总结:
所谓内存泄漏,主要是针对池化的ByteBuf。ByteBuf对象被JVM GC掉之前,没有调用release()把底下的DirectByteBuffer或byte[]归还到池里,会导致池越来越大。
而非池化的ByteBuf,即使像DirectByteBuf那样可能会用到System.gc(),但终归会被release掉的,不会出大事。
【13】Netty SSL性能调优
参考链接
###Java 对SSL的支持
JDK7的client端只支持TLS1.0,服务端则支持TLS1.2。
JDK8完全支持TLS1.2。
JDK7不支持GCM算法。
JDK8支持GCM算法,但性能极差极差极差,按Netty的说法:
Java 8u60以前多版本,只能处理1 MB/s。
Java 8u60 开始,10倍的性能提升,10-20 MB/s。
但比起 OpenSSL的 ~200 MB/s,还差着一个量级。
###Netty 对SSL的支持
Netty既支持JDK SSL,也支持Google的boringssl, 这是OpenSSL 的一个fork,更少的代码,更多的功能。
依赖netty-tcnative-boringssl-static-linux-x86_64.jar即可,它里面已包含了相关的so文件,再也不用管Linux里装没装OpenSSL,OpenSSL啥版本了。
###Netty4,如果使用openssl,需要添加下面两个jar,如果只用jdk 提供的ssl,可以不导入
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-tcnative</artifactId>
<version>2.0.25.Final</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-tcnative-boringssl-static</artifactId>
<version>2.0.25.Final</version>
<scope>runtime</scope>
</dependency>
SslContextBuilder.forServer(kmf).trustManager(tf).build(); //不设置.sslProvider(SslProvider.OPENSSL)时,当SslProvider.OPENSSL可用时,优先使用SslProvider.OPENSSL。
###SslProvider.OPENSSL支持的ciphers:
0 = "TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384"
1 = "TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256"
2 = "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256"
3 = "TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA"
4 = "TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA"
5 = "TLS_RSA_WITH_AES_128_GCM_SHA256"
6 = "TLS_RSA_WITH_AES_128_CBC_SHA"
7 = "TLS_RSA_WITH_AES_256_CBC_SHA"
###SslProvider.JDK支持的ciphers:
0 = "TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256"
1 = "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256"
2 = "TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA"
3 = "TLS_RSA_WITH_AES_128_GCM_SHA256"
4 = "TLS_RSA_WITH_AES_128_CBC_SHA"
JDK SSL 使用
public class SSLContextFactory {
private static SSLContextFactory factory=null;
private static SpeechProperties.Ssl ssl=null;
private static SSLContext sslContext=null;
private SSLContextFactory(){
}
public static SSLContextFactory getInstance(){
if(factory==null){
synchronized(SSLContextFactory.class){
if(factory==null){
factory=new SSLContextFactory();
ssl= SpringUtils.getBean(SpeechProperties.class).getNetty().getSsl();
}
}
}
return factory;
}
private static SSLContext getSSLContext() {
if(sslContext==null){
synchronized(SSLContextFactory.class){
if(sslContext==null){
try {
sslContext = SSLContext.getInstance(ssl.getProtocol());
KeyStore keyStore = KeyStore.getInstance(ssl.getKeyStoreType());
InputStream inputStream=ssl.getKeyStoreLocation().getInputStream();
keyStore.load(inputStream, ssl.getKeyStorePassword().toCharArray());
inputStream.close();
KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
kmf.init(keyStore, ssl.getKeyPassword().toCharArray());
KeyManager[] keyManagers = kmf.getKeyManagers();
TrustManager[] trustManagers=null;
if(ssl.isNeedClientAuth()){
KeyStore tKeyStore = KeyStore.getInstance(ssl.getTrustStoreType());
InputStream inputStream1=ssl.getTrustStoreLocation().getInputStream();
tKeyStore.load(inputStream1, ssl.getTrustStorePassword().toCharArray());
TrustManagerFactory tf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
tf.init(tKeyStore);
trustManagers=tf.getTrustManagers();
}
sslContext.init(keyManagers, trustManagers,new SecureRandom());
return sslContext;
} catch (NoSuchAlgorithmException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} catch (CertificateException e) {
e.printStackTrace();
} catch (UnrecoverableKeyException e) {
e.printStackTrace();
} catch (KeyStoreException e) {
e.printStackTrace();
} catch (KeyManagementException e) {
e.printStackTrace();
}
}
}
}
return sslContext;
}
private SSLEngine getSSLEngine() {
SSLEngine sslEngine =getSSLContext().createSSLEngine();
sslEngine.setUseClientMode(false);
sslEngine.setNeedClientAuth(ssl.isNeedClientAuth());
return sslEngine;
}
public SslHandler getSslHandler() {
if(ssl.isEnableSSL()){
SSLEngine sslEngine=getSSLEngine();
if(sslEngine!=null){
return new SslHandler(sslEngine);
}
}
return null;
}
}
SslHandler sslHandler=SSLContextFactory.getInstance().getSslHandler();
if (sslHandler != null) {
e.pipeline().addFirst("ssl", sslHandler);
}
OPEN SSL 使用
public class SSLContextFactory {
private static SSLContextFactory factory=null;
private static SpeechProperties.Ssl ssl=null;
private static SslContext sslContext=null;
private SSLContextFactory(){
}
public static SSLContextFactory getInstance(){
if(factory==null){
synchronized(SSLContextFactory.class){
if(factory==null){
factory=new SSLContextFactory();
ssl= SpringUtils.getBean(SpeechProperties.class).getNetty().getSsl();
}
}
}
return factory;
}
private static SslContext getSSLContext() {
if(sslContext==null){
synchronized(SSLContextFactory.class){
if(sslContext==null){
try {
KeyStore keyStore = KeyStore.getInstance(ssl.getKeyStoreType());
InputStream inputStream=ssl.getKeyStoreLocation().getInputStream();
keyStore.load(inputStream, ssl.getKeyStorePassword().toCharArray());
inputStream.close();
KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
kmf.init(keyStore, ssl.getKeyPassword().toCharArray());
TrustManagerFactory tf=null;
if(ssl.isNeedClientAuth()){
KeyStore tKeyStore = KeyStore.getInstance(ssl.getTrustStoreType());
InputStream inputStream1=ssl.getTrustStoreLocation().getInputStream();
tKeyStore.load(inputStream1, ssl.getTrustStorePassword().toCharArray());
tf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
tf.init(tKeyStore);
}
sslContext = SslContextBuilder.forServer(kmf).trustManager(tf).build();
return sslContext;
} catch (NoSuchAlgorithmException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} catch (CertificateException e) {
e.printStackTrace();
} catch (UnrecoverableKeyException e) {
e.printStackTrace();
} catch (KeyStoreException e) {
e.printStackTrace();
}
}
}
}
return sslContext;
}
private SSLEngine getSSLEngine(ByteBufAllocator alloc) {
SSLEngine sslEngine =getSSLContext().newEngine(alloc);
sslEngine.setUseClientMode(false);
sslEngine.setNeedClientAuth(ssl.isNeedClientAuth());
return sslEngine;
}
public SslHandler getSslHandler(ByteBufAllocator alloc) {
if(ssl.isEnableSSL()){
SSLEngine sslEngine=getSSLEngine(alloc);
if(sslEngine!=null){
return new SslHandler(sslEngine);
}
}
return null;
}
}
SslHandler sslHandler=SSLContextFactory.getInstance().getSslHandler(e.alloc());
if (sslHandler != null) {
e.pipeline().addFirst("ssl", sslHandler);
}
【14】netty编解码器
HttpObjectAggregator 把多个消息转换为一个单一的FullHttpRequest或是FullHttpResponse。
【15】自定义编解码器
ByteToMessageDecoder
abstract ByteToMessageDecoder -> ChannelInboundHandlerAdapter -> abstract ChannelHandlerAdapter -> interface ChannelHandler
-> interface ChannelInboundHandler -> interface ChannelHandler
ByteToMessageDecoder的channelRead方法读到消息后判断若为ByteBuf类型则调用它的callDecode方法,进而[while (in.isReadable())]调用decodeRemovalReentryProtection方法,最终调用它的实现方法decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)。
自定义的解码器在decode方法中将解码后的消息放入list中,由callDecode方法识别到后调用fireChannelRead向下传递(遍历list集合,每次传递一个)。
ByteToMessageDecoder 中的 ByteBuf cumulation字段 缓存未解码的数据,等下个数据包过来时拼接后解码。
MessageToByteEncoder
abstract MessageToByteEncoder<I> -> ChannelOutboundHandlerAdapter -> abstract ChannelHandlerAdapter -> interface ChannelHandler
-> interface ChannelOutboundHandler -> interface ChannelHandler
ctx.writeAndFlush(33),向下传递消息至MessageToByteEncoder的write方法,此时判断类型匹配时(不匹配时直接向下传递)调用它的实现方法encode(ChannelHandlerContext ctx, I msg, ByteBuf out),自定义的实现方法将msg写入out,然后向下传递out.
abstract class MessageToMessageDecoder extends ChannelInboundHandlerAdapter
abstract MessageToMessageDecoder<I> -> ChannelInboundHandlerAdapter -> abstract ChannelHandlerAdapter -> interface ChannelHandler
-> interface ChannelInboundHandler -> interface ChannelHandler
与ByteToMessageDecoder不同,其实现方法是 decode(ChannelHandlerContext ctx, I msg, List<Object> out)
MessageToMessageEncoder
abstract MessageToMessageEncoder<I> -> ChannelOutboundHandlerAdapter -> abstract ChannelHandlerAdapter -> interface ChannelHandler
-> interface ChannelOutboundHandler -> interface ChannelHandler
与MessageToByteEncoder不同,其实现的方法是 encode(ChannelHandlerContext ctx, I msg, List<Object> out)
以上是关于netty channelhandleradapter 是否去了channelread方法的主要内容,如果未能解决你的问题,请参考以下文章