Springboot集成Netty实现TCP通讯

Posted 隔壁老郭的博客

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Springboot集成Netty实现TCP通讯相关的知识,希望对你有一定的参考价值。

Netty

Netty测试客户端

package com.coremain;

import com.coremain.handler.ServerListenerHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.util.CharsetUtil;

import java.util.Scanner;

/**
 * @description: 测试
 * @author: GuoTong
 * @createTime: 2023-05-14 16:46
 * @since JDK 1.8 OR 11
 **/
public class NettyClientTest 

    public static void main(String[] args) throws InterruptedException 
        // 客户端的线程池
        EventLoopGroup workerGroup = new NioEventLoopGroup(8);

        try 
            // 创建Netty客户端端的启动对象
            Bootstrap bootstrap = new Bootstrap();
            // 使用链式编程来配置参数
            bootstrap.group(workerGroup)
                    .channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() 
                @Override
                protected void initChannel(SocketChannel ch) throws Exception 
                    //对workerGroup的SocketChannel设置处理器
                    ChannelPipeline pipeline = ch.pipeline();
                    // 对于通道加入解码器
                    pipeline.addLast("decoder", new StringDecoder());

                    // 对于通道加入加码器
                    pipeline.addLast("encoder", new StringDecoder());

                    // 加入事件回调处理器
                    pipeline.addLast(new ServerListenerHandler());
                
            );
            System.out.println("基于Netty的客户端接入启动完成....");
            ChannelFuture cf = bootstrap.connect("127.0.0.1", 18023).sync();
            // 获取连接通道
            Channel channel = cf.channel();
            System.out.println("+++++++" + channel.localAddress() + "=======");
            // 客户端输入扫描器
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNext()) 
                String next = scanner.next();
                // 发送到服务端
                channel.writeAndFlush(Unpooled.buffer().writeBytes(next.getBytes(CharsetUtil.UTF_8)));
            

            // 通过sync方法同步等待通道关闭处理完毕,这里会阻塞等待通道关闭完成,内部调用的是Object的wait()方法
            cf.channel().closeFuture().sync();
         finally 
            workerGroup.shutdownGracefully();
        
    


Netty的Server启动类

package com.coremain.netty;

import com.coremain.handler.NettyServerHTTPHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

/**
 * @description: Netty服务启动器
 * netty服务端会跟随一起启动。 同时,在springboot关闭前,会先销毁netty服务。
 * @author: GuoTong
 * @createTime: 2023-05-14 15:13
 * @since JDK 1.8 OR 11
 **/

@Component
public class NettyServerHTTPRunning 

    // log4j2的AsyncLogger本身的逻辑采用了缓冲区思想,使用的是disruptor框架来实现一个环形无锁队列。
    private static final Logger log = LoggerFactory.getLogger(NettyServerHTTPRunning.class);


    /**
     * 主线程组
     */
    private  NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);

    /**
     * 工作线程组
     */
    private  NioEventLoopGroup workerGroup = new NioEventLoopGroup(10);

    /**
     * (http)主要服务端口
     */
    @Value("$iot.port1:18024")
    private int iot1;

    /**
     * (http)备用服务端口
     */
    @Value("$iot.port2:18025")
    private int iot2;



    /**
     * 启动 netty 服务
     */
    @PostConstruct
    public void startServer() throws InterruptedException 
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                // 三次握手中的A、B队列总和最大值(第二次握手加入A, 第三次握手从A移动到B, accept 后从B取出)
                .option(ChannelOption.SO_BACKLOG, 1024)
                // 解决端口占用问题, 可以共用服务器端口(即使该端口已被其他端口占用)
                .option(ChannelOption.SO_REUSEADDR, true)
                // 接收消息缓冲区大小
                .option(ChannelOption.SO_RCVBUF, 2048)
                // 发送消息缓冲区大小
                .option(ChannelOption.SO_SNDBUF, 2048)
                // 用于启用或关于Nagle算法。如果要求高实时性,有数据发送时就马上发送,就将该选项设置为true关闭Nagle算法;
                // 如果要减少发送次数减少网络交互,就设置为false等累积一定大小后再发送
                .option(ChannelOption.TCP_NODELAY, true)
                // 用于检测长时间没有数据传输的连接状态,当设置该选项以后,如果在两小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文
                .option(ChannelOption.SO_KEEPALIVE, true)
                // 当用户调用close()方法的时候,函数返回,在可能的情况下,尽量发送数据,不一定保证全部发送成功
                // 使用SO_LINGER可以阻塞close()的调用时间,直到数据完全发送
                .option(ChannelOption.SO_LINGER, 2000)
                .childHandler(new NettyServerHTTPHandler());
        ChannelFuture service1 = serverBootstrap.bind(iot1).sync();
        ChannelFuture service2 = serverBootstrap.bind(iot2).sync();
        if (service1.isSuccess()) 
            log.info("服务1启动成功, port: ", iot1);
        
        if (service2.isSuccess()) 
            log.info("服务2启动成功, port: ", iot2);
        

    

    /**
     * 销毁
     */
    @PreDestroy
    public void destroy() 
        bossGroup.shutdownGracefully().syncUninterruptibly();
        workerGroup.shutdownGracefully().syncUninterruptibly();
        log.info("关闭 Netty 成功");
    

Netty的服务端核心类:ServerBootstrap

package com.coremain.config;

import com.coremain.handler.NettyServerHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @description: 配置NettyServer
 * @author: GuoTong
 * @createTime: 2023-05-14 14:54
 * @since JDK 1.8 OR 11
 **/
@Configuration
@EnableConfigurationProperties
public class NettyConfig 

    private final NettyProperties nettyProperties;

    public NettyConfig(NettyProperties nettyProperties) 
        this.nettyProperties = nettyProperties;
    

    /**
     * boss线程池-进行客户端连接
     *
     * @return
     */
    @Bean
    public NioEventLoopGroup boosGroup() 
        return new NioEventLoopGroup(nettyProperties.getBoss());
    

    /**
     * worker线程池-进行业务处理
     *
     * @return
     */
    @Bean
    public NioEventLoopGroup workerGroup() 
        return new NioEventLoopGroup(nettyProperties.getWorker());
    

    /**
     * 服务端启动器,监听客户端连接
     *
     * @return
     */
    @Bean
    public ServerBootstrap serverBootstrap() 
        ServerBootstrap serverBootstrap = new ServerBootstrap()
                // 指定使用的线程组
                .group(boosGroup(), workerGroup())
                // 指定使用的通道
                .channel(NioServerSocketChannel.class)
                // 三次握手中的A、B队列总和最大值(第二次握手加入A, 第三次握手从A移动到B, accept 后从B取出)
                .option(ChannelOption.SO_BACKLOG, 1024)
                // 指定连接超时时间
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyProperties.getTimeout())
                // 支持长连接
                .option(ChannelOption.SO_KEEPALIVE, true)
                // 接收消息缓冲区大小
                .option(ChannelOption.SO_RCVBUF, 2048)
                // 发送消息缓冲区大小
                .option(ChannelOption.SO_SNDBUF, 2048)
                // 指定worker处理器
                .childHandler(new NettyServerHandler());
        return serverBootstrap;
    


Netty的通道处理器 ChannelInitializer

package com.coremain.handler;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import org.springframework.stereotype.Component;

/**
 * @description: Netty服务端回调处理
 * @ChannelHandler.Sharable。这个注解的作用是标注一个Handler实例可以被多个通道安全地共享。
 * @author: GuoTong
 * @createTime: 2023-05-14 14:57
 * @since JDK 1.8 OR 11
 **/
@ChannelHandler.Sharable
@Component
public class NettyServerHandler extends ChannelInitializer<SocketChannel> 

    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception 
        // 数据分割符
        String delimiterStr = "##@##";
        ByteBuf delimiter = Unpooled.copiedBuffer(delimiterStr.getBytes());
        ChannelPipeline pipeline = socketChannel.pipeline();
        // 使用自定义处理拆包/沾包,并且每次查找的最大长度为1024字节
        pipeline.addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
        // 将上一步解码后的数据转码为Message实例
        pipeline.addLast(new MessageUTF8DecodeHandler());
        // 对发送客户端的数据进行编码,并添加数据分隔符
        pipeline.addLast(new MessageUTF8EncodeHandler(delimiterStr));
        // 对数据进行最终处理
        pipeline.addLast(new ServerListenerHandler());
    


Netty回调处理器SimpleChannelInboundHandler

package com.coremain.handler;

import com.coremain.handler.bean.MessageEnum;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.SocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;

/**
 * @description: 数据处理器,针对不同类型数据分类处理 在处理不同接收数据时使用了枚举类型
 * @ChannelHandler.Sharable。这个注解的作用是标注一个Handler实例可以被多个通道安全地共享。
 * @author: GuoTong
 * @createTime: 2023-05-14 15:07
 * @since JDK 1.8 OR 11
 **/
@ChannelHandler.Sharable
@Component
public class ServerListenerHandler extends SimpleChannelInboundHandler<MessageStrUTF8> 

    private static final Logger log = LoggerFactory.getLogger(ServerListenerHandler.class);

    /**
     * 当客户端主动链接服务端的链接后,这个通道就是活跃的了。也就是客户端与服务端建立了通信通道并且可以传输数据
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception 
        SocketChannel channel = (SocketChannel) ctx.channel();
        String currentData = LocalDateTime.now().format(CommonUtilHandler.dateTimeFormatter);
        //通知客户端链接建立成功
        String str = "通知客户端链接建立成功" + " " + currentData + " " + channel.localAddress().getHostString() + "\\r\\n";
        ctx.writeAndFlush(str);
    

    /**
     * 设备接入连接时处理
     *
     * @param ctx
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) 
        log.info("有新的连接:[]", ctx.channel().id().asLongText());
    

    /**
     * 数据处理
     *
     * @param ctx
     * @param msg
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MessageStrUTF8 msg) 
        // 获取消息实例中的消息体
        String content = msg.getContent();
        // 对不同消息类型进行处理
        MessageEnum type = MessageEnum.getStructureEnum(msg);
        String currentData = LocalDateTime.now().format(CommonUtilHandler.dateTimeFormatter);
        switch (type) 
            case CONNECT:
                // TODO 心跳消息处理
            case STATE:
                // TODO 设备状态
            default:
                log.info(currentData + type.content + " 消息内容" + content);
        
    


    /**
     * 设备下线处理
     *
     * @param ctx
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) 
        log.info("设备下线了:", ctx.channel().id().asLongText());
    

    /**
     * 当客户端主动断开服务端的链接后,这个通道就是不活跃的。也就是说客户端与服务端的关闭了通信通道并且不可以传输数据
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception 
        log.info("客户端断开链接", ctx.channel().localAddress().toString());
    


    /**
     * 设备连接异常处理
     *
     * @param ctx
     * @param cause
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 
        cause.printStackTrace();
        // 打印异常
        log.info("异常:", cause.getMessage());
        // 关闭连接
        ctx.close();
    



maven依赖


 <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter-logging</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!--整合web模块-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter-tomcat</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-undertow</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.16</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
        <!--导入log4j2日志依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-log4j2</artifactId>
        </dependency>
        <!--og4j2的AsyncLogger本身的逻辑采用了缓冲区思想,使用的是disruptor框架来实现一个环形无锁队列。-->
        <dependency>
            <groupId>com.lmax</groupId>
            <artifactId>disruptor</artifactId>
            <version>$disruptor-version</version>
        </dependency>
        <!--Netty-->
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.68.Final</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.fastjson2</groupId>
            <artifactId>fastjson2</artifactId>
            <version>$fastJson-version</version>
        </dependency>
    </dependencies>

项目结构

日志配置文件 log4j2.xml

<?xml version="1.0" encoding="UTF-8"?>
<!--status:Log4j2内部日志的输出级别,设置为TRACE对学习Log4j2非常有用 -->
<!--monitorInterval:定时检测配置文件的修改,有变化则自动重新加载配置,时间单位为秒,最小间隔为5s -->
<Configuration status="debug" name="MyApp" packages="" monitorInterval="600">

    <!--properties:设置全局变量 -->
    <properties>
        <!--LOG_HOME:指定当前日志存放的目录 -->
        <property name="LOG_HOME">./NettyStudy/log4j2</property>
        <!--FILE_NAME:指定日志文件的名称 -->
        <property name="FILE_NAME">application</property>

        <property name="log_pattern">%dyyyy-MM-dd HH:mm:ss.SSS [%t] %-5level [%L] - %msg%n</property>
        <property name="max_single_file_size">20MB</property>
    </properties>


    <!--Appenders:定义日志输出目的地,内容和格式等 -->
    <Appenders>

        <!--Console:日志输出到控制台标准输出 -->
        <Console name="Console" target="SYSTEM_OUT">
            <!--pattern:日期,线程名,日志级别,日志名称,日志信息,换行 -->
            <PatternLayout pattern="%dHH:mm:ss.SSS [%t] %-5level %logger36 - %msg%n"/>
        </Console>

        <!--RollingFile:日志输出到文件,下面的文件都使用相对路径 -->
        <!--fileName:当前日志输出的文件名称 -->
        <!--filePattern:备份日志文件名称,备份目录为logs下面以年月命名的目录,备份时使用gz格式压缩 -->
        <RollingFile name="RollingFile" fileName="$LOG_HOME/$FILE_NAME.log"
                     filePattern="$LOG_HOME/$$date:yyyy-MM/$FILE_NAME-%dyyyy-MM-dd-%i.log.gz">
            <!--pattern:日期,线程名,日志级别,日志名称,日志信息,换行 -->
            <PatternLayout pattern="%dyyyy-MM-dd HH:mm:ss.SSS [%t] %-5level [%L] - %msg%n"/>
            <Policies>
                <!--SizeBasedTriggeringPolicy:日志文件按照大小备份 -->
                <!--size:指定日志文件最大为100MB,单位可以为KB、MB或GB -->
                <SizeBasedTriggeringPolicy size="20MB"/>
            </Policies>

        </RollingFile>

        <RollingFile name="InfoLogRollingFile" fileName="$LOG_HOME/my_app_info.log"
                     filePattern="$LOG_HOME/$$date:yyyy_MM_dd/my_app_info_%dyyyy_MM_dd_HH_%i.log.gz">
            <ThresholdFilter level="INFO" onMatch="ACCEPT" onMismatch="DENY"/>
            <PatternLayout pattern="$log_pattern"/>
            <Policies>
                <TimeBasedTriggeringPolicy/>
                <SizeBasedTriggeringPolicy size="$max_single_file_size"/>
            </Policies>
            <DefaultRolloverStrategy fileIndex="nomax">
                <Delete basePath="$LOG_HOME" maxDepth="2">
                    <IfFileName glob="*/my_app_info_*.log.gz">
                        <!-- 这里表示匹配“*/my_app_info_*.log.gz”模式的日志文件的删除策略如下:
                        - 只要日志文件总数量超过5个就删除按时间顺序最早的日志文件
                        - 只要日志文件总大小超过10MB就会删除按时间顺序最早的日志文件
                        - 只要日志文件最近修改时间为9分钟前或更早就会删除按时间顺序最早的日志文件 -->
                        <IfAny>
                            <IfAccumulatedFileSize exceeds="8MB"/>
                            <IfAccumulatedFileCount exceeds="5"/>
                            <IfLastModified age="9m"/>
                        </IfAny>
                    </IfFileName>
                </Delete>
            </DefaultRolloverStrategy>
        </RollingFile>

        <RollingFile name="WarnLogRollingFile" fileName="$LOG_HOME/my_app_warn.log"
                     filePattern="$LOG_HOME/$$date:yyyy_MM_dd/my_app_warn_%dyyyy_MM_dd_HH_%i.log.gz">
            <ThresholdFilter level="WARN" onMatch="ACCEPT" onMismatch="DENY"/>
            <PatternLayout pattern="$log_pattern"/>
            <Policies>
                <TimeBasedTriggeringPolicy/>
                <SizeBasedTriggeringPolicy size="$max_single_file_size"/>
            </Policies>
            <DefaultRolloverStrategy fileIndex="nomax">
                <Delete basePath="$LOG_HOME" maxDepth="2">
                    <IfFileName glob="*/my_app_warn_*.log.gz">
                        <IfAny>
                            <IfAccumulatedFileSize exceeds="3GB"/>
                            <IfAccumulatedFileCount exceeds="3000"/>
                            <IfLastModified age="30d"/>
                        </IfAny>
                    </IfFileName>
                </Delete>
            </DefaultRolloverStrategy>
        </RollingFile>

        <RollingFile name="ErrorLogRollingFile" fileName="$LOG_HOME/my_app_error.log"
                     filePattern="$LOG_HOME/$$date:yyyy_MM_dd/my_app_error_%dyyyy_MM_dd_HH_%i.log.gz">
            <ThresholdFilter level="ERROR" onMatch="ACCEPT" onMismatch="DENY"/>
            <PatternLayout pattern="$log_pattern"/>
            <Policies>
                <TimeBasedTriggeringPolicy/>
                <SizeBasedTriggeringPolicy size="$max_single_file_size"/>
            </Policies>
            <DefaultRolloverStrategy fileIndex="nomax">
                <Delete basePath="$LOG_HOME" maxDepth="2">
                    <IfFileName glob="*/my_app_error_*.log.gz">
                        <IfAny>
                            <IfAccumulatedFileSize exceeds="3GB"/>
                            <IfAccumulatedFileCount exceeds="3000"/>
                            <IfLastModified age="30d"/>
                        </IfAny>
                    </IfFileName>
                </Delete>
            </DefaultRolloverStrategy>
        </RollingFile>

    </Appenders>
    <Loggers>
        <AsyncLogger name="com.meituan.Main" level="trace" additivity="false">
            <appender-ref ref="RollingFile"/>
        </AsyncLogger>
        <AsyncLogger name="RollingFile2" level="trace" additivity="false">
            <appender-ref ref="RollingFile2"/>
        </AsyncLogger>
        <Root level="info">
            <AppenderRef ref="Console"/>
            <AppenderRef ref="InfoLogRollingFile"/>
        </Root>
    </Loggers>
</Configuration>

配置文件 application.yml

# netty 配置
netty:
  # boss线程数量
  boss: 4
  # worker线程数量
  worker: 2
  # 连接超时时间
  timeout: 6000
  # 服务器主端口
  port: 18023
  # 服务器备用端口
  portSalve: 18026
  # 服务器地址
  host: 127.0.0.1
spring:
  application:
    name: netty-server
  mvc:
    servlet:
      load-on-startup: 1 #项目启动时执行初始化即可解决。

server:
  port: 15026
  undertow:
    accesslog:
      enabled: false
    direct-buffers: true # 是否分配的直接内存(NIO直接分配的堆外内存)
    buffer-size: 1024  #每块buffer的空间大小,越小的空间被利用越充分
    threads:
      worker: 20 # 阻塞任务线程池, 它的值设置取决于系统线程执行任务的阻塞系数,默认值是IO线程数*8
      io: 4  # CPU有几核,就填写几。
  servlet:
    context-path: /undertow

启动Netty服务端

启动Netty客户端

客户端发送消息

服务端收到

使用netty+springboot打造的tcp长连接通讯方案

文章目录


项目背景

最近公司某物联网项目需要使用socket长连接进行消息通讯,捣鼓了一版代码上线,结果BUG不断,本猿寝食难安,于是求助度娘,数日未眠项目终于平稳运行了,本着开源共享的精神,本猿把项目代码提炼成了一个demo项目,尽量摒弃了其中丑陋的业务部分,希望与同学们共同学习进步。


正文

一、项目架构

本项目使用了netty、redis以及springboot2.2.0

二、项目模块

本项目目录结构如下图:

netty-tcp-core是公共模块,主要是工具类。netty-tcp-server是netty服务端,服务端仅作测试使用,实际项目中我们只使用了客户端。netty-tcp-client是客户端,也是本文的重点。

三、业务流程

我们实际项目中使用RocketMQ作为消息队列,本项目由于是demo项目于是改为了BlockingQueue。数据流为:

生产者->消息队列->消费者(客户端)->tcp通道->服务端->tcp通道->客户端

当消费者接收到某设备发送的消息后,将判断缓存中是否存在该设备与服务端的连接,如果存在并且通道活跃则使用该通道发送消息,如果不存在则创建通道并在通道激活后立即发送消息,当客户端收到来自服务端的消息时进行响应的业务处理。

四、代码详解

1.消息队列

由于本demo项目移除了消息中间件,于是需要自己创建一个本地队列模拟真实使用场景

package org.example.client;

import org.example.client.model.NettyMsgModel;

import java.util.concurrent.ArrayBlockingQueue;

/**
 * 本项目为演示使用本地队列 实际生产中应该使用消息中间件代替(rocketmq或rabbitmq)
 *
 * @author ReWind00
 * @date 2023/2/15 11:20
 */
public class QueueHolder 

    private static final ArrayBlockingQueue<NettyMsgModel> queue = new ArrayBlockingQueue<>(100);

    public static ArrayBlockingQueue<NettyMsgModel> get() 
        return queue;
    

使用一个类保存队列的静态实例以便在任何类中都可以快速引用。接下来我们需要启动一个线程去监听队列中的消息,一但消息投递到队列中,我们就取出消息然后异步多线程处理该消息。

    public class LoopThread implements Runnable 
        @Override
        public void run() 
            for (int i = 0; i < MAIN_THREAD_POOL_SIZE; i++) 
                executor.execute(() -> 
                    while (true) 
                        //取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到
                        try 
                            NettyMsgModel nettyMsgModel = QueueHolder.get().take();
                            messageProcessor.process(nettyMsgModel);
                         catch (InterruptedException e) 
                            log.error(e.getMessage(), e);
                        
                    
                );
            
        
    

使用take方法会使该线程一直阻塞直到队列收到消息后进入下一次循环。

2.执行类

process方法来自于MessageProcessor类,该类为单例,但是会有多线程同时执行。

    public void process(NettyMsgModel nettyMsgModel) 
        String imei = nettyMsgModel.getImei();
        try 
            synchronized (this)  //为避免收到同一台设备多条消息后重复创建客户端,必须加锁
                if (redisCache.hasKey(NETTY_QUEUE_LOCK + imei))  //上一条消息处理中
                    log.info("imei=消息处理中,重新入列", imei);
                    //放回队列重新等待消费 延迟x秒(实际项目中应该使用rocketmq或者rabbitmq实现延迟消费)
                    new Timer().schedule(new TimerTask() 
                        @Override
                        public void run() 
                            QueueHolder.get().offer(nettyMsgModel);
                        
                    , 2000);
                    log.info("imei=消息处理中,重新入列完成", imei);
                    return;
                 else 
                    //如果没有在连接中的直接加锁
                    redisCache.setCacheObject(NETTY_QUEUE_LOCK + imei, "1", 120, TimeUnit.SECONDS);
                
            
            //缓存中存在则发送消息
            if (NettyClientHolder.get().containsKey(imei)) 
                NettyClient nettyClient = NettyClientHolder.get().get(imei);
                if (null != nettyClient.getChannelFuture() && nettyClient.getChannelFuture().channel().isActive())  //通道活跃直接发送消息
                    if (!nettyClient.getChannelFuture().channel().isWritable()) 
                        log.warn("警告,通道不可写,imei=,channelId=", nettyClient.getImei(),
                                nettyClient.getChannelFuture().channel().id());
                    
                    nettyClient.send(nettyMsgModel.getMsg());
                 else 
                    log.info("client imei=,通道不活跃,主动关闭", nettyClient.getImei());
                    nettyClient.close();
                    //重新创建客户端发送
                    this.createClientAndSend(nettyMsgModel);
                
             else   //缓存中不存在则创建新的客户端
                this.createClientAndSend(nettyMsgModel);
            
         catch (Exception e) 
            log.error(e.getMessage(), e);
         finally 
            //执行完后解锁
            redisCache.deleteObject(NETTY_QUEUE_LOCK + imei);
        

    

其中imei是我们设备的唯一标识,我们可以用imei作为缓存的key来确认是否已创建过连接。由于我们消息的并发量可能会很大,所以存在当某设备的连接正在创建的过程中,另一个线程收到该设备消息也开始创建连接的情况,所以我们使用synchronized 代码块以及redis分布式锁来避免此情况的发生。当一条消息获得锁后,在锁释放前,后续消息将会被重新放回消息队列并延迟消费。

获取锁的线程会根据imei判断缓存是否存在连接,如果存在直接发送消息,如果不存在则进入创建客户端的方法。

    private void createClientAndSend(NettyMsgModel nettyMsgModel) 
        log.info("创建客户端执行中imei=", nettyMsgModel.getImei());
        //此处的DemoClientHandler可以根据自己的业务定义
        NettyClient nettyClient = SpringUtils.getBean(NettyClient.class, nettyMsgModel.getImei(), nettyMsgModel.getBizData(),
                this.createDefaultWorkGroup(this.workerThread), DemoClientHandler.class);
        executor.execute(nettyClient); //执行客户端初始化
        try 
            //利用锁等待客户端激活
            synchronized (nettyClient) 
                long c1 = System.currentTimeMillis();
                nettyClient.wait(5000); //最多阻塞5秒 5秒后客户端仍然未激活则自动解锁
                long c2 = System.currentTimeMillis();
                log.info("创建客户端wait耗时=ms", c2 - c1);
            
            if (null != nettyClient.getChannelFuture() && nettyClient.getChannelFuture().channel().isActive())  //连接成功
                //存入缓存
                NettyClientHolder.get().put(nettyMsgModel.getImei(), nettyClient);
                //客户端激活后发送消息
                nettyClient.send(nettyMsgModel.getMsg());
             else  //连接失败
                log.warn("客户端创建失败,imei=", nettyMsgModel.getImei());
                nettyClient.close();
                //可以把消息重新入列处理
            
         catch (Exception e) 
            log.error("客户端初始化发送消息异常===>", e.getMessage(), e);
        
    

当netty客户端实例创建后使用线程池执行初始化,由于是异步执行,我们此时立刻发送消息很可能客户端还没有完成连接,因此必须加锁等待。进入synchronized 代码块,使用wait方法等待客户端激活后解锁,参数5000为自动解锁的毫秒数,意思是如果客户端出现异常情况迟迟未能连接成功并激活通道、解锁,则最多5000毫秒后该锁自动解开。这参数在实际使用时可以视情况调整,在并发量很大的情况下,5秒的阻塞可能会导致线程池耗尽,或内存溢出。待客户端创建成功并激活后则立即发送消息。

3.客户端

package org.example.client;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.CharsetUtil;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.example.client.handler.BaseClientHandler;
import org.example.core.util.SpringUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author ReWind00
 * @date 2023/2/15 9:59
 */
@Slf4j
@Component
@Scope("prototype")
@Getter
@NoArgsConstructor
public class NettyClient implements Runnable 

    @Value("$netty.server.port")
    private int port;

    @Value("$netty.server.host")
    private String host;
    //客户端唯一标识
    private String imei;
    //自定义业务数据
    private Map<String, Object> bizData;

    private EventLoopGroup workGroup;

    private Class<BaseClientHandler> clientHandlerClass;

    private ChannelFuture channelFuture;

    public NettyClient(String imei, Map<String, Object> bizData, EventLoopGroup workGroup, Class<BaseClientHandler> clientHandlerClass) 
        this.imei = imei;
        this.bizData = bizData;
        this.workGroup = workGroup;
        this.clientHandlerClass = clientHandlerClass;
    

    @Override
    public void run() 
        try 
            this.init();
            log.info("客户端启动imei=", imei);
         catch (Exception e) 
            log.error("客户端启动失败:", e.getMessage(), e);
        
    

    public void close() 
        if (null != this.channelFuture) 
            this.channelFuture.channel().close();
        
        NettyClientHolder.get().remove(this.imei);
    

    public void send(String message) 
        try 
            if (!this.channelFuture.channel().isActive()) 
                log.info("通道不活跃imei=", this.imei);
                return;
            
            if (!StringUtils.isEmpty(message)) 
                log.info("队列消息发送===>", message);
                this.channelFuture.channel().writeAndFlush(message);
            
         catch (Exception e) 
            log.error(e.getMessage(), e);
        
    

    private void init() throws Exception 
        //将本实例传递到handler
        BaseClientHandler clientHandler = SpringUtils.getBean(clientHandlerClass, this);
        Bootstrap b = new Bootstrap();
        //2 通过辅助类去构造server/client
        b.group(workGroup)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
                .option(ChannelOption.SO_RCVBUF, 1024 * 32)
                .option(ChannelOption.SO_SNDBUF, 1024 * 32)
                .handler(new ChannelInitializer<SocketChannel>() 
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception 
                        ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024 * 1024, Unpooled.copiedBuffer("\\r\\n".getBytes())));
                        ch.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8));// String解码。
                        ch.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8));// String解码。
//                        // 心跳设置
                        ch.pipeline().addLast(new IdleStateHandler(0, 0, 600, TimeUnit.SECONDS));
                        ch.pipeline().addLast(clientHandler);
                    
                );
        this.connect(b);
    

    private void connect(Bootstrap b) throws InterruptedException 
        long c1 = System.currentTimeMillis();
        final int maxRetries = 2; //重连2次
        final AtomicInteger count = new AtomicInteger();
        final AtomicBoolean flag = new AtomicBoolean(false);
        try 
            this.channelFuture = b.connect(host, port).addListener(
                    以上是关于Springboot集成Netty实现TCP通讯的主要内容,如果未能解决你的问题,请参考以下文章

springboot集成netty框架(物联网tcp连接,只服务端)

SpringBoot集成netty-socket.io

网络I/o编程模型12 netty实现tcp服务通讯(含EventLoop实现调度)

SpringBoot 集成Netty实现UDP Server

IM即时通讯开发用Netty实现心跳机制断线重连机制

SpringBoot 集成 WebSocket,实现后台向前端推送信息