Netty介绍及使用

Posted zqq_hello_world

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Netty介绍及使用相关的知识,希望对你有一定的参考价值。

Netty

Netty是由JBOSS提供的一个java开源框架,现为 Github上的独立项目。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。

也就是说,Netty 是一个基于NIO的客户、服务器端的编程框架,使用Netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户、服务端应用。Netty相当于简化和流线化了网络应用的编程开发过程,例如:基于TCP和UDP的socket服务开发。

Netty提升的是吞吐量(单位时间内处理请求的个数),把线程工作细化。 是对JavaNIO网络的封装,每一步都可以另一个线程去执行,并提供异步监听处理和同步阻塞处理的方法。

第一个Netty程序

  1. 客户端

    /**
     * Hello Netty
     */
    public class Client {
    
        public static void main(String[] args) throws InterruptedException {
            client();
        }
    
        private static void client() throws InterruptedException {
            //1.客户端启动类
            new Bootstrap()
                    //2.添加 EventLoop
                    .group(new NioEventLoopGroup())
                    //3.选择客户端的Channel实习
                    .channel(NiosocketChannel.class)
                    //4.添加处理器
                    .handler(new ChannelInitializer<NioSocketChannel>() {
                        @Override //在连接建立后被调用
                        protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                            //编码设置
                            nioSocketChannel.pipeline().addLast(new StringEncoder(Charset.forName("utf-8"))); 
                        }
                    })
                    //5.连接到服务器
                    .connect(new InetSocketAddress("localhost",8080))
                    .sync() //6.阻塞,直到与服务端连接建立
                    .channel() //拿到连接对象Channel
                    //7.向服务器发送数据
                    .writeAndFlush("Hello Netty");
        }
    }
    
  2. 服务端

    public class Server {
    
        public static void main(String[] args) {
            server();
        }
    
        private static void server() {
            // 1. ServerBootstrap 服务端启动器,组装netty注解,启动服务器
            new ServerBootstrap()
                    //2.绑定NioEventLoopGroup,绑定对应的boss(负责连接) worker(处理事件)组
                    .group(new NioEventLoopGroup())
                    //3.选择服务器ServerSocketChannel实现,NioServerSocketChannel NIO的ServerSocketChannel
                    .channel(NioServerSocketChannel.class)
                    // 4.类似nio添加事件处理器 Handler。new ChannelInitializer初始化通道Channel和添加Handler
                    .childHandler(new ChannelInitializer<NioSocketChannel>() {
                        @Override
                        protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                            //5.添加具体的Handler
                            nioSocketChannel.pipeline().addLast(new StringDecoder(Charset.forName("utf-8")));//将Bytebuf 转换为字符串
                            nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){ //自定义Handler
                                //6.处理读事件
                                @Override
                                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                    System.out.println("接收到消息:" + msg);
                                }
                            });
                        }
                    }).bind(8080); //7.服务启动绑定的监听端口
        }
    }
    

Netty组件介绍

EventLoop

EventLoop本质是一个单线程执行器(同时维护了一个Selector),里面有run方法处理Channel上源源不断的IO事件。是一个接口,继承了JDK的ScheduledExecutorService(可执行定时任务的线程池)和 Netty的OrderedEventExecutor。

  • 继承了线程池ScheduledExecutorService,有线程池的所有方法,可以执行定时任务。
  • OrderedEventExecutor提供了判断线程是否属于此EventLoop方法inEventLoop(Thread var1),和parent()方法判断自己属于哪个EventLoopGroup等。

EventLoopGroup(事件循环组)

EventLoopGroup是一组EventLoop,Channel一般会调用EventLoopGroup的register方法来绑定其中一个EventLoop,后续这个Channel上的IO事件都由此EventLoop来处理。

EventLoopGroup处理普通任务、定时任务

/**
 * EventLoopGroup处理普通任务,定时任务
 */
private static void demo1() {
    //1.创建EventLoopGroup  EventLoopGroup 可以处理IO事件,普通任务,定时任务。可以指定线程数,不指定取电脑核心线程数*2,保证会有一个线程
    EventLoopGroup eventLoopGroup = new NioEventLoopGroup(2);
    //2.获取下一个事件,多次调用会依次循环
    System.out.println(eventLoopGroup.next());
    System.out.println(eventLoopGroup.next());
    System.out.println(eventLoopGroup.next());
    System.out.println(eventLoopGroup.next());
    //3.执行普通任务。异步执行
    eventLoopGroup.next().submit(new Runnable() {
        @Override
        public void run() {
            try {
                //睡2秒
                Thread.sleep(2 * 1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("线程名称:" + Thread.currentThread().getName());
        }
    });
    System.out.println("线程名称:" + Thread.currentThread().getName());

    //3.定时任务。
    eventLoopGroup.next().scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                //睡2秒
                Thread.sleep(2 * 1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("线程名称:" + Thread.currentThread().getName());
        }
    },1,3, TimeUnit.SECONDS); // 第一个参数几秒后执行,第二个参数 几秒执行依次,第三个参数是前面两个的单位,跟线程池ScheduledExecutorService用法一样
}

EventLoopGroup处理IO事件

NioEventLoopGroup一但与Channel建立关系,这个Channel上所有的事件都会由一个EventLoop处理(同一个线程)。

/**
 * EventLoopGroup处理IO事件
 */
private static void demo2() throws Exception{
    new ServerBootstrap()
            //当客户端与服务端建立连接,产生通道channel,这个channel只会绑定同一个EventLoop处理(同一个线程)
            .group(new NioEventLoopGroup())
            .channel(NioServerSocketChannel.class)
            .childHandler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                    nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                        @Override
                        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                            ByteBuf byteBuf = (ByteBuf) msg;
                            System.out.println(byteBuf.toString(Charset.forName("utf-8")));
                        }
                    });
                }
            }).bind(8080);
}

EventLoopGroup根据不同情况划分不同职责

  1. 创建不同EveLoop监听不同事件

    private static void demo2() throws Exception{
        new ServerBootstrap()
                // 两个参数的方法,parentGroup(第一个参数)只负责NioServerSocketChannel上accept事件,childGroup(第二个参数)负责NioSocketChannel上的读写
                .group(new NioEventLoopGroup(),new NioEventLoopGroup())
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                        nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                ByteBuf byteBuf = (ByteBuf) msg;
                                System.out.println(byteBuf.toString(Charset.forName("utf-8")));
                            }
                        });
                    }
                }).bind(8080);
    }
    
  2. 将其它事情交给其它Eveloop处理,比如耗费时间特别长的任务,会占用绑定的EventLoopGroup()的EventLoop

    private static void demo2() throws Exception{
        //创建单独的EventLoopGroup,处理个别情况的任务
        EventLoopGroup group = new DefaultEventLoop();
        new ServerBootstrap()
                .group(new NioEventLoopGroup(),new NioEventLoopGroup())
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                        nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                ByteBuf byteBuf = (ByteBuf) msg;
                                //打印的名称与下面不一样,说明未占用同一线程
                                System.out.println(Thread.currentThread().getName() + "::" + byteBuf.toString(Charset.forName("utf-8")));
                                //将消息传递给下一个handler
                                ctx.fireChannelRead(msg);
                            }
                        });
                        //传入EventLoopGroup用于处理事件
                        nioSocketChannel.pipeline().addLast(group,"handler2",new ChannelInboundHandlerAdapter(){
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                ByteBuf byteBuf = (ByteBuf) msg;
                                //打印的名称与上面不一样,说明未占用同一线程
                                System.out.println(Thread.currentThread().getName() + "::" + byteBuf.toString(Charset.forName("utf-8")));
                            }
                        });
                    }
                }).bind(8080);
    }
    

Channel

可以把Channel看作是传入(入站)或者传出(出站)数据的载体。因此它可以被打开或者被关闭、连接或者断开连接。

Channel的主要方法介绍

  • close():用来关闭channel
  • pipeline():可以用来添加处理器Handler
  • write():写入数据
  • writeAndFlush():写入数据并立即发送给服务端
  • closeFuture():处理Channel的关闭
public static void main(String[] args) throws Exception{
    Channel channel = new Bootstrap()
            .group(new NioEventLoopGroup())
            .channel(NioSocketChannel.class)
            .handler(new ChannelInitializer<NioSocketChannel>() {
                @Override //在连接建立后被调用
                protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                    nioSocketChannel.pipeline().addLast(new StringEncoder(Charset.forName("utf-8"))); //编码设置
                }
            })
            .connect(new InetSocketAddress("localhost",8080))
            .sync()
            .channel();
            //.writeAndFlush("Hello Netty");

    //关闭Channel
    channel.writeAndFlush("Hello Netty");;
}

Future

Future提供了一种在操作完成时通知应用程序的方式。这个对象可以看作是一个异步操作结果的占位符;它在未来的某个时刻完成,并提供对其结果的访问。Netty提供了自己的实现ChannelFuture在执行异步操作时使用

ChannelFuture方法介绍

  • sync():阻塞线程等待。比如客户端跟服务端建立连接用这个方法等待连接建立成功(客户端与服务端是用Eventloop线程异步建立连接)
  • addListener():添加一个监听器异步处理结果

连接主线程阻塞或者异步监听方式代码

/**
 * 阻塞线程同步等待连接和添加监听器异步等待连接后处理信息
 * @throws Exception
 */
private static void demo1() {
    Bootstrap bootstrap  = new Bootstrap()
            .group(new NioEventLoopGroup())
            .channel(NioSocketChannel.class)
            .handler(new ChannelInitializer<NioSocketChannel>() {
                @Override //在连接建立后被调用
                protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                    nioSocketChannel.pipeline().addLast(new StringEncoder(Charset.forName("utf-8")));
                }
            });
    //connet是异步非阻塞,main线程发起了调用,执行去连接服务器的线程是NioEventLoopGroup中的线程
    ChannelFuture future = bootstrap.connect(new InetSocketAddress("localhost",8080));
    //1.上面是异步非阻塞执行,这里需要阻塞等待连接建立成功再往下执行,ChannelFuture的sync()会阻塞等待连接
    /*future.sync();
    Channel channel = future.channel();
    channel.writeAndFlush("Hello Netty");*/

    //2.使用addListener方法异步处理结果(非主线程处理)
    future.addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture channelFuture) throws Exception {
            Channel channel = channelFuture.channel();
            channel.writeAndFlush("Hello Netty");
            System.out.println("线程名称:" + Thread.currentThread().getName());
        }
    });
}

ChannelFuture处理关闭

Netty的Channel调用关闭方法时也是异步,想正常关闭后处理与上面相同,获取channel的closeFuture(调用closeFuture()方法),然后去阻塞主线程等待关闭或者添加Listener监听,关闭结束后异步处理。Channel关闭过后记得关闭EventLoopGroup(里面有处理事件的线程)。服务端关闭处理与客户端一样

客户端关闭代码

/**
 * ChannelFuture处理关闭
 * @throws Exception
 */
private static void demo2() throws Exception{
    NioEventLoopGroup group = new NioEventLoopGroup();
    Bootstrap bootstrap  = new Bootstrap()
            .group(group)
            .channel(NioSocketChannel.class)
            .handler(new ChannelInitializer<NioSocketChannel>() {
                @Override //在连接建立后被调用
                protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                    nioSocketChannel.pipeline().addLast(new StringEncoder(Charset.forName("utf-8")));
                }
            });
    ChannelFuture future = bootstrap.connect(new InetSocketAddress("localhost",8080));
    future.sync();
    Channel channel = future.channel();
    channel.writeAndFlush("Hello Netty");
    //调用关闭方法,是其他线程去关闭异步的
    channel.close();
    //1.获取Channel的closeFuture,阻塞主线程去等待
    ChannelFuture closeFuture = channel.closeFuture();
    /*
    //1.阻塞主线程去等待关闭
    closeFuture.sync();
     //关闭NioEventLoopGroup
    group.shutdownGracefully();
    System.out.println("已经关闭,做关闭后的事情============");*/

    //2.添加Listener结束后异步处理(其他线程,非主线程)
    closeFuture.addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture channelFuture以上是关于Netty介绍及使用的主要内容,如果未能解决你的问题,请参考以下文章

Netty框架之概述及基本组件介绍

12.netty中tcp粘包拆包问题及解决方法

12.netty中tcp粘包拆包问题及解决方法

Netty

Netty介绍及应用

Netty 超时机制及心跳程序实现