netty channelhandleradapter 是否去了channelread方法

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了netty channelhandleradapter 是否去了channelread方法相关的知识,希望对你有一定的参考价值。

参考技术A if (如果是登录报文)
//处理登录信息
else // 其它报文
// 这句话就是调用下一个channelRead
ctx.fireChannelRead(msg);

本回答被提问者采纳

netty笔记

1】Reactor模式2】NIO3】Netty组件4】Netty 线程模型5】异步处理6】ChannelOption7】常见Handler8】使用SSL/TLS创建安全的Netty程序9】Netty中的ByteBuf10】Netty中TCP拆包、粘包问题11】Pipeline12】为什么Netty会发生内存泄漏问题13】Netty SSL性能调优14】netty编解码器15】自定义编解码器
===========================================================================================================================================================1】Reactor模式
在处理web请求时通常有两种体系结构,分别为:thread-based architecture(基于线程)、event-driven architecture(事件驱动)。基于线程的体系结构通常会使用多线程来处理客户端的请求,每当接收到一个请求,便开启一个独立的线程来处理。事件驱动体系结构会定义一系列的事件处理器来响应事件的发生,并且将服务端接受连接与对事件的处理分离。其中,事件是一种状态的改变。比如,tcp中socket的new incoming connection、ready for read、ready for write。reactor设计模式是event-driven architecture的一种实现方式,处理多个客户端并发的向服务端请求服务的场景。每种服务在服务端可能由多个方法组成。reactor会解耦并发请求的服务并分发给对应的事件处理器来处理。目前,许多流行的开源框架都用到了reactor模式,如:netty、node.js等,包括java的nio。reactor主要由以下几个角色构成:handle、Synchronous Event Demultiplexer、Initiation Dispatcher、Event Handler、Concrete Event Handler取决于 Reactor 的数量和 Hanndler 线程数量的不同,Reactor 模型有 3 个变种: 单 Reactor 单线程。 单 Reactor 多线程。 主从 Reactor 多线程。
2】NIOJava NIO 的核心组成部分:Channel、Buffer、Selector。其它组件:Pipe、FileLock 等。
Channel Channel用于在字节缓冲区和位于通道另一侧的实体(通常是一个文件或套接字)之间有效地传输数据。 通道类似流,但又有些不同:既可以从通道中读取数据,又可以写数据到通道。但流的读写通常是单向的。通道可以异步地读写。通道中的数据总是要先读到一个 Buffer,或者总是要从一个 Buffer 中写入。 java.nio.channels包中的Channel接口的已知实现类: FileChannel:从文件中读写数据。FileChannel对象是线程安全的、总是阻塞式的。 DatagramChannel:能通过UDP读写网络中的数据。 SocketChannel:能通过TCP读写网络中的数据。 ServerSocketChannel:可以监听新进来的TCP连接,像Web服务器那样。对每一个新进来的连接都会创建一个SocketChannel。 Buffer 缓冲区本质上是一块可以写入数据,然后可以从中读取数据的内存。这块内存被包装成NIO Buffer对象,并提供了一组方法,用来方便的访问该块内存。 使用Buffer读写数据一般遵循以下四个步骤:写入数据到Buffer、调用flip()方法、从Buffer中读取数据、调用clear()方法或者compact()方法。 当向buffer写入数据时,buffer会记录下写了多少数据。一旦要读取数据,需要通过flip()方法将Buffer从写模式切换到读模式。在读模式下,可以读取之前写入到buffer的所有数据。一旦读完了所有的数据,就需要清空缓冲区,让它可以再次被写入。有两种方式能清空缓冲区:调用clear()或compact()方法。clear()方法会清空整个缓冲区。compact()方法只会清除已经读过的数据。任何未读的数据都被移到缓冲区的起始处,新写入的数据将放到缓冲区未读数据的后面。 +Buffer的三个属性: capacity,作为一个内存块,Buffer有一个固定的大小值,也叫“capacity”.你只能往里写capacity个byte、long,char等类型。一旦Buffer满了,需要将其清空(通过读数据或者清除数据)才能继续写数据往里写数据。 position,当你写数据到Buffer中时,position表示当前的位置。初始的position值为0.当一个byte、long等数据写到Buffer后, position会向前移动到下一个可插入数据的Buffer单元。position最大可为capacity – 1;当读取数据时,也是从某个特定位置读。当将Buffer从写模式切换到读模式,position会被重置为0. 当从Buffer的position处读取数据时,position向前移动到下一个可读的位置。 limit,在写模式下,Buffer的limit表示你最多能往Buffer里写多少数据。 写模式下,limit等于Buffer的capacity;当切换Buffer到读模式时, limit表示你最多能读到多少数据。因此,当切换Buffer到读模式时,limit会被设置成写模式下的position值。换句话说,你能读到之前写入的所有数据(limit被设置成已写数据的数量,这个值在写模式下就是position)。 Java NIO 有以下Buffer类型 ByteBuffer MappedByteBuffer CharBuffer DoubleBuffer FloatBuffer IntBuffer LongBuffer ShortBuffer 使用示例:
ByteBuffer buf = ByteBuffer.allocate(48); //分配48字节capacity的ByteBuffer int bytesRead = inChannel.read(buf); //read into buffer.从Channel写到Buffer buf.put(127); //通过Buffer的put()方法写到Buffer里 buf.flip(); //flip方法将Buffer从写模式切换到读模式。调用flip()方法会将position设回0,并将limit设置成之前position的值 int bytesWritten = inChannel.write(buf); //从Buffer读取数据到Channel byte aByte = buf.get(); //使用get()方法从Buffer中读取数据 Buffer.rewind() //将position设回0,所以你可以重读Buffer中的所有数据。limit保持不变,仍然表示能从Buffer中读取多少个元素(byte、char等)。 Buffer.mark() //可以标记Buffer中的一个特定position。之后可以通过调用Buffer.reset()方法恢复到这个position.
Selector
Selector的创建,通过调用Selector.open()方法创建一个Selector,如下: Selector selector = Selector.open(); 向Selector注册通道,为了将Channel和Selector配合使用,必须将channel注册到selector上。通过SelectableChannel.register()方法来实现,如下: channel.configureBlocking(false); selectionKey key = channel.register(selector,Selectionkey.OP_READ); 与Selector一起使用时,Channel必须处于非阻塞模式下。这意味着不能将FileChannel与Selector一起使用,因为FileChannel不能切换到非阻塞模式。而套接字通道都可以。注意register()方法的第二个参数。这是一个“interest集合”,意思是在通过Selector监听Channel时对什么事件感兴趣。可以监听四种不同类型的事件:Connect/Accept/Read/Write 通过Selector选择通道,一旦向Selector注册了一或多个通道,就可以调用几个重载的select()方法。这些方法返回你所感兴趣的事件(如连接、接受、读或写)已经准备就绪的那些通道。 select()方法返回的int值表示有多少通道已经就绪。亦即,自上次调用select()方法后有多少通道变成就绪状态。如果调用select()方法,因为有一个通道变成就绪状态,返回了1,若再次调用select()方法,如果另一个通道就绪了,它会再次返回1。一旦调用了select()方法,并且返回值表明有一个或更多个通道就绪了,然后可以通过调用selector的selectedKeys()方法,访问“已选择键集(selected key set)”中的就绪通道。 使用示例:
Selector selector = Selector.open(); channel.configureBlocking(false); SelectionKey key = channel.register(selector, SelectionKey.OP_READ); while(true) { int readyChannels = selector.select(); if(readyChannels == 0) continue; Set selectedKeys = selector.selectedKeys(); Iterator keyIterator = selectedKeys.iterator(); while(keyIterator.hasNext()) { SelectionKey key = keyIterator.next(); if(key.isAcceptable()) { // a connection was accepted by a ServerSocketChannel. } else if (key.isConnectable()) { // a connection was established with a remote server. } else if (key.isReadable()) { // a channel is ready for reading } else if (key.isWritable()) { // a channel is ready for writing } keyIterator.remove(); } }
SelectionKey,这个对象包含了一些你感兴趣的属性:interest集合/ready集合/Channel/Selector/附加的对象(可选)
int interestSet = selectionKey.interestOps(); boolean isInterestedInAccept = (interestSet & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT; int readySet = selectionKey.readyOps(); selectionKey.isAcceptable(); Channel channel = selectionKey.channel(); Selector selector = selectionKey.selector(); selectionKey.attach(theObject); Object attachedObj = selectionKey.attachment(); SelectionKey key = channel.register(selector, SelectionKey.OP_READ, theObject);
3】Netty组件
+ 核心组件:Channel、回调、Future、事件和ChannelHandler。 Channel:基本的I/O 操作(bind()、connect()、read()和write())依赖于底层网络传输所提供的原语。 EmbeddedChannel、LocalServerChannel、NioDatagramChannel、NioSctpChannel、NioSocketChannel。 EventLoop:定义了Netty 的核心抽象,用于处理连接的生命周期中所发生的事件。 一个EventLoopGroup 包含一个或者多个EventLoop; 一个EventLoop 在它的生命周期内只和一个Thread 绑定; 所有由EventLoop 处理的I/O 事件都将在它专有的Thread 上被处理; 一个Channel 在它的生命周期内只注册于一个EventLoop; 一个EventLoop 可能会被分配给一个或多个Channel。
+ 常用类: ChannelHandler -> ChannelInboundHandler -> ChannelInboundHandlerAdapter : @Sharable 一个ChannelHandler可以属于多个ChannelPipeline,它也可以绑定多个ChannelHandlerContext实例,如果一个ChannelHandler想要有这样的功能,就必须以@Sharable注解注释这个ChannelHandler,否则,尝试将其加入到不止一个ChannelPipeline中去的时候,会报出异常. channelRead()—对于每个传入的消息都要调用; channelReadComplete()—通知ChannelInboundHandler 最后一次对channelRead()的调用是当前批量读取中的最后一条消息; exceptionCaught()—在读取操作期间,有异常抛出时会调用 ChannelHandler -> ChannelInboundHandler -> ChannelInboundHandlerAdapter -> SimpleChannelInboundHandler : channelActive()—在到服务器的连接已经建立之后将被调用; channelRead0()—当从服务器接收到一条消息时被调用; exceptionCaught()—在处理过程中引发异常时被调用。 ChannelHandler -> ChannelInboundHandler -> ChannelInboundHandlerAdapter -> ChannelInitializer<C extends Channel>
+ 示例: ##服务端 final EchoServerHandler serverHandler = new EchoServerHandler(); EventLoopGroup group = new NioEventLoopGroup(); try{ ServerBootstrap b = new ServerBootstrap(); b.group(group).channel(NioServerSocketChannel.class).localAddress(new InetSocketAddress(port)).childHandler(new ChannelInitializer<SocketChannel>(){ @Override public void initChannel(SocketChannel ch)throws Exception { ch.pipeline().addLast(serverHandler); } }); ChannelFuture f = b.bind().sync(); f.channel().closeFuture().sync(); } finally { group.shutdownGracefully().sync(); } ##客户端 EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group).channel(NioSocketChannel.class).remoteAddress(new InetSocketAddress(host, port)).handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch)throws Exception { ch.pipeline().addLast(new EchoClientHandler()); } }); ChannelFuture f = b.connect().sync(); f.channel().closeFuture().sync(); } finally { group.shutdownGracefully().sync(); }
4】Netty 线程模型
Netty 主要基于主从 Reactors 多线程模型,做了一定的修改,其中主从 Reactor 多线程模型有多个 Reactor:MainReactor 负责客户端的连接请求,并将请求转交给 SubReactor。SubReactor 负责相应通道的 IO 读写请求。非 IO 请求(具体逻辑处理)的任务则会直接写入队列,等待 worker threads 进行处理。
Reactor 多线程模型的图虽然 Netty 的线程模型基于主从 Reactor 多线程,借用了 MainReactor 和 SubReactor 的结构。但是实际实现上 SubReactor 和 Worker 线程在同一个线程池中:
EventLoopGroup bossGroup = newNioEventLoopGroup();EventLoopGroup workerGroup = newNioEventLoopGroup();ServerBootstrap server= newServerBootstrap();server.group(bossGroup, workerGroup).channel(NioServerSocketChannel. class);
上面代码中的 bossGroup 和 workerGroup 是 Bootstrap 构造方法中传入的两个对象,这两个 group 均是线程池:
bossGroup 线程池则只是在 Bind 某个端口后,获得其中一个线程作为 MainReactor,专门处理端口的 Accept 事件,每个端口对应一个 Boss 线程。 workerGroup 线程池会被各个 SubReactor 和 Worker 线程充分利用。
5】异步处理
异步的概念和同步相对。当一个异步过程调用发出后,调用者不能立刻得到结果。实际处理这个调用的部件在完成后,通过状态、通知和回调来通知调用者。Netty 中的 I/O 操作是异步的,包括 Bind、Write、Connect 等操作会简单的返回一个 ChannelFuture。调用者并不能立刻获得结果,而是通过 Future-Listener 机制,用户可以方便的主动获取或者通过通知机制获得 IO 操作结果。当 Future 对象刚刚创建时,处于非完成状态,调用者可以通过返回的 ChannelFuture 来获取操作执行的状态,注册监听函数来执行完成后的操作。
常见有如下操作: 通过 isDone 方法来判断当前操作是否完成。 通过 isSuccess 方法来判断已完成的当前操作是否成功。 通过 isCancelled 方法来判断已完成的当前操作是否被取消。 通过 addListener 方法来注册监听器,当操作已完成(isDone 方法返回完成),将会通知指定的监听器;如果 Future 对象已完成,则理解通知指定的监听器。
例如下面的代码中绑定端口是异步操作,当绑定操作处理完,将会调用相应的监听器处理逻辑。
serverBootstrap.bind(port).addListener(future -> { if(future.isSuccess()) { System.out. println( newDate() + ": 端口["+ port + "]绑定成功!"); } else{ System.err. println( "端口["+ port + "]绑定失败!"); }});
6】ChannelOption
ChannelOption.SO_BACKLOG, 1024 BACKLOG用于构造服务端套接字ServerSocket对象,标识当服务器请求处理线程全满时,用于临时存放已完成三次握手的请求的队列的最大长度。如果未设置或所设置的值小于1,Java将使用默认值50 ChannelOption.SO_KEEPALIVE, true 是否启用心跳保活机制。在双方TCP套接字建立连接后(即都进入ESTABLISHED状态)并且在两个小时左右上层没有任何数据传输的情况下,这套机制才会被激活。 ChannelOption.TCP_NODELAY, true 在TCP/IP协议中,无论发送多少数据,总是要在数据前面加上协议头,同时,对方接收到数据,也需要发送ACK表示确认。为了尽可能的利用网络带宽,TCP总是希望尽可能的发送足够大的数据。这里就涉及到一个名为Nagle的算法,该算法的目的就是为了尽可能发送大块数据,避免网络中充斥着许多小数据块。 TCP_NODELAY就是用于启用或关于Nagle算法。如果要求高实时性,有数据发送时就马上发送,就将该选项设置为true关闭Nagle算法;如果要减少发送次数减少网络交互,就设置为false等累积一定大小后再发送。默认为false ChannelOption.SO_REUSEADDR, true SO_REUSEADDR允许启动一个监听服务器并捆绑其众所周知端口,即使以前建立的将此端口用做他们的本地端口的连接仍存在。这通常是重启监听服务器时出现,若不设置此选项,则bind时将出错。 SO_REUSEADDR允许在同一端口上启动同一服务器的多个实例,只要每个实例捆绑一个不同的本地IP地址即可。对于TCP,我们根本不可能启动捆绑相同IP地址和相同端口号的多个服务器。 SO_REUSEADDR允许单个进程捆绑同一端口到多个套接口上,只要每个捆绑指定不同的本地IP地址即可。这一般不用于TCP服务器。 SO_REUSEADDR允许完全重复的捆绑:当一个IP地址和端口绑定到某个套接口上时,还允许此IP地址和端口捆绑到另一个套接口上。一般来说,这个特性仅在支持多播的系统上才有,而且只对UDP套接口而言(TCP不支持多播) ChannelOption.SO_RCVBUF AND ChannelOption.SO_SNDBUF 定义接收或者传输的系统缓冲区buf的大小, ChannelOption.ALLOCATOR Netty4使用对象池,重用缓冲区 bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
7】常见Handler
编码解码HTTP协议消息的Handler:
HttpServerCodec -> CombinedChannelDuplexHandler -> ChannelDuplexHandler ...
HttpRequestDecode 将ByteBuf解码成HttpRequest和HttpContent HttpResponseEncoder 将HttpResponse或HttpContent编码成ByteBuf HttpClientCodec
处理HTTP时可能接收HTTP消息片段,Netty需要缓冲直到接收完整个消息。要完成的处理HTTP消息,并且内存开销也不会很大,Netty为此提供了HttpObjectAggregator。通过HttpObjectAggregator,Netty可以聚合HTTP消息,使用FullHttpResponse和FullHttpRequest到ChannelPipeline中的下一个ChannelHandler,这就消除了断裂消息,保证了消息的完整。
HttpObjectAggregator
使用HTTP时建议压缩数据以减少传输流量,压缩数据会增加CPU负载,现在的硬件设施都很强大,大多数时候压缩数据时一个好主意。Netty支持“gzip”和“deflate”,为此提供了两个ChannelHandler实现分别用于压缩和解压。
HttpContentDecompressor
使用HTTPS
SslHandler
使用WebSocket
BinaryWebSocketFrame 包含二进制数据 TextWebSocketFrame 包含文本数据 ContinuationWebSocketFrame 包含二进制数据或文本数据,BinaryWebSocketFrame和TextWebSocketFrame的结合体 CloseWebSocketFrame WebSocketFrame代表一个关闭请求,包含关闭状态码和短语 PingWebSocketFrame WebSocketFrame要求PongWebSocketFrame发送数据 PongWebSocketFrame WebSocketFrame要求PingWebSocketFrame响应
处理空闲连接和超时
IdleStateHandler 当一个通道没有进行读写或运行了一段时间后出发IdleStateEvent ReadTimeoutHandler 在指定时间内没有接收到任何数据将抛出ReadTimeoutException WriteTimeoutHandler 在指定时间内有写入数据将抛出WriteTimeoutException
解码分隔符和基于长度的协议
DelimiterBasedFrameDecoder 解码器,接收ByteBuf由一个或多个分隔符拆分,如NUL或换行符 LineBasedFrameDecoder 解码器,接收ByteBuf以分割线结束,如"\n""\r\n" FixedLengthFrameDecoder LengthFieldBasedFrameDecoder
打印日志
LoggingHandler
8】使用SSL/TLS创建安全的Netty程序
//服务端KeyManagerFactory keyManagerFactory = null;KeyStore keyStore = KeyStore.getInstance("JKS");keyStore.load(new FileInputStream("D:\\security\\server\\sChat.jks"), "sNetty".toCharArray());keyManagerFactory = KeyManagerFactory.getInstance("SunX509");keyManagerFactory.init(keyStore,"sNetty".toCharArray());SslContext sslContext = SslContextBuilder.forServer(keyManagerFactory).build();b.childHandler(new SslChannelInitializer(sslContext));

public class SslChannelInitializer extends ChannelInitializer<Channel> { private final SslContext context;
public SslChannelInitializer(SslContext context) { this.context = context; }
@Override protected void initChannel(Channel ch) throws Exception { SSLEngine engine = context.newEngine(ch.alloc()); engine.setUseClientMode(false); ch.pipeline().addFirst("ssl", new SslHandler(engine)); ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)); pipeline.addLast("frameEncoder", new LengthFieldPrepender(4)); //最大16M pipeline.addLast("decoder", new StringDecoder(Charset.forName("UTF-8"))); pipeline.addLast("encoder", new StringEncoder(Charset.forName("UTF-8"))); pipeline.addLast("spiderServerBusiHandler", new SpiderServerBusiHandler()); }}

//客户端//SslContext sslCtx = SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build(); //InsecureTrustManagerFactory.INSTANCE 不安全的信任管理器,信任所有证书//p.addLast(NettyConstant.SSL, sslCtx.newHandler(sc.alloc(), uri.getHost(), uri.getPort()));
需要注意一点,SslHandler必须要添加到ChannelPipeline的第一个位置,可能有一些例外,但是最好这样来做.
public class SslChannelInitializer extends ChannelInitializer<Channel> { private final SSLContext context; private final boolean client; private final boolean startTls; public SslChannelInitializer(SSLContext context, boolean client, boolean startTls) { this.context = context; this.client = client; this.startTls = startTls; } @Override protected void initChannel(Channel ch) throws Exception { SSLEngine engine = context.createSSLEngine(); engine.setUseClientMode(client); ch.pipeline().addFirst("ssl", new SslHandler(engine, startTls));//startTls 如果为true,第一个写入的消息将不会被加密(客户端应该设置为true) }}
9】Netty中的ByteBuf
ByteBuf 与JDK中的 ByteBuffer 的最大区别就是:
1)netty的ByteBuf采用了读/写索引分离,一个初始化的ByteBuf的readerIndex和writerIndex都处于0位置。2)当读索引和写索引处于同一位置时,如果我们继续读取,就会抛出异常IndexOutOfBoundsException。3)对于ByteBuf的任何读写操作都会分别单独的维护读索引和写索引。maxCapacity最大容量默认的限制就是Integer.MAX_VALUE。
ByteBuf 的使用模式 JDK中的Buffer的类型 有heapBuffer和directBuffer两种类型,但是在netty中除了heap和direct类型外,还有composite Buffer(复合缓冲区类型)。 创建ByteBuf的方法
1)通过ByteBufAllocator这个接口来创建ByteBuf,这个接口可以创建上面的三种Buffer,一般都是通过channel的alloc()接口获取。2)通过Unpooled类里面的静态方法,创建Buffer
CompositeByteBuf compBuf = Unpooled.compositeBuffer(); ByteBuf heapBuf = Unpooled.buffer(8); ByteBuf directBuf = Unpooled.directBuffer(16);
// 在堆上分配一个ByteBuf,并指定初始容量和最大容量public static ByteBuf buffer(int initialCapacity, int maxCapacity) { return ALLOC.heapBuffer(initialCapacity, maxCapacity);}// 在堆外分配一个ByteBuf,并指定初始容量和最大容量public static ByteBuf directBuffer(int initialCapacity, int maxCapacity) { return ALLOC.directBuffer(initialCapacity, maxCapacity);}// 使用包装的方式,将一个byte[]包装成一个ByteBuf后返回public static ByteBuf wrappedBuffer(byte[] array) { if (array.length == 0) { return EMPTY_BUFFER; } return new UnpooledHeapByteBuf(ALLOC, array, array.length);}// 返回一个组合ByteBuf,并指定组合的个数public static CompositeByteBuf compositeBuffer(int maxNumComponents){ return new CompositeByteBuf(ALLOC, false, maxNumComponents);}
使用示例
private static void simpleUse(){ // 1.创建一个非池化的ByteBuf,大小为10个字节 ByteBuf buf = Unpooled.buffer(10); System.out.println("原始ByteBuf为====================>"+buf.toString()); System.out.println("1.ByteBuf中的内容为===============>"+Arrays.toString(buf.array())+"\n");
// 2.写入一段内容 byte[] bytes = {1,2,3,4,5}; buf.writeBytes(bytes); System.out.println("写入的bytes为====================>"+Arrays.toString(bytes)); System.out.println("写入一段内容后ByteBuf为===========>"+buf.toString()); System.out.println("2.ByteBuf中的内容为===============>"+Arrays.toString(buf.array())+"\n");
// 3.读取一段内容 byte b1 = buf.readByte(); byte b2 = buf.readByte(); System.out.println("读取的bytes为====================>"+Arrays.toString(new byte[]{b1,b2})); System.out.println("读取一段内容后ByteBuf为===========>"+buf.toString()); System.out.println("3.ByteBuf中的内容为===============>"+Arrays.toString(buf.array())+"\n");
// 4.将读取的内容丢弃 buf.discardReadBytes(); System.out.println("将读取的内容丢弃后ByteBuf为========>"+buf.toString()); System.out.println("4.ByteBuf中的内容为===============>"+Arrays.toString(buf.array())+"\n");
// 5.清空读写指针 buf.clear(); System.out.println("将读写指针清空后ByteBuf为==========>"+buf.toString()); System.out.println("5.ByteBuf中的内容为===============>"+Arrays.toString(buf.array())+"\n");
// 6.再次写入一段内容,比第一段内容少 byte[] bytes2 = {1,2,3}; buf.writeBytes(bytes2); System.out.println("写入的bytes为====================>"+Arrays.toString(bytes2)); System.out.println("写入一段内容后ByteBuf为===========>"+buf.toString()); System.out.println("6.ByteBuf中的内容为===============>"+Arrays.toString(buf.array())+"\n");
// 7.将ByteBuf清零 buf.setZero(0,buf.capacity()); System.out.println("将内容清零后ByteBuf为==============>"+buf.toString()); System.out.println("7.ByteBuf中的内容为================>"+Arrays.toString(buf.array())+"\n");
// 8.再次写入一段超过容量的内容 byte[] bytes3 = {1,2,3,4,5,6,7,8,9,10,11}; buf.writeBytes(bytes3); System.out.println("写入的bytes为====================>"+Arrays.toString(bytes3)); System.out.println("写入一段内容后ByteBuf为===========>"+buf.toString()); System.out.println("8.ByteBuf中的内容为===============>"+Arrays.toString(buf.array())+"\n");}
执行结果:
原始ByteBuf为====================>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 0, cap: 10)1.ByteBuf中的内容为===============>[0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
写入的bytes为====================>[1, 2, 3, 4, 5]写入一段内容后ByteBuf为===========>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 5, cap: 10)2.ByteBuf中的内容为===============>[1, 2, 3, 4, 5, 0, 0, 0, 0, 0]
读取的bytes为====================>[1, 2]读取一段内容后ByteBuf为===========>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 2, widx: 5, cap: 10)3.ByteBuf中的内容为===============>[1, 2, 3, 4, 5, 0, 0, 0, 0, 0]
将读取的内容丢弃后ByteBuf为========>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 3, cap: 10)4.ByteBuf中的内容为===============>[3, 4, 5, 4, 5, 0, 0, 0, 0, 0]
将读写指针清空后ByteBuf为==========>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 0, cap: 10)5.ByteBuf中的内容为===============>[3, 4, 5, 4, 5, 0, 0, 0, 0, 0]
写入的bytes为====================>[1, 2, 3]写入一段内容后ByteBuf为===========>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 3, cap: 10)6.ByteBuf中的内容为===============>[1, 2, 3, 4, 5, 0, 0, 0, 0, 0]
将内容清零后ByteBuf为==============>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 3, cap: 10)7.ByteBuf中的内容为================>[0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
写入的bytes为====================>[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]写入一段内容后ByteBuf为===========>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 14, cap: 64)8.ByteBuf中的内容为===============>[0, 0, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
写操作 ByteBuf中定义了两类方法可以往ByteBuf中写入内容:writeXX() 和 setXX()。setXX是替换指定位置的值,而writeXX是想当前写指针写入数据后递增指针。 读操作 跟写操作一样,ByteBuf的读操作也有两种方法,分别是getXX()和readXX()。注意:getset之类方法不会改变读写索引的位置。
ByteBuf buf = (ByteBuf)msg;byte[] req = new byte[buf.readableBytes()];buf.readBytes(req);String body = new String(req,"UTF-8");
Direct Buffer( 直接缓冲区)
直接缓冲区,在堆之外直接分配内存。与堆buf区别: 劣势:创建和释放Direct Buffer的代价比Heap Buffer得要高; 优势:当我们把一个Direct Buffer写入Channel的时候,就好比是“内核缓冲区”的内容直接写入了Channel,这样显然快了,减少了数据拷贝(因为我们平时的read/write都是需要在I/O设备与应用程序空间之间的“内核缓冲区”中转一下的)。而当我们把一个Heap Buffer写入Channel的时候,实际上底层实现会先构建一个临时的Direct Buffer, 然后把Heap Buffer的内容复制到这个临时的Direct Buffer上,再把这个Direct Buffer写出去。当然,如果我们多次调用write方法,把一个Heap Buffer写入Channel,底层实现可以重复使用临时的Direct Buffer,这样不至于因为频繁地创建和销毁Direct Buffer影响性能。结论:Direct Buffer创建和销毁的代价很高,所以要用在尽可能重用的地方。 比如周期长传输文件大采用direct buffer,不然一般情况下就直接用heap buffer 就好。直接缓冲区不支持数组访问数据,但是我们可以间接的访问数据数组,如下面代码:ByteBuf directBuf = Unpooled.directBuffer(16); if(!directBuf.hasArray()){ int len = directBuf.readableBytes(); byte[] arr = new byte[len]; directBuf.getBytes(0, arr); }
Composite Buffer(复合缓冲区) 复合缓冲区,我们可以创建多个不同的 ByteBuf,然后提供一个这些 ByteBuf组合的视图。
CompositeByteBuf compBuf =Unpooled.compositeBuffer(); ByteBuf heapBuf = Unpooled.buffer(8); ByteBuf directBuf = Unpooled.directBuffer(16);//添加ByteBuf到 CompositeByteBuf compBuf.addComponents(heapBuf,directBuf);//删除第一个ByteBuf compBuf.removeComponent(0); Iterator<ByteBuf> iter = compBuf.iterator(); while(iter.hasNext()){ System.out.println(iter.next().toString()); } //使用数组访问数据 ,compBuf的hasArray只有当componentCount==1时才有意义,大于1时返回falseif(!compBuf.hasArray()){ int len = compBuf.readableBytes(); byte[] arr = new byte[len]; compBuf.getBytes(0, arr); }

### CompositeByteBuf写入的数据当作一个component。readBytes(arr)超出可读范围抛异常。getBytes(0,arr) 时超出cap时异常。readBytes(arr) 时读取的是自身写入的数据,component的读不到,即使add Component在中间添加,对读无影响,超出可读范围抛异常。 除非当compBuf.addComponents(true,heapBuf,directBuf);时改变了写索引,这时可用readBytes(arr)读取,但从component中读到的字节不正确,自身写的数据可正常读。 经测试,最好不要写入数据。getBytes(0,arr) 时先读取自身写入的,再读取component的,可以跨component读取。当compBuf.addComponents(true,heapBuf,directBuf)时,或者在addComponents前写入时才增加cap,但由于此时自身写的当作了component,此component的cap大于实际已写字节。
派生缓冲区
@Testpublic void byteBufOp() { Charset utf8 = Charset.forName("UTF-8"); // 创建一个用于保存给定字符串的字节的ByteBuf ByteBuf buf = Unpooled.copiedBuffer("Netty in Action rocks!", utf8);
ByteBuf duplicate = buf.duplicate();
ByteBuf sliced = buf.slice(0, 15);
ByteBuf readSlice=buf.readSlice(10); // 将打印“Netty in Action” System.out.println(sliced.toString(utf8)); // 更新索引0 处的字节 buf.setByte(0, (byte) 'J'); // 将会成功,因为数据是共享的,对其中一个所做的更改对另外一个也是可见的 assert buf.getByte(0) == sliced.getByte(0);}
真实拷贝
public void byteBufCopy() { Charset utf8 = Charset.forName("UTF-8"); // 创建一个用于保存给定字符串的字节的ByteBuf ByteBuf buf = Unpooled.copiedBuffer("Netty in Action rocks!", utf8); // 创建该ByteBuf 从索引0 开始到索引15结束的分段的副本 ByteBuf copy = buf.copy(0, 15); // 将打印“Netty in Action” System.out.println(copy.toString(utf8)); // 更新索引0 处的字节 buf.setByte(0, (byte) 'J'); // 将会成功,因为数据不是共享的 assert buf.getByte(0) != copy.getByte(0);}
索引访问 注意通过索引访问时不会推进读索引和写索引,我们可以通过 ByteBuf 的readerIndex()或writerIndex()来分别推进读索引或写索引。
/create a ByteBuf of capacity is 16 ByteBuf buf = Unpooled.buffer(16); //write data to buf for(int i=0;i<16;i++){ buf.writeByte(i+1); } //read data from buf for(int i=0;i<buf.capacity();i++){ System.out.println(buf.getByte(i)); }
通过 CompositeByteBuf 实现零拷贝
假设我们有一份协议数据, 它由头部和消息体组成, 而头部和消息体是分别存放在两个 ByteBuf 中的, 即:ByteBuf header = ...ByteBuf body = ...我们在代码处理中, 通常希望将 header 和 body 合并为一个 ByteBuf, 方便处理, 那么通常的做法是:ByteBuf allBuf = Unpooled.buffer(header.readableBytes() + body.readableBytes());allBuf.writeBytes(header);allBuf.writeBytes(body);可以看到, 我们将 header 和 body 都拷贝到了新的 allBuf 中了, 这无形中增加了两次额外的数据拷贝操作了.那么有没有更加高效优雅的方式实现相同的目的呢? 我们来看一下 CompositeByteBuf 是如何实现这样的需求的吧.CompositeByteBuf compositeByteBuf = Unpooled.compositeBuffer();compositeByteBuf.addComponents(true, header, body);上面 CompositeByteBuf 代码一个地方值得注意的是, 我们调用 addComponents(boolean increaseWriterIndex, ByteBuf... buffers) 来添加两个 ByteBuf, 其中第一个参数是 true, 表示当添加新的 ByteBuf 时, 自动递增 CompositeByteBuf 的 writeIndex.如果我们调用的是compositeByteBuf.addComponents(header, body);那么其实 compositeByteBuf 的 writeIndex 仍然是0, 因此此时我们就不可能从 compositeByteBuf 中读取到数据.
通过 wrap 操作实现零拷贝 例如我们有一个 byte 数组, 我们希望将它转换为一个 ByteBuf 对象, 以便于后续的操作, 那么传统的做法是将此 byte 数组拷贝到 ByteBuf 中, 即:
byte[] bytes = ...ByteBuf byteBuf = Unpooled.buffer();byteBuf.writeBytes(bytes);
显然这样的方式也是有一个额外的拷贝操作的, 我们可以使用 Unpooled 的相关方法, 包装这个 byte 数组, 生成一个新的 ByteBuf 实例, 而不需要进行拷贝操作. 上面的代码可以改为:
byte[] bytes = ...ByteBuf byteBuf = Unpooled.wrappedBuffer(bytes);
可以看到, 我们通过 Unpooled.wrappedBuffer 方法来将 bytes 包装成为一个 UnpooledHeapByteBuf 对象, 而在包装的过程中, 是不会有拷贝操作的. 即最后我们生成的生成的 ByteBuf 对象是和 bytes 数组共用了同一个存储空间, 对 bytes 的修改也会反映到 ByteBuf 对象中.
通过 slice 操作实现零拷贝
slice 操作和 wrap 操作刚好相反, Unpooled.wrappedBuffer 可以将多个 ByteBuf 合并为一个, 而 slice 操作可以将一个 ByteBuf 切片 为多个共享一个存储区域的 ByteBuf 对象.ByteBuf 提供了两个 slice 操作方法:public ByteBuf slice();public ByteBuf slice(int index, int length);不带参数的 slice 方法等同于 buf.slice(buf.readerIndex(), buf.readableBytes()) 调用, 即返回 buf 中可读部分的切片. 而 slice(int index, int length) 方法相对就比较灵活了, 我们可以设置不同的参数来获取到 buf 的不同区域的切片.
10】Netty中TCP拆包、粘包问题
在基于流的传输里比如TCP/IP,接收到的数据会先被存储到一个socket接收缓冲里。不幸的是,基于流的传输并不是一个数据包队列,而是一个字节队列。即使你发送了2个独立的数据包,操作系统也不会作为2个消息处理而仅仅是作为一连串的字节。TCP是一个基于流的协议,TCP作为传输层协议并不知道应用层协议的具体含义,它会根据TCP缓冲区的实际情况进行数据包的划分,所以在应用层上认为是一个完整的包,可能会被TCP拆分成多个包进行发送,也有可能把多个小的包封装成一个大的数据包发送,这就是所谓的TCP粘包和半包问题。有如下三种方案:1.消息定长,例如每个报文的大小固定为多少字节,如果不够,就空格补充2.在包尾部增加特殊的字符进行分隔,例如$3.将消息分为消息头和消息体,在消息头中包含表示消息总长度的字段,然后就行业务逻辑的处理
采用定长
new FixedLengthFrameDecoder(10)
采用分隔符
//以$为分隔符 ByteBuf buf = Unpooled.copiedBuffer("$".getBytes()); ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buf)) // f.channel().writeAndFlush(Unpooled.copiedBuffer("Hello Server$".getBytes()));
Netty自带拆包类
FixedLengthFrameDecoder 按照特定长度组包 DelimiterBasedFrameDecoder 按照指定分隔符组包, 例如本文中的$$$ LineBasedFrameDecoder 按照换行符进行组包, \r \n等等 LengthFieldBasedFrameDecoder:跟据包头部定义的长度来区分包
11】Pipeline
详情参考
Pipeline初始化的过程中增加了head和tail这两个Handler。tail的inbound属性为true, outbound属性为false;head的inbound属性为false,outbound属性为truehead节点的作用主要是将各种事件传播到后续节点,并且其携带unSafe属性用于数据的读写;tail节点的作用是作为处理链的最后一个节点,进行一些收尾操作。调用addLast()往Pipeline中添加ChannelHandler: 1.判断是否重复添加(是否有添加@Sharable注解并且是否已经添加过),添加了@Sharable注解后,就表示这个ChannelHandler可以同时被多个ChannelPipeline添加,而如果没有这个注解,则只能被一个ChannelPipeline添加. 2.创建节点并添加至链表,将当前创建的节点添加为ChannelPipeline这个双向链表的最后一个tail节点的前面一个节点。 3.回调添加完成事件inBound事件的传播( ChannelInboundHandler接口定义的方法): channelRegistered和channelUnregistered方法是Channel注册到NioEventLoop线程中对应的selector选择器上或是从选择器上取消注册的时候触发的回调函数; channelActive和channelInactive方法是通道激活和失效触发的回调函数; channelRead方法是Channel读到数据或者建立连接后的回调函数,channelReadComplete是Channel读取完毕后的回调函数; userEventTriggered方法表示用户事件被触发后的回调函数,channelWritabilityChanged表示Channel读写能力发生变化后的回调函数; 对比ChannelHandler和ChannelInboundHandler这两个接口,可知ChannelInboundHandler添加了一些Channel相关的回调函数。下面就来从channelRead这个事件来作为示例,看下事件是如何在ChannelPipeline中各个节点之间进行传播的。channelRead事件的传播: 执行ctx.channel().pipeline().fireChannelRead("hello world");语句时事件和消息是从pipeline的头部开始传递的, 而执行ctx.fireChannelRead()语句时事件和消息是从当前节点开始向后传递的。 如果read事件最后传递到了tail节点,则会调用ReferenceCountUtil.release(msg)这个方法将保存消息的内存进行释放;如果没有调用fireChannelRead()方法将事件向后传递,也没有处理消息,那么这个内存就永远得不到释放了,这就会导致内存泄露。 在编写inbound类型的ChannalHandler实现类的时候,只要继承这个SimpleChannelInboundHandler类就行,复写channelRead0()方法,并且在这个方法中并不需要显示调用ReferenceCountUtil.release(msg);方法来释放内存,因为在SimpleChannelInboundHandler类的channelRead()方法内部最终都会自己执行释放内存的操作。write事件传播: ctx.channel().write("hello,world"),执行的pipeline对象的write方法,执行的pipeline链表的tail节点的write方法; ctx.write(msg,promise) 向前一个节点传递。
12】为什么Netty会发生内存泄漏问题
Netty之有效规避内存泄漏
如果应用程序在使用ByteBuf后,没有调用方法release()(这个方法将其放回对象池中),又没有任何进一步的引用,则会发生泄漏。 在这种情况下,缓冲区最终将被GC(垃圾回收器)清除,但Netty的对象池不会知道这种情况。 然后,对象池将逐渐相信程序正在使用越来越多的永不返回池中的ByteBuf。这可能会产生内存泄漏,因为ByteBuf被垃圾回收器回收,对象池回收不到它。导致对象池创建越来越多的新的引用计数对象。
ByteBuf 是Netty中主要用来数据byte[]的封装类,主要分为Heap ByteBuf 和 Direct ByteBuf。为了减少内存的分配回收以及产生的内存碎片,Netty提供了PooledByteBufAllocator 用来分配可回收的ByteBuf,可以把PooledByteBufAllocator看做一个池子,需要的时候从里面获取ByteBuf,用完了放回去,以此提高性能。当然与之对应的还有 UnpooledByteBufAllocator,顾名思义Unpooled就是不会放到池子里,所以根据该分配器分配的ByteBuf,不需要放回池子有JVM自己GC回收。在netty中,根据ChannelHandlerContext 和 Channel获取的Allocator默认都是Pooled,所以需要再合适的时机对其进行释放,避免造成内存泄漏。Netty默认会在ChannelPipline的最后添加一个tail handler帮你完成ByteBuf的release。其释放的是channelRead传入的ByteBuf,如果在handlers传递过程中,传递了新值,老值需要你自己手动释放。另外如果中途没有使用fireChannelRead传递下去也要自己释放。在传递过程中自己通过Channel或ChannelHandlerContext创建的但是没有传递下去的ByteBuf也要手动释放。为了帮助你诊断潜在的泄漏问题,netty提供了ResourceLeakDetector,该类会采样应用程序中%1的buffer分配,并进行跟踪。不用担心这个开销很小。Netty目前定义了四中检测级别:DISABLE, SIMPLE(默认),ADVANCED, PARANOID。 禁用(DISABLED) - 完全禁止泄露检测,省点消耗。 简单(SIMPLE) - 默认等级,告诉我们取样的1%的ByteBuf是否发生了泄露,但总共一次只打印一次,看不到就没有了。 高级(ADVANCED) - 告诉我们取样的1%的ByteBuf发生泄露的地方。每种类型的泄漏(创建的地方与访问路径一致)只打印一次。对性能有影响。 偏执(PARANOID) - 跟高级选项类似,但此选项检测所有ByteBuf,而不仅仅是取样的那1%。对性能有绝大的影响。总结: 所谓内存泄漏,主要是针对池化的ByteBuf。ByteBuf对象被JVM GC掉之前,没有调用release()把底下的DirectByteBuffer或byte[]归还到池里,会导致池越来越大。 而非池化的ByteBuf,即使像DirectByteBuf那样可能会用到System.gc(),但终归会被release掉的,不会出大事。
13】Netty SSL性能调优
参考链接
###Java 对SSL的支持JDK7的client端只支持TLS1.0,服务端则支持TLS1.2。 JDK8完全支持TLS1.2。JDK7不支持GCM算法。JDK8支持GCM算法,但性能极差极差极差,按Netty的说法: Java 8u60以前多版本,只能处理1 MB/s。 Java 8u60 开始,10倍的性能提升,10-20 MB/s。 但比起 OpenSSL的 ~200 MB/s,还差着一个量级。
###Netty 对SSL的支持Netty既支持JDK SSL,也支持Google的boringssl, 这是OpenSSL 的一个fork,更少的代码,更多的功能。依赖netty-tcnative-boringssl-static-linux-x86_64.jar即可,它里面已包含了相关的so文件,再也不用管Linux里装没装OpenSSL,OpenSSL啥版本了。
###Netty4,如果使用openssl,需要添加下面两个jar,如果只用jdk 提供的ssl,可以不导入<dependency> <groupId>io.netty</groupId> <artifactId>netty-tcnative</artifactId> <version>2.0.25.Final</version> <scope>runtime</scope></dependency><dependency> <groupId>io.netty</groupId> <artifactId>netty-tcnative-boringssl-static</artifactId> <version>2.0.25.Final</version> <scope>runtime</scope></dependency>
SslContextBuilder.forServer(kmf).trustManager(tf).build(); //不设置.sslProvider(SslProvider.OPENSSL)时,当SslProvider.OPENSSL可用时,优先使用SslProvider.OPENSSL。
###SslProvider.OPENSSL支持的ciphers:0 = "TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384"1 = "TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256"2 = "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256"3 = "TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA"4 = "TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA"5 = "TLS_RSA_WITH_AES_128_GCM_SHA256"6 = "TLS_RSA_WITH_AES_128_CBC_SHA"7 = "TLS_RSA_WITH_AES_256_CBC_SHA"
###SslProvider.JDK支持的ciphers:0 = "TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256"1 = "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256"2 = "TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA"3 = "TLS_RSA_WITH_AES_128_GCM_SHA256"4 = "TLS_RSA_WITH_AES_128_CBC_SHA"
JDK SSL 使用
public class SSLContextFactory {
private static SSLContextFactory factory=null; private static SpeechProperties.Ssl ssl=null; private static SSLContext sslContext=null;
private SSLContextFactory(){ }
public static SSLContextFactory getInstance(){ if(factory==null){ synchronized(SSLContextFactory.class){ if(factory==null){ factory=new SSLContextFactory(); ssl= SpringUtils.getBean(SpeechProperties.class).getNetty().getSsl(); } } } return factory; }
private static SSLContext getSSLContext() { if(sslContext==null){ synchronized(SSLContextFactory.class){ if(sslContext==null){ try { sslContext = SSLContext.getInstance(ssl.getProtocol()); KeyStore keyStore = KeyStore.getInstance(ssl.getKeyStoreType()); InputStream inputStream=ssl.getKeyStoreLocation().getInputStream(); keyStore.load(inputStream, ssl.getKeyStorePassword().toCharArray()); inputStream.close(); KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()); kmf.init(keyStore, ssl.getKeyPassword().toCharArray()); KeyManager[] keyManagers = kmf.getKeyManagers(); TrustManager[] trustManagers=null; if(ssl.isNeedClientAuth()){ KeyStore tKeyStore = KeyStore.getInstance(ssl.getTrustStoreType()); InputStream inputStream1=ssl.getTrustStoreLocation().getInputStream(); tKeyStore.load(inputStream1, ssl.getTrustStorePassword().toCharArray()); TrustManagerFactory tf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()); tf.init(tKeyStore); trustManagers=tf.getTrustManagers(); } sslContext.init(keyManagers, trustManagers,new SecureRandom()); return sslContext; } catch (NoSuchAlgorithmException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } catch (CertificateException e) { e.printStackTrace(); } catch (UnrecoverableKeyException e) { e.printStackTrace(); } catch (KeyStoreException e) { e.printStackTrace(); } catch (KeyManagementException e) { e.printStackTrace(); } } } } return sslContext; }
private SSLEngine getSSLEngine() { SSLEngine sslEngine =getSSLContext().createSSLEngine(); sslEngine.setUseClientMode(false); sslEngine.setNeedClientAuth(ssl.isNeedClientAuth()); return sslEngine; }
public SslHandler getSslHandler() { if(ssl.isEnableSSL()){ SSLEngine sslEngine=getSSLEngine(); if(sslEngine!=null){ return new SslHandler(sslEngine); } } return null; }
}
SslHandler sslHandler=SSLContextFactory.getInstance().getSslHandler();if (sslHandler != null) { e.pipeline().addFirst("ssl", sslHandler);}
OPEN SSL 使用

public class SSLContextFactory {
private static SSLContextFactory factory=null; private static SpeechProperties.Ssl ssl=null; private static SslContext sslContext=null;
private SSLContextFactory(){ }
public static SSLContextFactory getInstance(){ if(factory==null){ synchronized(SSLContextFactory.class){ if(factory==null){ factory=new SSLContextFactory(); ssl= SpringUtils.getBean(SpeechProperties.class).getNetty().getSsl(); } } } return factory; }
private static SslContext getSSLContext() { if(sslContext==null){ synchronized(SSLContextFactory.class){ if(sslContext==null){ try { KeyStore keyStore = KeyStore.getInstance(ssl.getKeyStoreType()); InputStream inputStream=ssl.getKeyStoreLocation().getInputStream(); keyStore.load(inputStream, ssl.getKeyStorePassword().toCharArray()); inputStream.close(); KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()); kmf.init(keyStore, ssl.getKeyPassword().toCharArray()); TrustManagerFactory tf=null; if(ssl.isNeedClientAuth()){ KeyStore tKeyStore = KeyStore.getInstance(ssl.getTrustStoreType()); InputStream inputStream1=ssl.getTrustStoreLocation().getInputStream(); tKeyStore.load(inputStream1, ssl.getTrustStorePassword().toCharArray()); tf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()); tf.init(tKeyStore); } sslContext = SslContextBuilder.forServer(kmf).trustManager(tf).build(); return sslContext; } catch (NoSuchAlgorithmException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } catch (CertificateException e) { e.printStackTrace(); } catch (UnrecoverableKeyException e) { e.printStackTrace(); } catch (KeyStoreException e) { e.printStackTrace(); } } } } return sslContext; }
private SSLEngine getSSLEngine(ByteBufAllocator alloc) { SSLEngine sslEngine =getSSLContext().newEngine(alloc); sslEngine.setUseClientMode(false); sslEngine.setNeedClientAuth(ssl.isNeedClientAuth()); return sslEngine; }
public SslHandler getSslHandler(ByteBufAllocator alloc) { if(ssl.isEnableSSL()){ SSLEngine sslEngine=getSSLEngine(alloc); if(sslEngine!=null){ return new SslHandler(sslEngine); } } return null; }}
SslHandler sslHandler=SSLContextFactory.getInstance().getSslHandler(e.alloc());if (sslHandler != null) { e.pipeline().addFirst("ssl", sslHandler);}
14】netty编解码器
HttpObjectAggregator 把多个消息转换为一个单一的FullHttpRequest或是FullHttpResponse。
15】自定义编解码器
ByteToMessageDecoder
abstract ByteToMessageDecoder -> ChannelInboundHandlerAdapter -> abstract ChannelHandlerAdapter -> interface ChannelHandler -> interface ChannelInboundHandler -> interface ChannelHandlerByteToMessageDecoder的channelRead方法读到消息后判断若为ByteBuf类型则调用它的callDecode方法,进而[while (in.isReadable())]调用decodeRemovalReentryProtection方法,最终调用它的实现方法decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)。自定义的解码器在decode方法中将解码后的消息放入list中,由callDecode方法识别到后调用fireChannelRead向下传递(遍历list集合,每次传递一个)。ByteToMessageDecoder 中的 ByteBuf cumulation字段 缓存未解码的数据,等下个数据包过来时拼接后解码。
MessageToByteEncoder
abstract MessageToByteEncoder<I> -> ChannelOutboundHandlerAdapter -> abstract ChannelHandlerAdapter -> interface ChannelHandler -> interface ChannelOutboundHandler -> interface ChannelHandler ctx.writeAndFlush(33),向下传递消息至MessageToByteEncoder的write方法,此时判断类型匹配时(不匹配时直接向下传递)调用它的实现方法encode(ChannelHandlerContext ctx, I msg, ByteBuf out),自定义的实现方法将msg写入out,然后向下传递out.
abstract class MessageToMessageDecoder extends ChannelInboundHandlerAdapter
abstract MessageToMessageDecoder<I> -> ChannelInboundHandlerAdapter -> abstract ChannelHandlerAdapter -> interface ChannelHandler -> interface ChannelInboundHandler -> interface ChannelHandler与ByteToMessageDecoder不同,其实现方法是 decode(ChannelHandlerContext ctx, I msg, List<Object> out)
MessageToMessageEncoder
abstract MessageToMessageEncoder<I> -> ChannelOutboundHandlerAdapter -> abstract ChannelHandlerAdapter -> interface ChannelHandler -> interface ChannelOutboundHandler -> interface ChannelHandler 与MessageToByteEncoder不同,其实现的方法是 encode(ChannelHandlerContext ctx, I msg, List<Object> out)


以上是关于netty channelhandleradapter 是否去了channelread方法的主要内容,如果未能解决你的问题,请参考以下文章

95-36-025-ChannelHandler-ChannelHandlerAdapter

netty 传输数据为啥要序列化

netty系列之:NIO和netty详解

荐书 | Netty进阶之路:跟着案例学Netty

netty可以做啥

Netty 简介《Netty In Action》 #yyds干货盘点#