springboot集成netty框架(物联网tcp连接,只服务端)

Posted 盖被子的冰块

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了springboot集成netty框架(物联网tcp连接,只服务端)相关的知识,希望对你有一定的参考价值。

Maven

<dependency>
	<groupId>io.netty</groupId>
	<artifactId>netty-all</artifactId>
	<version>5.0.0.Alpha2</version>
</dependency>
NettyServerBootStrap.java(带解决粘包问题,客户端上报消息体末尾的加上\\r\\n,可自定义)
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioserverSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class NettyServerBootStrap {

    @Autowired
    private NettyServerHandler nettyServerHandler;

    public void start() throws InterruptedException {
        EventLoopGroup boss = new NioEventLoopGroup();
        EventLoopGroup worker = new NioEventLoopGroup();
        ServerBootstrap bootstrap = new ServerBootstrap();
        try {
            bootstrap.group(boss, worker)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 128)
                    // 使消息立即发出去,不用等待到一定的数据量才发出去
                    .option(ChannelOption.TCP_NODELAY, true)
                    // 保持长连接状态
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline p = socketChannel.pipeline();
                            ByteBuf delimiter = Unpooled.copiedBuffer("\\\\r\\\\n".getBytes());
                            p.addLast(new DelimiterBasedFrameDecoder(2024,delimiter));
                            p.addLast(new StringDecoder(CharsetUtil.UTF_8));
                            p.addLast(new StringEncoder(CharsetUtil.UTF_8));
                            p.addLast(nettyServerHandler);
                        }
                    });
            // 绑定端口,同步等待成功
            ChannelFuture f = bootstrap.bind(7988).sync();
            if (f.isSuccess()) {
                log.info("Netty Start successful");
            } else {
                log.error("Netty Start failed");
            }
            // 等待服务监听端口关闭
            f.channel().closeFuture().sync();
        } finally {
            // 退出,释放线程资源
            worker.shutdownGracefully();
            boss.shutdownGracefully();
        }
    }
}
NettyServerHandler.java(处理业务)
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.xcl.entity.Nettyclient;
import com.xcl.service.SocketssService;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.SocketChannel;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.net.InetSocketAddress;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

@Component
@ChannelHandler.Sharable
@Slf4j
public class NettyServerHandler extends SimpleChannelInboundHandler<String> {

    @Autowired
    com.xcl.service.SocketssService SocketssService;
    String requestJson;

    private static NettyServerHandler NettyServerHandler;
    /**
     * 管理一个全局map,保存连接进服务端的通道数量
     */
    //连接map
    public  static Map<String, ChannelHandlerContext> map = new HashMap<String, ChannelHandlerContext>();

    public static final ConcurrentHashMap<String, Nettyclient> CHANNEL_MAP3 = new ConcurrentHashMap<>();

    @PostConstruct
    public void init() {
        NettyServerHandler = this;
    }


    /**
     * @Description 客户端连接时执行,将客户端信息保存到Map中
     * @param ctx
     * @Date 2019/8/28 14:22
     * @Author xuchenliang
     * @return
     **/
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();

        String clientIp = insocket.getAddress().getHostAddress();
        int clientPort = insocket.getPort();

        // 获取连接通道唯一标识
        //ChannelId channelId = ctx.channel().id();
        String channelId2 = ctx.channel().id().toString();

        // 如果map中不包含此连接,就保存连接
        if (CHANNEL_MAP3.containsKey(channelId2)) {
            //log.info("客户端【" + channelId + "】是连接状态,连接通道数量: " + CHANNEL_MAP.size());
        } else {
            // 保存连接
            //CHANNEL_MAP.put(channelId, ctx);
            SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

            Nettyclient Nettyclient=new Nettyclient();
            Nettyclient.setCtx(ctx);
            Nettyclient.setCreatetime(df.format(new Date()));
            Nettyclient.setPort(clientPort+"");

            CHANNEL_MAP3.put(channelId2,Nettyclient);
            channelWriteClient(channelId2,"hello,"+channelId2,"1");

            System.out.println("客户端【" + channelId2 + "】连接netty服务器[IP:" + clientIp + "--->PORT:" + clientPort + "]");
            System.out.println("连接通道数量: " + CHANNEL_MAP3.size());

			/*log.info("客户端【" + channelId + "】连接netty服务器[IP:" + clientIp + "--->PORT:" + clientPort + "]");
			log.info("连接通道数量: " + CHANNEL_MAP.size());*/
        }
    }


    /**
     * @Description 客户端断开连接时执行,将客户端信息从Map中移除
     * @param ctx
     * @Date 2019/8/28 14:22
     * @Author xuchenliang
     * @return
     **/
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();

        String clientIp = insocket.getAddress().getHostAddress();

        //ChannelId channelId = ctx.channel().id();
        String channelId2 = ctx.channel().id().toString();

        // 包含此客户端才去删除
        if (CHANNEL_MAP3.containsKey(channelId2)) {
            // 删除连接
            CHANNEL_MAP3.remove(channelId2);

			/*log.info("客户端【" + channelId + "】退出netty服务器[IP:" + clientIp + "--->PORT:" + insocket.getPort() + "]");
			log.info("连接通道数量: " + CHANNEL_MAP.size());*/
        }
        Collection<ChannelHandlerContext> col = NettyServerHandler.map.values();
        while(true == col.contains(channelId2)) {
            col.remove(channelId2);
        }
    }

    /**
     * @Description 收到消息时执行,根据消息类型做不同的处理
     * @param ctx
     * @param msg
     * @Date 2019/8/28 14:33
     * @Author xuchenliang
     * @return
     **/
    JSONObject jsonObject;
    List<JSONObject> jsonlist;
    @Override
    public void messageReceived(ChannelHandlerContext ctx, String msg) throws Exception {
        try {
            jsonObject = JSON.parseObject(msg.toString());
            jsonlist = (List<JSONObject>) jsonObject.get("detail");
            if(jsonlist!=null){
                for (int i = 0; i < jsonlist.size(); i++) {
                    NettyServerHandler.map.put(jsonlist.get(i).getString("iemi"), ctx);
                }
            }
            // json数组
            requestJson = NettyServerHandler.SocketssService.connectSocketss(msg.toString(),ctx.channel().id().toString());

            ctx.write(requestJson);
            ctx.flush();
        } catch (Exception e) {
            e.printStackTrace();
            ctx.write("{\\"code\\":\\"-1\\",\\"msg\\":\\"param_error\\"}");
            ctx.flush();
        }
    }

    // 需要发送的消息内容
    public void channelWrite(String item, Object msg) throws Exception {
        //ChannelHandlerContext ctx = CHANNEL_MAP.get(channelId);

        ChannelHandlerContext ctx= NettyServerHandler.map.get(item);
        if (ctx == null) {
            //log.info("通道【" + ctx.channel().id() + "】不存在");
            return;
        }

        if (msg == null || msg == "") {
            //log.info("服务端响应空的消息");
            return;
        }
        // 将客户端的信息直接返回写入ctx
        ctx.write(msg);
        // 刷新缓存区
        ctx.flush();
    }


    /**
     * @description: TODO
     * @param ctx
     * @param cause
     * @Author: xuchenliang
     * @Date: 2019/08/30 13:41:51
     * @return: void
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

    /**
     * 主动向指定的客户端发消息
     */

    public static  void  channelWriteClient(String channelId, String msg,String status) throws Exception {

        ChannelHandlerContext ctx = null;

        for(String key : CHANNEL_MAP3.keySet()){
            if(channelId.equals(key)){
                ctx=CHANNEL_MAP3.get(key).getCtx();
            }
        }
        try {

            SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            if("1".equals(status)){

                ctx.write("{\\"code\\":\\"0\\",\\"msg\\":\\"connetion_sucess\\",\\"data\\":\\""+msg+"\\",\\"time\\":\\""+df.format(new Date())+"\\"}");
                ctx.flush();
            }
            if("2".equals(status)){
                ctx.write("{\\"code\\":\\"0\\",\\"msg\\":\\"send_sucess\\",\\"data\\":\\""+msg+"\\",\\"time\\":\\""+df.format(new Date())+"\\"}");
                ctx.flush();
            }
            if("3".equals(status)){

                ctx.write("{\\"code\\":\\"0\\",\\"msg\\":\\"electrify_sucess\\",\\"scopeof\\":\\""+msg+"\\",\\"time\\":\\""+df.format(new Date())+"\\"}");
                ctx.flush();
            }
            if("4".equals(status)){
                ctx.write("{\\"code\\":\\"0\\",\\"msg\\":\\"hearbeat_sucess\\",\\"time\\":\\""+df.format(new Date())+"\\"}");
                ctx.flush();
            }
            if("5".equals(status)){
                ctx.write("{\\"cmd\\":\\"control\\",\\"value\\":\\""+msg+"\\"}");
                ctx.flush();
            }
        } catch (Exception e) {
            ctx.write("{\\"code\\":\\"-1\\",\\"msg\\":\\"error\\"}");
            ctx.flush();
        }
    }

}
App.java(启动类)
@SpringBootApplication
@MapperScan("com.xcl.mapper")
public class App implements CommandLineRunner {
	public static void main(String[] args) {
		SpringApplication.run(App.class, args);
	}

	@Autowired
	private NettyServerBootStrap serverBootStrap;

	@Override
	public void run(String... args) throws Exception {
		serverBootStrap.start();
	}

}

测试(端口7988,设备以每秒0ms发送)

最后,注意三点:
1、上报消息体末尾要以\\r\\n结尾,否则接收无效
2、5次以上接收无效,会主动断开你的客户端
3、客户都安连接上了第一次上报数据会失败(服务端第一次接收会重复接收),后面的就是100%了

 

以上是关于springboot集成netty框架(物联网tcp连接,只服务端)的主要内容,如果未能解决你的问题,请参考以下文章

物联网架构成长之路(49)-SpringBoot集成KafKa中间件

物联网架构成长之路(32)-SpringBoot集成MQTT客户端

开源物联网通讯框架ServerSuperIO,成功移植到Windows10 IOT,在物联网和集成系统建设中降低成本。附:“物联网”交流大纲

《连载 | 物联网框架ServerSuperIO教程》- 18.集成OPC Client,及使用步骤。附:3.5 发布与更新说明。

Java开发社招面试总结!mysqllike百分号

Java经典入门教程!弄到一份宝藏级SpringCloud实战文档