Netty-入门
Posted 善良的吉他手
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Netty-入门相关的知识,希望对你有一定的参考价值。
Hello World
目标
开发一个简单的服务器端和客户端
- 客户端向服务器端发送 hello, world
- 服务器仅接收,不返回
依赖
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.39.Final</version>
</dependency>
服务器端
@Slf4j
public class HelloServer {
public static void main(String[] args) {
new ServerBootstrap()
// 1. 创建 NioEventLoopGroup
.group(new NioEventLoopGroup())
// 2. 选择 Socket 实现类,NioserverSocketChannel 是基于 nio 实现的
.channel(NioServerSocketChannel.class)
// 3. 子线程处理类 handler
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
// 5. socketChannel 解码器
nioSocketChannel.pipeline().addLast(new StringDecoder());
// 6. SocketChannel 的业务处理,使用上一个处理器的处理结果
nioSocketChannel.pipeline().addLast(new SimpleChannelInboundHandler<String>() {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception {
log.info("msg = " + msg);
}
});
}
})
// 4. 绑定监听端口
.bind(8080);
}
}
代码解读
- 1 处,创建 NioEventLoopGroup,可以简单理解为 线程池 + Selector 后面会详细展开
- 2 处,选择服务 Scoket 实现类,其中 NioServerSocketChannel 表示基于 NIO 的服务器端实现
- 3 处,为啥方法叫 childHandler,是接下来添加的处理器都是给 SocketChannel 用的,而不是给 ServerSocketChannel。ChannelInitializer 处理器(仅执行一次),它的作用是待客户端 SocketChannel 建立连接后,执行 initChannel 以便添加更多的处理器
- 4 处,ServerSocketChannel 绑定的监听端口
- 5 处,SocketChannel 的处理器,解码 ByteBuf => String
- 6 处,SocketChannel 的业务处理器,使用上一个处理器的处理结果
客户端
@Slf4j
public class HelloClient {
public static void main(String[] args) throws InterruptedException {
// 1. 启动类
new Bootstrap()
// 2. 添加 EventLoop
.group(new NioEventLoopGroup())
// 3. 选择客户端 channel 实现
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
// 8
nioSocketChannel.pipeline().addLast(new StringEncoder());
}
})
// 4. 连接到服务器
.connect(new InetSocketAddress("localhost",8080))
// 5
.sync()
// 6
.channel()
// 7. 向服务器发送数据
.writeAndFlush("hello word");
}
}
代码解读
- 1 处,创建 NioEventLoopGroup,同 Server
- 2 处,选择客户 Socket 实现类,NioSocketChannel 表示基于 NIO 的客户端实现
- 3 处,添加 SocketChannel 的处理器,ChannelInitializer 处理器(仅执行一次),它的作用是待客户端 SocketChannel 建立连接后,执行 initChannel 以便添加更多的处理器
- 4 处,指定要连接的服务器和端口
- 5 处,Netty 中很多方法都是异步的,如 connect,这时需要使用 sync 方法等待 connect 建立连接完毕
- 6 处,获取 channel 对象,它即为通道抽象,可以进行数据读写操作
- 7 处,写入消息并清空缓冲区
- 8 处,消息会经过通道 handler 处理,这里是将 String => ByteBuf 发出
- 数据经过网络传输,到达服务器端,服务器端 5 和 6 处的 handler 先后被触发,走完一个流程
组件
EventLoop
事件循环对象
EventLoop 本质是一个单线程执行器(同时维护了一个 Selector),里面有 run 方法处理 Channel 上源源不断的 io 事件。
它的继承关系比较复杂
- 一条线是继承自 j.u.c.ScheduledExecutorService 因此包含了线程池中所有的方法
- 另一条线是继承自 netty 自己的 OrderedEventExecutor,
- 提供了 boolean inEventLoop(Thread thread) 方法判断一个线程是否属于此 EventLoop
- 提供了 parent 方法来看看自己属于哪个 EventLoopGroup
事件循环组
EventLoopGroup 是一组 EventLoop,Channel 一般会调用 EventLoopGroup 的 register 方法来绑定其中一个 EventLoop,后续这个 Channel 上的 io 事件都由此 EventLoop 来处理(保证了 io 事件处理时的线程安全)
- 继承自 netty 自己的 EventExecutorGroup
- 实现了 Iterable 接口提供遍历 EventLoop 的能力
- 另有 next 方法获取集合中下一个 EventLoop
@Slf4j
public class EventLoopServer {
public static void main(String[] args) {
/**
* 细分1:
* boss 只负责 ServerSocketChannel 上的 accept 事件
* worker 只负责 SocketChannel 上的读写事件
*/
EventLoopGroup boss = new NioEventLoopGroup();
EventLoopGroup worker = new NioEventLoopGroup();
/**
* 细分2:
* 创建一个独立的 EventLoopGroup
*/
EventLoopGroup dfGroup = new DefaultEventLoopGroup();
new ServerBootstrap()
.group(boss,worker)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel sc) throws Exception {
sc.pipeline().addLast("handler1",new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
log.info(buf.toString(Charset.defaultCharset()));
ctx.fireChannelRead(msg); // 把消息传递给下一个 handler
}
}).addLast(dfGroup,"handler2",new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf)msg;
log.info(buf.toString(Charset.defaultCharset()));
}
});
}
})
.bind(8085);
}
}
@Slf4j
public class EventLoopClient {
public static void main(String[] args) throws InterruptedException {
Channel channel = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel sc) throws Exception {
sc.pipeline().addLast(new StringEncoder());
}
})
.connect(new InetSocketAddress("localhost", 8085))
.sync()
.channel();
log.info(channel.toString());
System.out.println();
}
}
优雅关闭
优雅关闭 shutdownGracefully 方法。该方法会首先切换 EventLoopGroup 到关闭状态从而拒绝新的任务的加入,然后在任务队列的任务都处理完成后,停止线程的运行。从而确保整体应用是在正常有序的状态下退出的
Channel
channel 的主要作用
- close() 可以用来关闭 channel
- closeFuture() 用来处理 channel 的关闭
- sync 方法作用是同步等待 channel 关闭
- 而 addListener 方法是异步等待 channel 关闭
- pipeline() 方法添加处理器
- write() 方法将数据写入
- writeAndFlush() 方法将数据写入并刷出
ChannelFuture
@Slf4j
public class EventLoopServer {
public static void main(String[] args) {
EventLoopGroup boss = new NioEventLoopGroup();
EventLoopGroup worker = new NioEventLoopGroup();
new ServerBootstrap()
.group(boss,worker)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel sc) throws Exception {
sc.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
log.info(buf.toString(Charset.defaultCharset()));
}
});
}
})
.bind(8086);
}
}
@Slf4j
public class EventLoopClient {
public static void main(String[] args) throws InterruptedException {
// 2. 带有 Future ,Promise 的类型都是和异步方法配套使用,用来处理结果
ChannelFuture channelFuture = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel sc) throws Exception {
sc.pipeline().addLast(new StringEncoder());
}
})
// 1. 连接到服务器
// 异步阻塞,main 线程发起了调用,真正执行 connect 连接的是 nio 线程
.connect(new InetSocketAddress("localhost", 8086));
// 2.1 使用 sync 方法同步处理结果
// channelFuture.sync(); // 阻塞住当前线程,直到 nio 线程连接建立完毕
// Channel channel = channelFuture.channel();
// log.info(channel.toString());
// channel.writeAndFlush("hello word;");
// 2.2 使用 addListener(回调对象)方法异步处理结果
channelFuture.addListener(new ChannelFutureListener() {
// 在 nio 线程连接建立好之后,会调用 operationComplete
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
Channel channel = channelFuture.channel();
log.info(channel.toString());
channel.writeAndFlush("hello word");
}
});
}
}
CloseFuture
@Slf4j
public class EventLoopClient2 {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup group = new NioEventLoopGroup();
ChannelFuture channelFuture = new Bootstrap()
.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new LoggingHandler(LogLevel.INFO));
socketChannel.pipeline().addLast(new StringEncoder());
}
})
.connect(new InetSocketAddress("localhost", 8086));
Channel channel = channelFuture.sync().channel();
new Thread(() -> {
Scanner scanner = new Scanner(System.in);
while (true) {
String str = scanner.nextLine();
if("q".equals(str)){
channel.close();
break;
}
channel.writeAndFlush(str);
}
},"input").start();
// 1. 获取 CloseFuture 对象同步处理关闭
ChannelFuture closeFuture = channel.closeFuture();
// closeFuture.sync(); // 阻塞
// log.info("处理关闭后的操作...");
// 2. 异步处理关闭
closeFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
log.info("处理关闭之后的操作...");
// 优雅的停止接受任务
group.shutdownGracefully();
}
});
}
}
以上是关于Netty-入门的主要内容,如果未能解决你的问题,请参考以下文章