springboot netty非阻塞协议搭建

Posted guxiaohai_

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了springboot netty非阻塞协议搭建相关的知识,希望对你有一定的参考价值。

一:简介

  1. Netty是由JBOSS提供的一个java开源框架。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。也就是说,Netty 是一个基于NIO的客户、服务器端编程框架,使用Netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户、服务端应用。Netty相当于简化和流线化了网络应用的编程开发过程,例如:基于TCP和UDP的socket服务开发。“快速”和“简单”并不用产生维护性或性能上的问题。Netty 是一个吸收了多种协议(包括FTP、SMTP、HTTP等各种二进制文本协议)的实现经验,并经过相当精心设计的项目。最终,Netty 成功的找到了一种方式,在保证易于开发的同时还保证了其应用的性能,稳定性和伸缩性

二:编码

  1. 引入程序包
 <dependency>
     <groupId>io.netty</groupId>
     <artifactId>netty-all</artifactId>
     <version>4.1.36.Final</version>
 </dependency>
  1. 搭建netty服务
@Slf4j
@Component
public class NettyServer implements ApplicationRunner {

    /**
     * @Author: guwenhai
     * @Description:    开启服务线程
     * @Date: 2021-03-27 15:40
     */
    @Override
    public void run(ApplicationArguments args) throws Exception {
        String tcpIp = ""//tcpIp
        String tcpPort = "" //tcp端口
        start(new InetSocketAddress(tcpIp, Integer.valueOf(tcpPort)));
    }

    public void start(InetSocketAddress socketAddress) {
        //new 一个主线程组
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        //new 一个工作线程组
        EventLoopGroup workGroup = new NioEventLoopGroup(200);
        ServerBootstrap bootstrap = new ServerBootstrap()
                .group(bossGroup, workGroup)
                .channel(NioserverSocketChannel.class)
                .childHandler(new ServerChannelInitializer())
                .localAddress(socketAddress)
                //设置队列大小
                .option(ChannelOption.SO_BACKLOG, 1024)
                // 两小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文
                .childOption(ChannelOption.SO_KEEPALIVE, true);
        //绑定端口,开始接收进来的连接
        try {
            ChannelFuture future = bootstrap.bind(socketAddress).sync();
            log.info("Netty服务器启动开始监听端口: {}", socketAddress.getPort());
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            log.error("Netty端口冲突,异常信息:{}", e.getMessage());
            e.printStackTrace();
        } finally {
            //关闭主线程组
            bossGroup.shutdownGracefully();
            //关闭工作线程组
            workGroup.shutdownGracefully();
        }
    }
}
  1. netty服务初始化器
public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline  =  socketChannel.pipeline();
        pipeline.addLast("handler", new NettyServerHandler());
    }
}
  1. netty服务端处理器
@Slf4j
@Component
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
    //定义本类的静态对象
    private static NettyServerHandler nettyServerHandler;
    
    @PostConstruct
    public void init(){
        nettyServerHandler = this;
    }

    /**
     * 客户端连接会触发
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("TCP对象线程: {}", ctx);
    }

    /**
     * 客户端发消息会触发
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //客户端上传消息
        String message = onMessage(msg);
    }

    /**
     * 发生异常触发
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        close(ctx);
    }

    /**
     * @Author: guwenhai
     * @Description:    获取指定客户端的上传信息
     * @Date: 2020/11/2 11:56
     */
    public static String onMessage(Object msg) {
        ByteBuf buf = (ByteBuf)msg;
        byte [] bytes = new byte[buf.readableBytes()];
        buf.readBytes(bytes);//复制内容到字节数组bytes
        String message = HexEcodeUtil.ByteArrayToHexStr(bytes);//将接收到的数据转为字符串,此字符串就是客户端发送的字符串
        return message;
    }

    /**
     * @Author: guwenhai
     * @Description:    公用回写数据到客户端的方法
     * @param receiveStr    写入数据
     * @param ctx 当前客户端
     * @param mark 类型
     * @Date: 2021-04-25 15:29
     */
    public static void writeToClient(final String receiveStr, ChannelHandlerContext ctx,String mark) {
        try {
            ByteBuf bufff = Unpooled.buffer();//netty需要用ByteBuf传输
            bufff.writeBytes(HexEcodeUtil.hexStrToBinaryStr(receiveStr));//对接需要16进制
            ctx.writeAndFlush(bufff).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    StringBuilder sb = new StringBuilder("");
                    if(!StringUtils.isEmpty(mark)){
                        sb.append("【").append(mark).append("】");
                    }
                    if (future.isSuccess()) {
                        log.info("回写成功:" + sb.toString() + "{}",receiveStr);
                    } else {
                        log.info("回写失败:" + sb.toString() + "{}",receiveStr);
                    }
                }
            });
        } catch (Exception e) {
            e.printStackTrace();
            log.error("调用通用writeToClient()异常:",e);
            close(ctx);
        }
    }

    /**
     * @Author: guwenhai
     * @Description:    指定netty资源回收
     * @Date: 2020/11/2 11:57
     */
    public static void close(ChannelHandlerContext ctx) {
        if (ctx != null) {
            try {
                log.info("关闭netty:{}", ctx);
                ctx.close();
            } catch (Exception e) {
                log.error("关闭netty异常{}", e);
            }
        }
    }
}

以上是关于springboot netty非阻塞协议搭建的主要内容,如果未能解决你的问题,请参考以下文章

Netty——网络编程(非阻塞理解及代码示例)

Netty——网络编程(非阻塞理解及代码示例)

Netty(RPC高性能之道)原理剖析

springboot socket tcp阻塞协议搭建

netty为什么非阻塞

13.RPC的socket实现(阻塞式)与netty实现(非阻塞式)