小六六学Netty系列之初识Netty
Posted 六脉神剑的程序人生
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了小六六学Netty系列之初识Netty相关的知识,希望对你有一定的参考价值。
前言
文本已收录至我的GitHub仓库,欢迎Star:https://github.com/bin392328206/six-finger
种一棵树最好的时间是十年前,其次是现在
我知道很多人不玩qq了,但是怀旧一下,欢迎加入六脉神剑Java菜鸟学习群,群聊号码:549684836 鼓励大家在技术的路上写博客
絮叨
为了学习Netty,我们前面铺垫了那么多,NIO Java的零拷贝,UNIX的I/O模型等等。下面是前面系列的链接
今天我们就来看看Netty 然后用Netty搞个最简单的例子
今天七夕,祝大家有情人终成眷属
Netty官网
官网
Netty官网说明
-
Netty 是由 JBOSS 提供的一个 Java 开源框架。Netty 提供异步的、基于事件驱动的网络应用程序框架,用以快速开发高性能、高可靠性的网络 IO 程序
-
Netty 可以帮助你快速、简单的开发出一个网络应用,相当于简化和流程化了 NIO 的开发过程
-
Netty 是目前最流行的 NIO 框架,Netty 在互联网领域、大数据分布式计算领域、游戏行业、通信行业等获得了广泛的应用,知名的 Elasticsearch 、Dubbo 框架内部都采用了 Netty。
Netty的优点
Netty 对 JDK 自带的 NIO 的 API 进行了封装,解决了上述问题。
-
设计优雅:适用于各种传输类型的统一 API 阻塞和非阻塞 Socket;基于灵活且可扩展的事件模型,可以清晰地分离关注点;高度可定制的线程模型 - 单线程,一个或多个线程池.
-
使用方便:详细记录的 Javadoc,用户指南和示例;没有其他依赖项,JDK 5(Netty 3.x)或 6(Netty 4.x)就足够了。高性能、吞吐量更高:延迟更低;减少资源消耗;最小化不必要的内存复制。
-
安全:完整的 SSL/TLS 和 StartTLS 支持。社区活跃、不断更新:社区活跃,版本迭代周期短,发现的 Bug 可以被及时修复,同时,更多的新功能会被加入
Reactor 模式
针对传统阻塞 I/O 服务模型的 2 个缺点,解决方案:
-
基于 I/O 复用模型:多个连接共用一个阻塞对象,应用程序只需要在一个阻塞对象等待,无需阻塞等待所有连接。当某个连接有新的数据可以处理时,操作系统通知应用程序,线程从阻塞状态返回,开始进行业务处理Reactor 对应的叫法: 1. 反应器模式 2. 分发者模式(Dispatcher) 3. 通知者模式(notifier)
-
基于线程池复用线程资源:不必再为每个连接创建线程,将连接完成后的业务处理任务分配给线程进行处理,一个线程可以处理多个连接的业务。
I/O 复用结合线程池,就是 Reactor 模式基本设计思想,如图:
-
Reactor 模式,通过一个或多个输入同时传递给服务处理器的模式(基于事件驱动) -
服务器端程序处理传入的多个请求,并将它们同步分派到相应的处理线程, 因此Reactor模式也叫 Dispatcher模式 -
Reactor 模式使用IO复用监听事件, 收到事件后,分发给某个线程(进程), 这点就是网络服务器高并发处理关键
Reactor 三种模式
-
单 Reactor 单线程,前台接待员和服务员是同一个人,全程为顾客服 -
单 Reactor 多线程,1 个前台接待员,多个服务员,接待员只负责接待 -
主从 Reactor 多线程,多个前台接待员,多个服务生
Netty模型
工作原理示意图
-
Netty抽象出两组线程池 BossGroup 专门负责接收客户端的连接, WorkerGroup 专门负责网络的读写 BossGroup 和 WorkerGroup 类型都是 NioEventLoopGroup -
NioEventLoopGroup 相当于一个事件循环组, 这个组中含有多个事件循环 ,每一个事件循环是 NioEventLoop -
NioEventLoop 表示一个不断循环的执行处理任务的线程, 每个NioEventLoop 都有一个selector , 用于监听绑定在其上的socket的网络通讯 -
NioEventLoopGroup 可以有多个线程, 即可以含有多个NioEventLoop -
每个Boss NioEventLoop 循环执行的步骤有3步 -
轮询accept 事件 -
处理accept 事件 , 与client建立连接 , 生成NioscocketChannel , 并将其注册到某个worker NIOEventLoop 上的 selector -
处理任务队列的任务 , 即 runAllTasks -
每个 Worker NIOEventLoop 循环执行的步骤 -
轮询read, write 事件 -
处理i/o事件, 即read , write 事件,在对应NioScocketChannel 处理 -
处理任务队列的任务 , 即 runAllTasks -
每个Worker NIOEventLoop 处理业务时,会使用pipeline(管道), pipeline 中包含了 channel , 即通过pipeline 可以获取到对应通道, 管道中维护了很多的 处理器
Netty快速入门实例-TCP服务
-
NettyServer
package com.xiaoliuliu.netty.simple;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
* @author 小六六
* @version 1.0
* @date 2020/8/16 17:41
*/
public class NettyServer {
public static void main(String[] args) {
//创建BossGroup 和 WorkerGroup
//说明
//1. 创建两个线程组 bossGroup 和 workerGroup
//2. bossGroup 只是处理连接请求 , 真正的和客户端业务处理,会交给 workerGroup完成
//3. 两个都是无限循环
//4. bossGroup 和 workerGroup 含有的子线程(NioEventLoop)的个数
// 默认实际 cpu核数 * 2
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
NioEventLoopGroup workerGroup = new NioEventLoopGroup(8);
try {
//创建服务器端的启动对象,配置参数
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)//设置两个线程组
.channel(NioServerSocketChannel.class) //使用NioSocketChannel 作为服务器的通道实现
.option(ChannelOption.SO_BACKLOG,128) //设置保持活动连接状态
// .handler(null) // 该 handler对应 bossGroup , childHandler 对应 workerGroup
.childHandler(new ChannelInitializer<SocketChannel>() {//创建一个通道初始化对象(匿名对象)
@Override
protected void initChannel(SocketChannel ch) throws Exception {
System.out.println("客户socketchannel hashcode=" + ch.hashCode()); //可以使用一个集合管理 SocketChannel, 再推送消息时,可以将业务加入到各个channel 对应的 NIOEventLoop 的 taskQueue 或者 scheduleTaskQueue
ch.pipeline().addLast(new NettyServerHandler());
}
});
System.out.println(".....服务器 is ready...");
//绑定一个端口并且同步, 生成了一个 ChannelFuture 对象
//启动服务器(并绑定端口)
//基于异步操作
ChannelFuture cf = bootstrap.bind(6666).sync();
//给cf 注册监听器,监控我们关心的事件
cf.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (cf.isSuccess()) {
System.out.println("监听端口 6666 成功");
} else {
System.out.println("监听端口 6666 失败");
}
}
});
//对关闭通道进行监听
cf.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
-
NettyServerHandler
package com.xiaoliuliu.netty.simple;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.util.CharsetUtil;
/**
* @author 小六六
* @version 1.0
* @date 2020/8/16 18:04
*
*
* 说明
* 1. 我们自定义一个Handler 需要继续netty 规定好的某个HandlerAdapter(规范)
* 2. 这时我们自定义一个Handler , 才能称为一个handler
*
*/
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
//读取数据实际(这里我们可以读取客户端发送的消息)
/*
1. ChannelHandlerContext ctx:上下文对象, 含有 管道pipeline , 通道channel, 地址
2. Object msg: 就是客户端发送的数据 默认Object
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
/*
//比如这里我们有一个非常耗时长的业务-> 异步执行 -> 提交该channel 对应的
//NIOEventLoop 的 taskQueue中,
//解决方案1 用户程序自定义的普通任务
ctx.channel().eventLoop().execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(5 * 1000);
ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^<)喵2", CharsetUtil.UTF_8));
System.out.println("channel code=" + ctx.channel().hashCode());
} catch (Exception ex) {
System.out.println("发生异常" + ex.getMessage());
}
}
});
ctx.channel().eventLoop().execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(5 * 1000);
ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^<)喵3", CharsetUtil.UTF_8));
System.out.println("channel code=" + ctx.channel().hashCode());
} catch (Exception ex) {
System.out.println("发生异常" + ex.getMessage());
}
}
});
//解决方案2 : 用户自定义定时任务 -》 该任务是提交到 scheduleTaskQueue中
ctx.channel().eventLoop().schedule(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(5 * 1000);
ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^<)喵4", CharsetUtil.UTF_8));
System.out.println("channel code=" + ctx.channel().hashCode());
} catch (Exception ex) {
System.out.println("发生异常" + ex.getMessage());
}
}
}, 5, TimeUnit.SECONDS);
System.out.println("go on ...");*/
System.out.println("服务器读取线程 " + Thread.currentThread().getName() + " channle =" + ctx.channel());
System.out.println("server ctx =" + ctx);
System.out.println("看看channel 和 pipeline的关系");
Channel channel = ctx.channel();
ChannelPipeline pipeline = channel.pipeline(); //本质是一个双向链接, 出站入站
ByteBuf byteBuf=(ByteBuf)msg;
System.out.println("客户端发送的数据是"+byteBuf.toString(CharsetUtil.UTF_8));
System.out.println("客户端地址:" + channel.remoteAddress());
}
//数据读取完毕
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
//writeAndFlush 是 write + flush
//将数据写入到缓存,并刷新
//一般讲,我们对这个发送的数据进行编码
ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^<)喵1", CharsetUtil.UTF_8));
}
//处理异常,关闭通道
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
-
NettyClient
package com.xiaoliuliu.netty.simple;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
/**
* @author 小六六
* @version 1.0
* @date 2020/8/16 18:13
*/
public class NettyClient {
public static void main(String[] args) {
//客户端需要一个事件循环组
EventLoopGroup group = new NioEventLoopGroup();
try {
//创建客户端启动对象
//注意客户端使用的不是 ServerBootstrap 而是 Bootstrap
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)//设置线程组
.channel(NioSocketChannel.class)// 设置客户端通道的实现类(反射)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new NettyClientHandler());
}
});
System.out.println("客户端 ok..");
//启动客户端去连接服务器端
//关于 ChannelFuture 要分析,涉及到netty的异步模型
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6666).sync();
//给关闭通道进行监听
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
}finally {
group.shutdownGracefully();
}
}
}
-
NettyClientHandler
package com.xiaoliuliu.netty.simple;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
/**
* @author 小六六
* @version 1.0
* @date 2020/8/16 18:23
*/
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
//当通道就绪就会触发该方法
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("client " + ctx);
ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 小六六: (>^ω^<)喵", CharsetUtil.UTF_8));
}
//当通道有读取事件时,会触发
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println("服务器回复的消息:" + buf.toString(CharsetUtil.UTF_8));
System.out.println("服务器的地址:"+ ctx.channel().remoteAddress());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
-
结果
结尾
总结一下
-
Netty 抽象出两组线程池,BossGroup 专门负责接收客户端连接,WorkerGroup 专门负责网络读写操作。 -
NioEventLoop 表示一个不断循环执行处理任务的线程,每个 NioEventLoop 都有一个 selector,用于监听绑定在其上的 socket 网络通道。 -
NioEventLoop 内部采用串行化设计,从消息的读取->解码->处理->编码->发送,始终由 IO 线程 NioEventLoop 负责 -
NioEventLoopGroup 下包含多个 NioEventLoop -
每个 NioEventLoop 中包含有一个 Selector,一个 taskQueue -
每个 NioEventLoop 的 Selector 上可以注册监听多个 NioChannel -
每个 NioChannel 只会绑定在唯一的 NioEventLoop 上 -
每个 NioChannel 都绑定有一个自己的 ChannelPipeline
日常求赞
好了各位,以上就是这篇文章的全部内容了,能看到这里的人呀,都是真粉。
创作不易,各位的支持和认可,就是我创作的最大动力,我们下篇文章见
六脉神剑 | 文 【原创】如果本篇博客有任何错误,请批评指教,不胜感激 !
以上是关于小六六学Netty系列之初识Netty的主要内容,如果未能解决你的问题,请参考以下文章