netty的IO模型
Posted IWAN学编程
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了netty的IO模型相关的知识,希望对你有一定的参考价值。
单Reactor多线程模型相比单Reactor单线程可以较大程度利用CPU的性能, 但是多线程的编程模型较为复杂. 在大并发的场景下, 单线程的事件监听和响应会出现性能瓶颈.
请求解析:
Reactor主线程通过select监听连接事件, 然后通过Acceptor处理连接事件.
Acceptor处理连接事件之后, 主Reactor把连接分配给从Reactor.
从Reactor创建handler进行各种事件处理.
handler通过read读取数据, 分配给后面的worker线程处理.
woker线程池完成业务处理后, 返回处理结果.
通过上述对三种reactor模型的介绍, 我们对Java NIO的三种编程模型已经较为了解. Netty是基于NIO主从Reactor模型做的进一步改进. 下面是Netty的IO模式图.
4. Netty的IO模型
Netty代码示例(discard)
package com.iwan.netty.discard;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
/**
* 服务端
*/
public final class DiscardServer {
static final boolean SSL = System.getProperty("ssl") != null;
static final int PORT = Integer.parseInt(System.getProperty("port", "8009"));
public static void main(String[] args) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
public void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
p.addLast(new DiscardServerHandler());
}
});
ChannelFuture f = b.bind(PORT).sync();
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
}
package com.iwan.netty.discard;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
/**
* 自定义的handler
*/
public class DiscardServerHandler extends SimpleChannelInboundHandler<Object> {
public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
/**
* 什么也不做
*/
}
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
package com.iwan.netty.discard;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
/**
* 客户端
*/
public final class DiscardClient {
static final String HOST = System.getProperty("host", "127.0.0.1");
static final int PORT = Integer.parseInt(System.getProperty("port", "8009"));
static final int SIZE = Integer.parseInt(System.getProperty("size", "256"));
public static void main(String[] args) throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new DiscardClientHandler());
}
});
ChannelFuture f = b.connect(HOST, PORT).sync();
f.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
}
package com.iwan.netty.discard;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
/**
* 自定义handler
*/
public class DiscardClientHandler extends SimpleChannelInboundHandler<Object> {
private ByteBuf content;
private ChannelHandlerContext ctx;
public void channelActive(ChannelHandlerContext ctx) {
this.ctx = ctx;
content = ctx.alloc().directBuffer(DiscardClient.SIZE).writeZero(DiscardClient.SIZE);
generateTraffic();
}
public void channelInactive(ChannelHandlerContext ctx) {
content.release();
}
public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
}
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
long counter;
private void generateTraffic() {
ctx.writeAndFlush(content.retainedDuplicate()).addListener(trafficGenerator);
}
private final ChannelFutureListener trafficGenerator = future -> {
if (future.isSuccess()) {
generateTraffic();
} else {
future.cause().printStackTrace();
future.channel().close();
}
};
}
以上是关于netty的IO模型的主要内容,如果未能解决你的问题,请参考以下文章