Netty介绍及使用
Posted zqq_hello_world
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Netty介绍及使用相关的知识,希望对你有一定的参考价值。
Netty
Netty是由JBOSS提供的一个java开源框架,现为 Github上的独立项目。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。
也就是说,Netty 是一个基于NIO的客户、服务器端的编程框架,使用Netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户、服务端应用。Netty相当于简化和流线化了网络应用的编程开发过程,例如:基于TCP和UDP的socket服务开发。
Netty提升的是吞吐量(单位时间内处理请求的个数),把线程工作细化。 是对JavaNIO网络的封装,每一步都可以另一个线程去执行,并提供异步监听处理和同步阻塞处理的方法。
第一个Netty程序
-
客户端
/** * Hello Netty */ public class Client { public static void main(String[] args) throws InterruptedException { client(); } private static void client() throws InterruptedException { //1.客户端启动类 new Bootstrap() //2.添加 EventLoop .group(new NioEventLoopGroup()) //3.选择客户端的Channel实习 .channel(NiosocketChannel.class) //4.添加处理器 .handler(new ChannelInitializer<NioSocketChannel>() { @Override //在连接建立后被调用 protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception { //编码设置 nioSocketChannel.pipeline().addLast(new StringEncoder(Charset.forName("utf-8"))); } }) //5.连接到服务器 .connect(new InetSocketAddress("localhost",8080)) .sync() //6.阻塞,直到与服务端连接建立 .channel() //拿到连接对象Channel //7.向服务器发送数据 .writeAndFlush("Hello Netty"); } }
-
服务端
public class Server { public static void main(String[] args) { server(); } private static void server() { // 1. ServerBootstrap 服务端启动器,组装netty注解,启动服务器 new ServerBootstrap() //2.绑定NioEventLoopGroup,绑定对应的boss(负责连接) worker(处理事件)组 .group(new NioEventLoopGroup()) //3.选择服务器ServerSocketChannel实现,NioServerSocketChannel NIO的ServerSocketChannel .channel(NioServerSocketChannel.class) // 4.类似nio添加事件处理器 Handler。new ChannelInitializer初始化通道Channel和添加Handler .childHandler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception { //5.添加具体的Handler nioSocketChannel.pipeline().addLast(new StringDecoder(Charset.forName("utf-8")));//将Bytebuf 转换为字符串 nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){ //自定义Handler //6.处理读事件 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("接收到消息:" + msg); } }); } }).bind(8080); //7.服务启动绑定的监听端口 } }
Netty组件介绍
EventLoop
EventLoop本质是一个单线程执行器(同时维护了一个Selector),里面有run方法处理Channel上源源不断的IO事件。是一个接口,继承了JDK的ScheduledExecutorService(可执行定时任务的线程池)和 Netty的OrderedEventExecutor。
- 继承了线程池ScheduledExecutorService,有线程池的所有方法,可以执行定时任务。
- OrderedEventExecutor提供了判断线程是否属于此EventLoop方法inEventLoop(Thread var1),和parent()方法判断自己属于哪个EventLoopGroup等。
EventLoopGroup(事件循环组)
EventLoopGroup是一组EventLoop,Channel一般会调用EventLoopGroup的register方法来绑定其中一个EventLoop,后续这个Channel上的IO事件都由此EventLoop来处理。
EventLoopGroup处理普通任务、定时任务
/**
* EventLoopGroup处理普通任务,定时任务
*/
private static void demo1() {
//1.创建EventLoopGroup EventLoopGroup 可以处理IO事件,普通任务,定时任务。可以指定线程数,不指定取电脑核心线程数*2,保证会有一个线程
EventLoopGroup eventLoopGroup = new NioEventLoopGroup(2);
//2.获取下一个事件,多次调用会依次循环
System.out.println(eventLoopGroup.next());
System.out.println(eventLoopGroup.next());
System.out.println(eventLoopGroup.next());
System.out.println(eventLoopGroup.next());
//3.执行普通任务。异步执行
eventLoopGroup.next().submit(new Runnable() {
@Override
public void run() {
try {
//睡2秒
Thread.sleep(2 * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("线程名称:" + Thread.currentThread().getName());
}
});
System.out.println("线程名称:" + Thread.currentThread().getName());
//3.定时任务。
eventLoopGroup.next().scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
//睡2秒
Thread.sleep(2 * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("线程名称:" + Thread.currentThread().getName());
}
},1,3, TimeUnit.SECONDS); // 第一个参数几秒后执行,第二个参数 几秒执行依次,第三个参数是前面两个的单位,跟线程池ScheduledExecutorService用法一样
}
EventLoopGroup处理IO事件
NioEventLoopGroup一但与Channel建立关系,这个Channel上所有的事件都会由一个EventLoop处理(同一个线程)。
/**
* EventLoopGroup处理IO事件
*/
private static void demo2() throws Exception{
new ServerBootstrap()
//当客户端与服务端建立连接,产生通道channel,这个channel只会绑定同一个EventLoop处理(同一个线程)
.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println(byteBuf.toString(Charset.forName("utf-8")));
}
});
}
}).bind(8080);
}
EventLoopGroup根据不同情况划分不同职责
-
创建不同EveLoop监听不同事件
private static void demo2() throws Exception{ new ServerBootstrap() // 两个参数的方法,parentGroup(第一个参数)只负责NioServerSocketChannel上accept事件,childGroup(第二个参数)负责NioSocketChannel上的读写 .group(new NioEventLoopGroup(),new NioEventLoopGroup()) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception { nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf byteBuf = (ByteBuf) msg; System.out.println(byteBuf.toString(Charset.forName("utf-8"))); } }); } }).bind(8080); }
-
将其它事情交给其它Eveloop处理,比如耗费时间特别长的任务,会占用绑定的EventLoopGroup()的EventLoop
private static void demo2() throws Exception{ //创建单独的EventLoopGroup,处理个别情况的任务 EventLoopGroup group = new DefaultEventLoop(); new ServerBootstrap() .group(new NioEventLoopGroup(),new NioEventLoopGroup()) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception { nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf byteBuf = (ByteBuf) msg; //打印的名称与下面不一样,说明未占用同一线程 System.out.println(Thread.currentThread().getName() + "::" + byteBuf.toString(Charset.forName("utf-8"))); //将消息传递给下一个handler ctx.fireChannelRead(msg); } }); //传入EventLoopGroup用于处理事件 nioSocketChannel.pipeline().addLast(group,"handler2",new ChannelInboundHandlerAdapter(){ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf byteBuf = (ByteBuf) msg; //打印的名称与上面不一样,说明未占用同一线程 System.out.println(Thread.currentThread().getName() + "::" + byteBuf.toString(Charset.forName("utf-8"))); } }); } }).bind(8080); }
Channel
可以把Channel看作是传入(入站)或者传出(出站)数据的载体。因此它可以被打开或者被关闭、连接或者断开连接。
Channel的主要方法介绍:
- close():用来关闭channel
- pipeline():可以用来添加处理器Handler
- write():写入数据
- writeAndFlush():写入数据并立即发送给服务端
- closeFuture():处理Channel的关闭
public static void main(String[] args) throws Exception{
Channel channel = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override //在连接建立后被调用
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
nioSocketChannel.pipeline().addLast(new StringEncoder(Charset.forName("utf-8"))); //编码设置
}
})
.connect(new InetSocketAddress("localhost",8080))
.sync()
.channel();
//.writeAndFlush("Hello Netty");
//关闭Channel
channel.writeAndFlush("Hello Netty");;
}
Future
Future提供了一种在操作完成时通知应用程序的方式。这个对象可以看作是一个异步操作结果的占位符;它在未来的某个时刻完成,并提供对其结果的访问。Netty提供了自己的实现ChannelFuture在执行异步操作时使用
ChannelFuture方法介绍
- sync():阻塞线程等待。比如客户端跟服务端建立连接用这个方法等待连接建立成功(客户端与服务端是用Eventloop线程异步建立连接)
- addListener():添加一个监听器异步处理结果
连接主线程阻塞或者异步监听方式代码
/**
* 阻塞线程同步等待连接和添加监听器异步等待连接后处理信息
* @throws Exception
*/
private static void demo1() {
Bootstrap bootstrap = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override //在连接建立后被调用
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
nioSocketChannel.pipeline().addLast(new StringEncoder(Charset.forName("utf-8")));
}
});
//connet是异步非阻塞,main线程发起了调用,执行去连接服务器的线程是NioEventLoopGroup中的线程
ChannelFuture future = bootstrap.connect(new InetSocketAddress("localhost",8080));
//1.上面是异步非阻塞执行,这里需要阻塞等待连接建立成功再往下执行,ChannelFuture的sync()会阻塞等待连接
/*future.sync();
Channel channel = future.channel();
channel.writeAndFlush("Hello Netty");*/
//2.使用addListener方法异步处理结果(非主线程处理)
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
Channel channel = channelFuture.channel();
channel.writeAndFlush("Hello Netty");
System.out.println("线程名称:" + Thread.currentThread().getName());
}
});
}
ChannelFuture处理关闭
Netty的Channel调用关闭方法时也是异步,想正常关闭后处理与上面相同,获取channel的closeFuture(调用closeFuture()方法),然后去阻塞主线程等待关闭或者添加Listener监听,关闭结束后异步处理。Channel关闭过后记得关闭EventLoopGroup(里面有处理事件的线程)。服务端关闭处理与客户端一样。
客户端关闭代码
/**
* ChannelFuture处理关闭
* @throws Exception
*/
private static void demo2() throws Exception{
NioEventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap()
.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override //在连接建立后被调用
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
nioSocketChannel.pipeline().addLast(new StringEncoder(Charset.forName("utf-8")));
}
});
ChannelFuture future = bootstrap.connect(new InetSocketAddress("localhost",8080));
future.sync();
Channel channel = future.channel();
channel.writeAndFlush("Hello Netty");
//调用关闭方法,是其他线程去关闭异步的
channel.close();
//1.获取Channel的closeFuture,阻塞主线程去等待
ChannelFuture closeFuture = channel.closeFuture();
/*
//1.阻塞主线程去等待关闭
closeFuture.sync();
//关闭NioEventLoopGroup
group.shutdownGracefully();
System.out.println("已经关闭,做关闭后的事情============");*/
//2.添加Listener结束后异步处理(其他线程,非主线程)
closeFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture以上是关于Netty介绍及使用的主要内容,如果未能解决你的问题,请参考以下文章