Netty

Posted tractors

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Netty相关的知识,希望对你有一定的参考价值。

一、Netty概念:

  Netty是一个NIO客户端服务器框架,可以快速轻松地开发协议服务器和客户端等网络应用程序。它极大地简化并简化了TCP和UDP套接字服务器等网络编程。

  “快速简便”并不意味着最终的应用程序会受到可维护性或性能问题的影响。

  Netty经过精心设计,具有丰富的协议,如FTP,SMTP,HTTP以及各种二进制和基于文本的传统协议。

  因此,Netty成功地找到了一种在不妥协的情况下实现易于开发,性能,稳定性和灵活性的方法。

二、关键字:

(1)Bootstrap:

  Netty应用程序通过设置bootstrap(引导)类开始,该类提供了一个用于网络成配置的容器。

  • 一种是用于客户端的Bootstrap
  • 一种是用于服务端的ServerBootstrap

(2)channel:

  Netty中的channel定义了丰富的和socket交互的操作方法:bind, close, config, connect, isActive, isOpen, isWritable, read, write 等等。

  Netty 提供大量的 Channel 实现来针对不同方式的处理。

  这些包括 AbstractChannel,AbstractNioByteChannel,AbstractNioChannel,EmbeddedChannel, LocalServerChannel,NiosocketChannel 等

(2)ChannelHandler:

  用户处理入站和出站的数据。

  常用的一个接口是 ChannelInboundHandler,这个类型接收到入站事件(包括接收到的数据)可以处理应用程序逻辑。

  当你需要提供响应时,你也可以从 ChannelInboundHandler 冲刷数据。

  业务逻辑经常存活于一个或者多个 ChannelInboundHandler。

(3)ChannelPipeline:

  ChannelPipeline 提供了一个容器给 ChannelHandler 链并提供了一个API 用于管理沿着链入站和出站事件的流动。

  每个 Channel 都有自己的ChannelPipeline,在 Channel 创建时自动创建的。

(4)ChannelFuture:

  用于监听Netty的处理结果,并根据结果进行后续处理。

(5)EventLoop:

  EventLoop用于处理Channel的 I/O 操作,一个单一的EventLoop通常处理多个Channel事件,一个EventLoopGroup包含一个或多个EventLoop。

三、socket示例:

 1)服务端:

public class ServerDemo 
    public static void main(String[] args)
        new ServerDemo().start(8888);
    

    public void start(int port) 
        /**
         * 配置服务端的 NIO 线程池,用于网络事件处理,实质上他们就是 Reactor 线程组
         * bossGroup 用于服务端接受客户端连接,workerGroup 用于进行 SocketChannel 网络读写
         */
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        ChannelFuture f = null;
        try 
            /**
             * ServerBootstrap 是 Netty 用于启动 NIO 服务端的辅助启动类,用于降低开发难度
             */
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childHandler(new ServerInitilzer());
            /**服务器启动辅助类配置完成后,调用 bind 方法绑定监听端口,调用 sync 方法同步等待绑定操作完成*/
            f = b.bind(port).sync();
            System.out.println(Thread.currentThread().getName() + ",服务器开始监听端口,等待客户端连接.........");

            /**
             * 下面会进行阻塞,等待服务器连接关闭之后 main 方法退出,程序结束
             * */
            f.channel().closeFuture().sync();
         catch (
                Exception e) 
            e.printStackTrace();
         finally 
            /**优雅退出,释放线程池资源*/
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        
    
public class ServerInitilzer extends ChannelInitializer<SocketChannel> 
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception 
        socketChannel.pipeline().addLast(new ServerHandler());
    
public class ServerHandler extends ChannelInboundHandlerAdapter 
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception 
        /**
         * 将 msg 转为 Netty 的 ByteBuf 对象,类似 JDK 中的 java.nio.ByteBuffer,不过 ButeBuf 功能更强,更灵活
         */
        ByteBuf buf = (ByteBuf) msg;

        /**
         * readableBytes:获取缓冲区可读字节数,然后创建字节数组
         * 从而避免了像 java.nio.ByteBuffer 时,只能盲目的创建特定大小的字节数组,比如 1024
         * */
        byte[] reg = new byte[buf.readableBytes()];

        /**r
         * eadBytes:将缓冲区字节数组复制到新建的 byte 数组中
         * 然后将字节数组转为字符串
         * */
        buf.readBytes(reg);
        String body = new String(reg, "UTF-8");
        System.out.println(Thread.currentThread().getName() + ",The msg: " + body);

        /**
         * 回复消息
         * copiedBuffer:创建一个新的缓冲区,内容为里面的参数
         * 通过 ChannelHandlerContext 的 write 方法将消息异步发送给客户端
         * */
        String respMsg = "I am Server,消息接收 success!";
        ByteBuf respByteBuf = Unpooled.copiedBuffer(respMsg.getBytes());
        ctx.writeAndFlush(respByteBuf);
    

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception 
        /**当发生异常时,关闭 ChannelHandlerContext,释放和它相关联的句柄等资源 */
        ctx.close();
    

2)客户端:

public class ClientDemo 
    /**
     * 使用 3 个线程模拟三个客户端
     */
    public static void main(String[] args) 
        for (int i = 0; i < 3; i++) 
            new Thread(new ClientDemo.MyThread()).start();
        
    

    static class MyThread implements Runnable 

        public void run() 
            connect("localhost", 8888);
        

        public void connect(String host, int port) 
            /**配置客户端 NIO 线程组/池*/
            EventLoopGroup group = new NioEventLoopGroup();

            try 
                /**
                 * Bootstrap 与 ServerBootstrap 都继承(extends)于 AbstractBootstrap
                 * 创建客户端辅助启动类,并对其配置,与服务器稍微不同,这里的 Channel 设置为 NioSocketChannel
                 * 然后为其添加 Handler,这里直接使用匿名内部类,实现 initChannel 方法
                 * 作用是当创建 NioSocketChannel 成功后,在进行初始化时,将它的ChannelHandler设置到ChannelPipeline中,用于处理网络I/O事件
                 */
                Bootstrap b = new Bootstrap();
                b.group(group).channel(NioSocketChannel.class)
                        .option(ChannelOption.TCP_NODELAY, true)
                        .handler(new ChannelInitializer<SocketChannel>() 
                            @Override
                            public void initChannel(SocketChannel ch) throws Exception 
                                ch.pipeline().addLast(new ClientHandler());
                            
                        );
                /**connect:发起异步连接操作,调用同步方法 sync 等待连接成功*/
                ChannelFuture channelFuture = b.connect(host, port).sync();
                System.out.println(Thread.currentThread().getName() + ",客户端发起异步连接..........");

                /**等待客户端链路关闭*/
                channelFuture.channel().closeFuture().sync();
             catch (InterruptedException e) 
                e.printStackTrace();
             finally 
                /**优雅退出,释放NIO线程组*/
                group.shutdownGracefully();
            
        
    
public class ClientHandler extends ChannelInboundHandlerAdapter 
    private static final Logger logger = Logger.getLogger(ClientHandler.class.getName());

    /**
     * 当客户端和服务端 TCP 链路建立成功之后,Netty 的 NIO 线程会调用 channelActive 方法
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception 
        String reqMsg = "我是客户端 " + Thread.currentThread().getName();
        byte[] reqMsgByte = reqMsg.getBytes("UTF-8");
        ByteBuf reqByteBuf = Unpooled.buffer(reqMsgByte.length);

        /**
         * writeBytes:将指定的源数组的数据传输到缓冲区
         * 调用 ChannelHandlerContext 的 writeAndFlush 方法将消息发送给服务器
         */
        reqByteBuf.writeBytes(reqMsgByte);
        ctx.writeAndFlush(reqByteBuf);
    

    /**
     * 当服务端返回应答消息时,channelRead 方法被调用,从 Netty 的 ByteBuf 中读取并打印应答消息
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception 
        ByteBuf buf = (ByteBuf) msg;
        byte[] req = new byte[buf.readableBytes()];
        buf.readBytes(req);
        String body = new String(req, "UTF-8");
        System.out.println(Thread.currentThread().getName() + ",Server return Message:" + body);
        ctx.close();
    

    /**
     * 当发生异常时,打印异常 日志,释放客户端资源
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception 
        /**释放资源*/
        logger.warning("Unexpected exception from downstream : " + cause.getMessage());
        ctx.close();
    

 

以上是关于Netty的主要内容,如果未能解决你的问题,请参考以下文章

NettyNetty源码编译

nettynetty HashedWheelTimer 延时队列

NettyNetty核心功能和线程模型

NettyNetty 高性能之道

nettyNetty并发工具-Promise

nettyNetty粘包问题TooLongFrameException: Adjusted frame length exceeds