IOT云平台 simplespringboot netty实现IOT云平台基本的架构(mqttRabbitmq)

Posted 令狐飞侠

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了IOT云平台 simplespringboot netty实现IOT云平台基本的架构(mqttRabbitmq)相关的知识,希望对你有一定的参考价值。

本系列教程包括:
IOT云平台 simple(0)IOT云平台简介
IOT云平台 simple(1)netty入门
IOT云平台 simple(2)springboot入门
IOT云平台 simple(3)springboot netty实现TCP Server
IOT云平台 simple(4)springboot netty实现简单的mqtt broker
IOT云平台 simple(5)springboot netty实现modbus TCP Master
IOT云平台 simple(6)springboot netty实现IOT云平台基本的架构(mqtt、Rabbitmq)

本章首先简单介绍了IOT云平台最基本的架构,然后基于springboot netty实现IOT Server;最后进行了测试验证。

测试环境:

  1. mqtt终端,这里用MQTT.fx 工具软件模拟;
  2. IOT server:基于springboot netty进行开发;
  3. Rabbitmq broker;本地安装Windows 64位环境;
  4. Rabbitmq consumer,订阅mq message的server,这里用MQTT Assistant工具软件模拟;

1 IOT云平台最基本的架构

本章涉及的IOT云平台最基本架构图:

说明
1)为了简单,这里只包括mqtt上行链路,即mqtt终端上传数据;
2)这里通过Rabbitmq 进行消息的分发,也可以用其他mq中间件,如kafka。
3)Mqtt terminal:mqtt终端,指的是具体的设备传感器或者mqtt协议网关。

具体流程
第1步,mqtt终端
实现mqtt协议或者其他协议(modbus、wifi、蓝牙)转换为mqtt协议。

第2步,mqtt终端->IOT server
mqtt终端publish message到IOT server;

第3步,IOT server->Rabbitmq broker
IOT Server中mqtt broker的模块收到mqtt message,进行解析,然后通过Rabbitmq producer模块,publish message到Rabbitmq broker;

第4步,Rabbitmq broker->不同server
Rabbitmq broker收到消息存入指定的queue,然后分发到订阅消息的不同server;

第5步,不同server
不同server监听收到消息进行相应的处理;如:存入到时序数据库、进行大数据流的计算、具体业务的处理。

2 集成开发

第1步:POM文件引入netty、Rabbitmq的依赖:

        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.63.Final</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
            <version>3.0.1</version>
        </dependency>

2.1实现mqtt broker

创建主要的类:
1)TCPServer
server类,实现mqtt broker。
2 )TCPServerStartListener
监听到springboot启动后,启动server。
3)TCPServerChannelInitializer
server channel初始化的类

包括两个server channel处理的类:
1) MqttMessageChannelHandler:
server channel处理的类;实现mqtt消息的解析;

@Component
@Slf4j
@ChannelHandler.Sharable
public class MqttMessageChannelHandler extends ChannelInboundHandlerAdapter  

    @Autowired
    MessageStrategyManager messageStrategyManager;

    @Override
    public void channelRead(ChannelHandlerContext channelHandlerContext, Object msg) throws Exception 
        MqttMessage mqttMessage = (MqttMessage) msg;
        log.info("--------------------------channelRead begin---------------------------*");
        log.info("from client:" + channelHandlerContext.channel().remoteAddress());
        log.info("receive message:" + mqttMessage.toString());
        try 
            MqttMessageType type = mqttMessage.fixedHeader().messageType();
            MessageStrategy messageStrategy =  messageStrategyManager.getMessageStrategy(type);
            if(messageStrategy!=null)
                messageStrategy.sendResponseMessage(channelHandlerContext,mqttMessage);
            
        catch (Exception e) 
            e.printStackTrace();
        
        log.info("--------------------------channelRead end---------------------------*");

    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)  // (4)
        // 当出现异常就关闭连接
        cause.printStackTrace();
        ctx.close();
    

2) MqMessageChannelHandler:
server channel处理的类;实现mq消息发布到Rabbitmq broker;

@Component
@Slf4j
public class MqMessageChannelHandler extends ChannelInboundHandlerAdapter 
    @Autowired
    ProducerService producerService;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception 
        if (!(msg instanceof MqMessage)) 
            return;
        
        MqMessage mqMessage = (MqMessage) msg;
        log.info("转发到Rabbitmq Server:" + mqMessage.data);
        producerService.sendData(mqMessage.data);
    


2.2实现Rabbitmq producer

1 定义配置类ProducerConfig:

exchange(交换机):topic_exchange
queue(队列):topic_queue
bindKey(绑定key):project1.station1.*

@Slf4j
@Configuration
public class ProducerConfig 
    String bindKey = "project1.station1.*";

    @Bean
    public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory)
        RabbitTemplate rabbitTemplate = new RabbitTemplate();
        rabbitTemplate.setConnectionFactory(connectionFactory);
        //开启Mandatory,触发回调函数
        rabbitTemplate.setMandatory(true);
        //ack
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() 
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) 
                log.info("-----------confirm begin---------------");
                log.info("data:" + correlationData);
                if(ack)
                    log.info("Ack:true");
                else
                    log.info("Ack:false");
                
                log.info("cause:" + cause);
                log.info("-----------confirm end---------------");
            
        );
        //return
        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() 
            @Override
            public void returnedMessage(ReturnedMessage returnedMessage) 
                log.info("-----------return begin---------------");
                log.info("message:"+returnedMessage.getMessage());
                log.info("reply code:"+returnedMessage.getReplyCode());
                log.info("reply text:"+returnedMessage.getReplyText());
                log.info("exchange:"+returnedMessage.getExchange());
                log.info("routeKey:"+returnedMessage.getRoutingKey());
                log.info("-----------return end---------------");
            
        );

        return rabbitTemplate;
    

    @Bean("topic_exchange")
    public TopicExchange topicExchange() 
        return ExchangeBuilder.topicExchange("topic_exchange").durable(true).build();
    

    @Bean("topic_queue")
    public Queue topicQueue()
        return QueueBuilder.durable("topic_queue").build();
    

    @Bean
    public Binding topicBind()
        return BindingBuilder.bind(topicQueue()).to(topicExchange()).with(bindKey);
    

2 定义ProducerService类,实现发送消息的功能:
这里发送:
exchange(交换机):topic_exchange
routeKey (路由key):project1.station1.device1

@Slf4j
@Component
public class ProducerService 
    String exchange = "topic_exchange";
    String routeKey = "project1.station1.device1";
    @Resource
    private RabbitTemplate rabbitTemplate;

    public void sendData(String data) 
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        log.info("------------topic producer begin------------");
        rabbitTemplate.convertAndSend(exchange, routeKey, data);
        log.info("exchange:"+exchange);
        log.info("routeKey:"+routeKey);
        log.info("send data:"+data);
        log.info("------------topic producer end------------");
    

3 测试验证

第1步:mqtt终端发送消息:temperature is 26, 2023.1.14 17:00。

第2步:IOT Server接收到mqtt终端发送的消息;然后转发到Rabbitmq Server;

第3步:Rabbitmq Server的topic_queue中有1条消息;

第4步:MQTT Assistant中从topic_queue中消费消息:

可见,对于物联网的数据,实现了从端到平台(解析、分发、存储)的整个流程。

代码详见:
https://gitee.com/linghufeixia/iot-simple
code5

聊聊如何在华为云IoT平台进行产品开发

本文分享自华为云社区《如何基于华为云IoT物联网平台进行产品开发》,作者: Super.雯 。

华为云物联网平台承载着南北向数据互通的功能职责。在华为云物联网平台基础上实现端到端物联网业务的过程中,开发者需要基于该平台进行二次开发。本文先跟大家一起聊一聊产品开发。

产品开发

在物联网平台集成解决方案中,物联网平台作为承上启下的中间部分,向应用服务器开放API接口,向各种协议的设备提供API对接。为了提供更加丰富的设备管理能力,物联网平台需要理解接入设备具备的能力以及设备上报数据的消息格式,因此,用户需要在控制台上完成产品模型和编解码插件的开发。基于IoT平台去实现一个物联网解决方案时,需完成的详细操作如下表所示:

开发操作开发说明
产品开发主要呈现物联网平台的界面查询与操作,包括产品管理、产品模型开发、插件开发、在线调试等。
应用侧开发主要为业务应用于物联网平台的集成对接开发,包括API接口调用、业务数据获取和HTTPS证书管理。
设备侧开发主要为设备与物联网平台的集成对接开发,包括设备接入物联网平台、业务数据上报和对平台控制命令的处理。

开通设备接入服务后,使用设备接入服务的完整流程主要分为产品开发、应用侧开发、设备侧开发和日常管理。
产品开发:开发者在进行设备接入前,基于控制台进行相应的开发工作,包括创建产品、创建设备、在线开发产品模型、在线开发插件、在线调试、自助测试和发布产品。
设备侧开发:设备侧可以通过集成SDK、模组或者原生协议接入物联网平台。

 应用侧开发:通过API的形式对外开放物联网平台丰富的设备管理能力,应用开发人员基于API接口开发所需的行业应用,如智慧城市、智慧园区、智慧工业、车联网等行业应用,满足不同行业的需求。
日常管理:真实设备接入后,基于控制台或者API接口,进行日常的设备管理。

 

产品模型介绍

产品模型又称Profile,用于定义一款接入设备所具备的属性(如颜色、大小、采集的数据、可识别的指令或者设备上报的事件等信息),然后通过厂家、设备类型和设备型号,唯一标识一款设备,便于平台识别。产品模型可以在设备接入控制台进行无码化开发。


产品模型是用来描述一款设备“是什么”、“能做什么”以及“如何控制该设备”的文件。开发者通过定义产品模型,在物联网平台构建一款设备的抽象模型,使平台理解该款设备支持的服务、属性、命令等信息,如颜色、开关等。当定义完一款产品模型后,在进行注册设备时,就可以使用在控制台上定义的产品模型。

在华为云物联网平台中,产品模型是设备接入的关键内容,里面包含了这个设备所提供的能力与服务,同时还包含了设备上下行的数据格式。例如,在设备上报数据到平台时,平台会根据上报数据的关键字进行产品模型匹配,并将数据格式与匹配上的产品模型文件进行校验,只有匹配成功的数据才会在平台上保存。如果匹配不成功,物联网平台会将上报的数据作为非法数据进行抛弃。


产品信息:描述一款设备的基本信息,包括厂商ID、厂商名称、设备类型、协议类型。例如:水表的厂商名称为“HZYB”,厂商ID为“TestUtf8ManuId”,设备类型为“WaterMeter”,协议类型为“CoAP”。


服务能力:描述设备具备的业务能力。将设备业务能力拆分成若干个服务后,再定义每个服务具备的属性、命令以及命令的参数。以水表为例,水表具有多种能力,如上报水流、告警、电量、连接等各种数据,并且能够接受服务器下发的各种命令。产品模型文件在描述水表的能力时,可以将水表的能力划分五个服务,每个服务都需要定义各自的上报属性或命令。

产品模型案例:智能水表

服务名描述
基础(WaterMeterBasic)用于定义水表上报的水流量、水温、水压等参数,如果需要命令控制或修改这些参数,还需要定义命令的参数。
告警(WaterMeterAlarm)用于定义水表需要上报的各种告警场景的数据,必要的话需要定义命令。
电池(Battery)定义水表的电压、电流强度等数据。
传输规则(DeliverySchedule)定义水表的一些传输规则,必要的话需要定义命令。
连接(Connectivity)定义水表连接参数。

华为云物联网平台提供了多种定义产品模型的方法,开发者可以根据自己需求,选择对应的方法定义产品模型:

  • 导入库模型(平台预置产品模型)
  • 上传模型文件(离线开发)
  • Excel导入
  • 自定义功能(在线开发)

设备编解码插件

编解码插件简介

什么是编解码插件?

编解码插件能够将终端上报的数据(二进制格式)解码为应用服务器所能“阅读”的数据(JSON格式),将服务器端下行命令数据(JSON格式)编码为终端设备所能“理解执行”的数据(二进制格式)。

为什么要使用编解码插件?

NB-IoT设备采用二进制格式或者tlv格式数据。
NB-IoT设备和IoT平台之间采用的是CoAP协议通讯,CoAP消息的payload为应用层协议数据,应用层数据的格式由设备自行定义。鉴于NB-IoT设备一般对省电要求较高,所以应用层数据一般不采用JSON格式数据。
应用服务器端并不理解二进制格式或者tlv格式数据。

如何开发编解码插件?

编解码插件的开发手段有图形化开发、离线开发和脚本化开发三种,由于插件离线开发较为复杂,且耗时比较长,推荐使用图形化开发编解码插件。
图形化开发是指在设备接入控制台,通过可视化的方式快速开发一款产品的编解码插件。
离线开发是指使用编解码插件的Java代码Demo进行二次开发,实现编解码功能、完成插件打包和质检等。
脚本化开发是指使用JavaScript脚本实现编解码的功能。
华为云物联网平台图形化编解码插件开发采用了将原有的插件开发代码进行抽象封装技术,开发者无需了解java编程语言,只需在开发界面将设备码流的格式完成定义,并将码流与profile中的属性关系通过拖拽的方式完成映射,点击部署后Portal会根据开发者的设计自动生成插件并打包部署到物联网平台。
对于设备发来的上行消息,首先解析CoAP报文得到应用层数据,然后调用设备厂商提供的插件解码,从而将消息发送到应用平台;对于来自应用平台的下行消息,需要调用设备厂商提供的编解码插件,组装CoAP消息发送到设备,如下图所示。此外编解码插件还负责对平台下发命令和对上报数据的响应进行编码。

数据上报

消息处理流程包括数据上报处理流程和命令下发处理流程,数据上报处理流程如下图所示。

当设备和物联网平台完成对接后,一旦设备上电,设备基于在设备上定义的业务逻辑进行数据采集和上报,可以是基于周期或者事件触发。设备可通过以下方式发送数据到物联网平台:


设备消息上报:设备可以通过消息上报接口将设备的自定义数据上报到平台,平台对设备上报的消息不进行解析和存储,通过数据转发规则转发到华为云其他云服务上进行存储和处理,然后通过其他云服务的控制台或者API接口进行进一步的数据处理。


设备原始数据(二进制)上报:设备可以通过二进制上报接口上报设备的原始码流,平台通过编解码插件将设备原始数据解析为产品模型定义的JSON格式,解析后的数据上报给设备接入服务进行相关业务处理。


设备属性上报:设备通过属性上报接口,将产品模型中定义的属性数据上报给平台,平台解析后的数据上报给设备接入服务进行相关业务处理。


网关批量属性上报:网关设备将批量设备的数据一次性上报到平台,平台解析后的数据上报给设备接入服务进行相关业务处理。


在数据上报处理流程中,有两处需要用到编解码插件。

  1. 将设备上报的二进制码流的数据解码成JSON格式的数据,发送给应用服务器。
  2. 将应用服务器响应的JSON格式数据编码成二进制码流格式的数据,下发给设备。

命令下发

命令下发是指平台将命令下发到设备,设备响应并执行命令,从而达到平台到设备远程控制的效果。


这里分别列举了LwM2M/CoAP和MQTT这两种协议的立即下发和缓存下发的流程,如图所示。

应用下发命令到物联网平台,平台会根据对应的产品信息找到对应的编解码插件,对命令请求进行编码,将命令下发给设备;下发命令成功或失败,会根据下发结果更新命令状态,若设备对命令做出了响应,会将命令状态更新为“执行成功”或“执行失败”。


由于使用MQTT是不需要编解码插件的,直接使用透传的形式,当命令立即下发时,应用下发命令到物联网平台,平台将命令下发至设备,返回执行结果,消息执行结果通知。

应用缓存下发命令,当设备不在线时,将命令写入缓存队列,更新消息状态,直到设备上报数据,设备上线,订阅消息下发Topic,消息下发,更新消息状态。


设备编解码插件示例:

更多学习内容,请[关注IoT物联网社区](IoT_物联网(IoT)开发者_开发者中心_华为云)

添加华为云IoT小助手微信号(hwc-iot),回复“阅读”获取更多资讯

点击关注,第一时间了解华为云新鲜技术~​

创作打卡挑战赛 赢取流量/现金/CSDN周边激励大奖

以上是关于IOT云平台 simplespringboot netty实现IOT云平台基本的架构(mqttRabbitmq)的主要内容,如果未能解决你的问题,请参考以下文章

华为云IoT体验:基于IoT平台构建智慧路灯应用

华为云IoT体验:基于IoT平台构建智慧路灯应用

设备如何使用go sdk轻松连接华为云IoT平台

鸿蒙之连接IoT云平台(华为云)

OpenHarmony3.0如何轻松连接华为云IoT设备接入平台?

国内物联网平台初探:机智云IoT物联网云服务平台及智能硬件自助开发平台