springboot集成netty框架(物联网tcp连接,只服务端)
Posted 盖被子的冰块
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了springboot集成netty框架(物联网tcp连接,只服务端)相关的知识,希望对你有一定的参考价值。
Maven
<!--
  Netty "all-in-one" artifact used by the server code in this article.
  The handler shown later overrides messageReceived(ChannelHandlerContext, String),
  so the code is written against the callback names of this 5.0 alpha line.
  NOTE(review): on Netty 4.1.x the equivalent callback is presumably channelRead0;
  confirm before changing the version below.
-->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>5.0.0.Alpha2</version>
</dependency>
NettyServerBootStrap.java(已处理粘包问题:客户端上报的消息体末尾需加上\\r\\n作为分隔符,分隔符可自定义)
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
 * Boots the Netty TCP server that devices connect to.
 *
 * <p>Each accepted connection gets a pipeline that splits the inbound byte stream
 * into delimiter-terminated frames, decodes every frame as a UTF-8 string and
 * hands it to the shared {@link NettyServerHandler}. Outbound strings are encoded
 * as UTF-8.
 */
@Component
@Slf4j
public class NettyServerBootStrap {

    /** Port used by {@link #start()}. */
    private static final int DEFAULT_PORT = 7988;

    /** Maximum accepted frame length in bytes; longer frames are rejected by the frame decoder. */
    private static final int MAX_FRAME_LENGTH = 2024;

    /**
     * Text that terminates every inbound message.
     *
     * <p>As written, this Java literal denotes six characters (two backslashes, 'r',
     * two backslashes, 'n'), not a CR LF byte pair.
     * NOTE(review): this looks like an escaping artifact; confirm what devices
     * actually append before changing it.
     */
    private static final String FRAME_DELIMITER = "\\\\r\\\\n";

    @Autowired
    private NettyServerHandler nettyServerHandler;

    /**
     * Starts the server on the default port (7988) and blocks until the listening
     * channel is closed.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     *                              for the bind to complete or for the channel to close
     */
    public void start() throws InterruptedException {
        start(DEFAULT_PORT);
    }

    /**
     * Starts the server on the given port and blocks until the listening channel is
     * closed. Both event-loop groups are shut down when this method returns or throws.
     *
     * @param port TCP port to listen on
     * @throws InterruptedException if the calling thread is interrupted while waiting
     *                              for the bind to complete or for the channel to close
     */
    public void start(int port) throws InterruptedException {
        EventLoopGroup boss = new NioEventLoopGroup();
        EventLoopGroup worker = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(boss, worker)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 128)
                    // TCP_NODELAY is a per-connection socket option, so it has to be a
                    // child option; set on the listening channel it is not applied to
                    // accepted connections.
                    .childOption(ChannelOption.TCP_NODELAY, true)
                    // Keep long-lived connections alive.
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            // Explicit charset instead of the platform default.
                            ByteBuf delimiter = Unpooled.copiedBuffer(FRAME_DELIMITER, CharsetUtil.UTF_8);
                            pipeline.addLast(new DelimiterBasedFrameDecoder(MAX_FRAME_LENGTH, delimiter));
                            pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
                            pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
                            pipeline.addLast(nettyServerHandler);
                        }
                    });

            // sync() rethrows the failure cause when the bind does not succeed, so
            // reaching the next line means the port is bound.
            ChannelFuture bindFuture = bootstrap.bind(port).sync();
            log.info("Netty Start successful");

            // Block until the listening channel is closed.
            bindFuture.channel().closeFuture().sync();
        } finally {
            // Release the event-loop threads on the way out.
            worker.shutdownGracefully();
            boss.shutdownGracefully();
        }
    }
}
NettyServerHandler.java(处理业务)
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.xcl.entity.Nettyclient;
import com.xcl.service.SocketssService;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.SocketChannel;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.net.InetSocketAddress;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
@Component
@ChannelHandler.Sharable
@Slf4j
public class NettyServerHandler extends SimpleChannelInboundHandler<String> {
@Autowired
com.xcl.service.SocketssService SocketssService;
String requestJson;
private static NettyServerHandler NettyServerHandler;
/**
* 管理一个全局map,保存连接进服务端的通道数量
*/
//连接map
public static Map<String, ChannelHandlerContext> map = new HashMap<String, ChannelHandlerContext>();
public static final ConcurrentHashMap<String, Nettyclient> CHANNEL_MAP3 = new ConcurrentHashMap<>();
@PostConstruct
public void init() {
NettyServerHandler = this;
}
/**
* @Description 客户端连接时执行,将客户端信息保存到Map中
* @param ctx
* @Date 2019/8/28 14:22
* @Author xuchenliang
* @return
**/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
String clientIp = insocket.getAddress().getHostAddress();
int clientPort = insocket.getPort();
// 获取连接通道唯一标识
//ChannelId channelId = ctx.channel().id();
String channelId2 = ctx.channel().id().toString();
// 如果map中不包含此连接,就保存连接
if (CHANNEL_MAP3.containsKey(channelId2)) {
//log.info("客户端【" + channelId + "】是连接状态,连接通道数量: " + CHANNEL_MAP.size());
} else {
// 保存连接
//CHANNEL_MAP.put(channelId, ctx);
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Nettyclient Nettyclient=new Nettyclient();
Nettyclient.setCtx(ctx);
Nettyclient.setCreatetime(df.format(new Date()));
Nettyclient.setPort(clientPort+"");
CHANNEL_MAP3.put(channelId2,Nettyclient);
channelWriteClient(channelId2,"hello,"+channelId2,"1");
System.out.println("客户端【" + channelId2 + "】连接netty服务器[IP:" + clientIp + "--->PORT:" + clientPort + "]");
System.out.println("连接通道数量: " + CHANNEL_MAP3.size());
/*log.info("客户端【" + channelId + "】连接netty服务器[IP:" + clientIp + "--->PORT:" + clientPort + "]");
log.info("连接通道数量: " + CHANNEL_MAP.size());*/
}
}
/**
* @Description 客户端断开连接时执行,将客户端信息从Map中移除
* @param ctx
* @Date 2019/8/28 14:22
* @Author xuchenliang
* @return
**/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
String clientIp = insocket.getAddress().getHostAddress();
//ChannelId channelId = ctx.channel().id();
String channelId2 = ctx.channel().id().toString();
// 包含此客户端才去删除
if (CHANNEL_MAP3.containsKey(channelId2)) {
// 删除连接
CHANNEL_MAP3.remove(channelId2);
/*log.info("客户端【" + channelId + "】退出netty服务器[IP:" + clientIp + "--->PORT:" + insocket.getPort() + "]");
log.info("连接通道数量: " + CHANNEL_MAP.size());*/
}
Collection<ChannelHandlerContext> col = NettyServerHandler.map.values();
while(true == col.contains(channelId2)) {
col.remove(channelId2);
}
}
/**
* @Description 收到消息时执行,根据消息类型做不同的处理
* @param ctx
* @param msg
* @Date 2019/8/28 14:33
* @Author xuchenliang
* @return
**/
JSONObject jsonObject;
List<JSONObject> jsonlist;
@Override
public void messageReceived(ChannelHandlerContext ctx, String msg) throws Exception {
try {
jsonObject = JSON.parseObject(msg.toString());
jsonlist = (List<JSONObject>) jsonObject.get("detail");
if(jsonlist!=null){
for (int i = 0; i < jsonlist.size(); i++) {
NettyServerHandler.map.put(jsonlist.get(i).getString("iemi"), ctx);
}
}
// json数组
requestJson = NettyServerHandler.SocketssService.connectSocketss(msg.toString(),ctx.channel().id().toString());
ctx.write(requestJson);
ctx.flush();
} catch (Exception e) {
e.printStackTrace();
ctx.write("{\\"code\\":\\"-1\\",\\"msg\\":\\"param_error\\"}");
ctx.flush();
}
}
// 需要发送的消息内容
public void channelWrite(String item, Object msg) throws Exception {
//ChannelHandlerContext ctx = CHANNEL_MAP.get(channelId);
ChannelHandlerContext ctx= NettyServerHandler.map.get(item);
if (ctx == null) {
//log.info("通道【" + ctx.channel().id() + "】不存在");
return;
}
if (msg == null || msg == "") {
//log.info("服务端响应空的消息");
return;
}
// 将客户端的信息直接返回写入ctx
ctx.write(msg);
// 刷新缓存区
ctx.flush();
}
/**
* @description: TODO
* @param ctx
* @param cause
* @Author: xuchenliang
* @Date: 2019/08/30 13:41:51
* @return: void
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
/**
* 主动向指定的客户端发消息
*/
public static void channelWriteClient(String channelId, String msg,String status) throws Exception {
ChannelHandlerContext ctx = null;
for(String key : CHANNEL_MAP3.keySet()){
if(channelId.equals(key)){
ctx=CHANNEL_MAP3.get(key).getCtx();
}
}
try {
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
if("1".equals(status)){
ctx.write("{\\"code\\":\\"0\\",\\"msg\\":\\"connetion_sucess\\",\\"data\\":\\""+msg+"\\",\\"time\\":\\""+df.format(new Date())+"\\"}");
ctx.flush();
}
if("2".equals(status)){
ctx.write("{\\"code\\":\\"0\\",\\"msg\\":\\"send_sucess\\",\\"data\\":\\""+msg+"\\",\\"time\\":\\""+df.format(new Date())+"\\"}");
ctx.flush();
}
if("3".equals(status)){
ctx.write("{\\"code\\":\\"0\\",\\"msg\\":\\"electrify_sucess\\",\\"scopeof\\":\\""+msg+"\\",\\"time\\":\\""+df.format(new Date())+"\\"}");
ctx.flush();
}
if("4".equals(status)){
ctx.write("{\\"code\\":\\"0\\",\\"msg\\":\\"hearbeat_sucess\\",\\"time\\":\\""+df.format(new Date())+"\\"}");
ctx.flush();
}
if("5".equals(status)){
ctx.write("{\\"cmd\\":\\"control\\",\\"value\\":\\""+msg+"\\"}");
ctx.flush();
}
} catch (Exception e) {
ctx.write("{\\"code\\":\\"-1\\",\\"msg\\":\\"error\\"}");
ctx.flush();
}
}
}
App.java(启动类)
/**
 * Spring Boot entry point.
 *
 * <p>After the application context is up, {@link #run(String...)} starts the Netty
 * server through the injected {@code NettyServerBootStrap}.
 */
@SpringBootApplication
@MapperScan("com.xcl.mapper")
public class App implements CommandLineRunner {

    /** Launcher of the Netty TCP server, supplied by Spring. */
    @Autowired
    private NettyServerBootStrap serverBootStrap;

    /**
     * Launches the Spring application.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(App.class, args);
    }

    /**
     * Startup callback: starts the Netty server.
     *
     * @param args command-line arguments (unused)
     * @throws Exception propagated from the server start-up
     */
    @Override
    public void run(String... args) throws Exception {
        this.serverBootStrap.start();
    }
}
测试(端口7988,设备以每秒0ms发送)
最后,注意三点:
1、上报消息体末尾要以\\r\\n结尾,否则接收无效
2、5次以上接收无效,会主动断开你的客户端
3、客户端连接上后,第一次上报数据会失败(服务端第一次接收会重复接收),之后的上报均可100%成功
以上是关于springboot集成netty框架(物联网tcp连接,只服务端)的主要内容,如果未能解决你的问题,请参考以下文章
物联网架构成长之路(49)-SpringBoot集成KafKa中间件
物联网架构成长之路(32)-SpringBoot集成MQTT客户端
开源物联网通讯框架ServerSuperIO,成功移植到Windows10 IOT,在物联网和集成系统建设中降低成本。附:“物联网”交流大纲
《连载 | 物联网框架ServerSuperIO教程》- 18.集成OPC Client,及使用步骤。附:3.5 发布与更新说明。