Java物联网企业级项目 指标数据采集与断连监控
Posted 办公模板库 素材蛙
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java物联网企业级项目 指标数据采集与断连监控相关的知识,希望对你有一定的参考价值。
第2章 指标数据采集与断连监控
学习目标
- 能够完成EMQ指标主题订阅
- 能够完成指标数据的解析
- 能够完成告警判断与存储
- 能够运用EMQ的webhook实现设备断连监控
1. EMQ指标主题订阅
1.1 Eclipse paho简介
Eclipse paho是eclipse基金会下面的一个开源项目,基于MQTT协议的客户端,用多种语言的实现。什么是MQTT?可以关注之前的EMQ课程,里面有详细介绍。 这几年的很火的物联网多是基于这个协议来通信的。
Eclipse paho支持的客户端语言很多,有java、Python、javascript、GoLang、C 、C++ 、C#等。
这里我们使用的是基于java语言版本的实现,这个版本的实现可以运行在JVM之上或者其他兼容于java的平台,比如安卓平台上。
Paho Java Client提供了两种API:
MqttAsyncClient:该API是完全基于异步来实现的,通过在启动时注册一个回调(callbacks),来实现消息的异步收发处理。
MqttClient:是基于同步的方式实现的消息收发处理,在亿可控项目中我们使用同步的方式来接收处理消息。
1.2 发送与订阅消息
1.2.1 发送消息
(1)添加和emq通信包paho的引用,paho不光能和emq通信,只要是基于mqtt协议实现的消息代理服务器,paho都能作为客户端和其进行通信。同时编写和mqtt通信的客户端代码。
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.2</version>
</dependency>
(2) 在consul中添加emq相关的配置:
emq:
mqttServerUrl: tcp://192.168.200.128:1883
(3)在com.yikekong.config包下定义emq配置类:
package com.yikekong.config;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
@Configuration
@ConfigurationProperties("emq")
@Data
public class EmqConfig
private String mqttServerUrl;
(4)编写EMQ客户端类,新增连接方法
package com.yikekong.emq;
import com.yikekong.config.EmqConfig;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.UUID;
@Component
@Slf4j
public class EmqClient
@Autowired
private EmqConfig emqConfig;//emq配置
private MqttClient mqttClient;
/**
* 连接mqtt broker
*/
public void connect()
try
mqttClient = new MqttClient(
emqConfig.getMqttServerUrl(),"monitor."+ UUID.randomUUID().toString());
mqttClient.connect();
catch (MqttException e)
log.error("mqtt creat error",e);
(5)编写发布消息的方法
/**
* 发布消息
* @param topic 消息主题
* @param msg 发送的消息
*/
public void publish(String topic,String msg)
try
MqttMessage mqttMessage = new MqttMessage(msg.getBytes());
mqttClient.getTopic(topic).publish(mqttMessage);//向某主题发送消息
catch (MqttException e)
log.error("mqtt publish msg error",e);
测试:
(1)编写单元测试
@SpringBootTest
@RunWith(SpringRunner.class)
public class EmqTest
@Autowired
private EmqClient emqClient;
@Test
public void testSend()
emqClient.connect();
emqClient.publish("test_topic","test_content");
(2)打开EMQ
http://192.168.200.128:18083 ,选择Tools下的Websocket进行测试
(3)连接并订阅主题
输入主题名称test_topic
(4)调用单元测试方法
测试后可以看到列表中有接收到的消息。
1.2.2 订阅消息
(1)EmqClient类新增方法,用于订阅主题
/**
* 订阅主题
* @param topicName
* @throws MqttException
*/
public void subscribe(String topicName) throws MqttException
mqttClient.subscribe(topicName);
(2)接收消息回调类: com.yikekong.emq包下创建消息接收处理类:
package com.yikekong.emq;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.yikekong.dto.*;
import com.yikekong.entity.GPSEntity;
import com.yikekong.es.ESRepository;
import com.yikekong.service.*;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Map;
@Component
@Slf4j
public class EmqMsgProcess implements MqttCallback
@Override
public void connectionLost(Throwable throwable)
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception
String payload = new String(mqttMessage.getPayload());
System.out.println("接收到数据:"+payload);
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken)
(3)修改EmqClient的connect方法,添加代码
mqttClient.setCallback(emqMsgProcess);
(4)编写监控类,启动后自动订阅主题
package com.yikekong.core;
import com.yikekong.emq.EmqClient;
import com.yikekong.service.QuotaService;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/**
* 自动监控
*/
@Component
@Slf4j
public class Monitor
@Autowired
private EmqClient emqClient;
@PostConstruct
public void init()
emqClient.connect();
emqClient.subscribe("mytopic");
测试:
(1)启动工程
(2)打开emq的websocket 工具, 连接,向mytopic主题发送消息
点击发送后,控制台可以显示出接收的消息
1.3 订阅指标主题
1.3.1 需求分析
亿可控实现功能:订阅指标配置中的主题。
1.3.2 实现思路
(1)系统启动时,获取所有的主题名称,循环调用订阅主题的方法。
(2)创建新的指标时,根据设置的主题名称订阅。
1.3.3 代码实现
(1)修改Monitor的init方法,实现启动时订阅所有指标配置的主题
@Component
@Slf4j
public class Monitor
@Autowired
private EmqClient emqClient;
@Autowired
private QuotaService quotaService;
@PostConstruct
public void init()
System.out.println("初始化方法,订阅主题");
emqClient.connect();
quotaService.getAllSubject().forEach(s ->
try
emqClient.subscribe("$queue/"+s);
catch (MqttException e)
e.printStackTrace();
);
需要注意的是在订阅主题的时候我们使用了共享队列来接收($queue/),主要考虑到的是客户端物联网设备的个数比较庞大,在同一时刻或一段时间之内,上报的消息量过于庞大,接收消息的地方会很容易被击垮。使用共享队列,天然的平滑支持了分布式部署,面对巨大的消息量我们只需要部署多份亿可控节点来接收消息并处理就行了,无需做任何复杂的负载均衡处理,这样对研发和部署成本是最低的。
(2)修改QuotaController的create方法,新增主题订阅代码
@Autowired
private EmqClient emqClient;
/**
* 创建指标
* @param vo
* @return
*/
@PostMapping
public boolean create(@RequestBody QuotaVO vo)
try
QuotaEntity quotaEntity = new QuotaEntity();
BeanUtils.copyProperties(vo,quotaEntity);
emqClient.subscribe("$queue/"+quotaEntity.getSubject());//添加这句!
return quotaService.save(quotaEntity);
catch (DuplicateKeyException e)
throw new BussinessException("已存在该名称");
catch (MqttException e)
log.error("订阅主题失败",e);
return false;
(3)丢失连接后再次连接和订阅,修改EmqMsgProcess的connectionLost方法
@Autowired
private QuotaService quotaService;
@Autowired
private EmqClient emqClient;
@Override
public void connectionLost(Throwable throwable)
log.info("emq connect lost");
//当连接丢失时再次连接emq
emqClient.connect();
//重新订阅所有主题
quotaService.getAllSubject().forEach(s ->
try
subscribe("$queue/"+s);
catch (MqttException e)
e.printStackTrace();
);
2.指标数据解析
2.1 需求分析
我们的系统通过后台接口在前端页面中添加的一些指标数据的配置定义,然后在接收到EMQ的数据之后,跟这些提前配置好的数据进行解析。
其中针对指标定义添加的产品页面如下:
- 指标名称:对应将来在系统中需要展示的名称
- 单位:主要是用来解析和保存设备报文中对应的指标数据的单位,比如:摄氏度、米这样的单位
- 报文主题:对应mqtt协议中的报文主题,对应的主题数据就需要在系统中接收和解析处理
- 指标值字段:报文中需要接收解析、处理的字段名称,根据该名称获取对应的值
- 指标值数据类型:就是数据属于什么类型,类似编程语言的数据类型,有Double、String、Boolean等
- 设备识别码字段:对应报文数据中设备编码的字段,主要用来区分设备,方便数据的存取
- web hook:指标数据需要透传到的外部web接口地址
- 安全值:主要用来显示指标数据安全值的一个范围
通过在系统中创建这样的指标数据,方便订阅EMQ主题来接收响应数据并进行后期处理。
2.2 实现思路
(1)定义用于封装设备和指标数据的DTO 。
报文有可能是一个设备多个指标
"sn":"123456",
"temp":1.2,
"humi":50
(2)编写业务逻辑方法,接收参数为主题和报文map,根据主题提取字段名称,从报文map中提取数据。
(3)在接收报文后,调用此业务逻辑实现指标数据的解析。
2.3 代码实现
(1)创建用于存储指标数据的DTO,封装指标数据
package com.yikekong.dto;
import lombok.Data;
import java.io.Serializable;
/**
* 指标DTO
*/
@Data
public class QuotaDTO implements Serializable
/**
* 指标ID
*/
private Integer id;
/**
* 指标名称
*/
private String quotaName;
/**
* 单位
*/
private String unit;
/**
* 报文主题
*/
private String subject;
/**
* 指标值字段名称
*/
private String valueKey;
/**
* 指标值数据类型
*/
private String valueType;
/**
* 指标值(数值)
*/
private Double value;
/**
* 指标值(非数值)
*/
private String stringValue;
/**
* 设备识别码字段(设备Id)
*/
private String snKey;
/**
* web钩子地址
*/
private String webhook;
/**
* 参考值
*/
private String referenceValue;
/**
* 设备Id
*/
private String deviceId;
(2)创建用于存储设备和指标列表的DTO
package com.yikekong.dto;
import lombok.Data;
import java.util.List;
@Data
public class DeviceInfoDTO
private DeviceDTO device;//设备
private List<QuotaDTO> quotaList; //指标列表
(3)在QuotaService接口里新增方法
/**
* 解析报文
* @param topic 主题名称
* @param payloadMap 报文内容
* @return 设备(含指标列表)
*/
DeviceInfoDTO analysis(String topic, Map<String, Object> payloadMap);
在QuotaServiceImpl实现类里实现该接口方法:
@Override
public DeviceInfoDTO analysis(String topic, Map<String, Object> payloadMap)
//1.获取指标配置
List<QuotaEntity> quotaList = baseMapper.selectBySubject(topic);//根据主题查询指标列表
if(quotaList.size()==0) return null;
//2.封装设备信息
String snKey=quotaList.get(0).getSnKey();
if( Strings.isNullOrEmpty(snKey) ) return null;
String deviceId = (String) payloadMap.get(snKey);//设备编号
if( Strings.isNullOrEmpty(deviceId) ) return null;
DeviceDTO deviceDTO=new DeviceDTO();
deviceDTO.setDeviceId(deviceId);
//3.封装指标列表 : 循环我们根据主题名称查询得指标列表,到报文中提取,如果能够提到,进行封装
List<QuotaDTO> quotaDTOList=Lists.newArrayList();
for( QuotaEntity quota:quotaList )
String quotaKey = quota.getValueKey();//指标key
if( payloadMap.containsKey(quotaKey) )
QuotaDTO quotaDTO=new QuotaDTO();
//复制指标配置信息
BeanUtils.copyProperties( quota, quotaDTO);
quotaDTO.setQuotaName( quota.getName() );
//指标值封装
//指标分为两种 1.数值 2.非数值(string boolean)
//1.数值 value 存储数值 stringValue :存储数值字符串
//2.非数值 value 0 stringValue:内容
//如果是非数值
if( "String".equals(quotaDTO.getValueType()) || "Boolean".equals(quotaDTO.getValueType()) )
quotaDTO.setStringValue( (String) payloadMap.get(quotaKey) );
quotaDTO.setValue(0d);
else//如果是数值
if( payloadMap.get(quotaKey) instanceof String )
quotaDTO.setValue( Double.valueOf( (String) payloadMap.get(quotaKey) ) );
quotaDTO.setStringValue( (String) payloadMap.get(quotaKey) );
else
quotaDTO.setValue( Double.valueOf( payloadMap.get(quotaKey) +"" ) );
quotaDTO.setStringValue( quotaDTO.getValue()+"" );
quotaDTO.setDeviceId( deviceId );
quotaDTOList.add(quotaDTO);
//4.封装设备+指标列表返回
DeviceInfoDTO deviceInfoDTO指令集数据产品如何设计和实现报表协同系统——基于指令集物联网操作系统的工业协同制造项目开发实践
1 背景
某汽车零部件企业拟搭建一套适用于股份公司的「协同制造服务云平台」。该项目利用指令集工业智能操作系统iSysCore IIOS(简称“指令集IIOS”)的数据互联能力,从数据集成、数据建模、数据展现三个方面,对9大业务领域进行指标建模和数据可视化分析,实时监控企业运营数据,提高企业协同管理能力。
项目包含PC端报表填报与查看功能、移动端的统计分析查看与KPI指标查看的功能。结合实际情况,为尽快达到预期效果和实现项目目标,主要是以手动填报和结果查看为主,通过手动填报并自动汇总方式,再考虑自动从各分子公司的业务系统中提取数据到「协同制造服务云平台」数据仓库。总公司和分子公司领导主要是查看统计分析结果和KPI指标,分子公司业务人员主要是数据填报和查看等。
2 整体方案
根据前期需求调研和分析,整个系统功能架构大致包含数据同步、数据填报、流程管理、报表管理核心功能模块,功能架构如下:
根据系统功能分解,需要数据填报、流程管理、数据处理、报表四个方面的数据能力,指令集在这四个方向都打磨了对应的产品,对应的系统实现架构可快速匹配满足,架构如下:
01 数据录入
方案使用指令集iSysCore FORM,提供丰富的表单组件,可快速搭建不同信息收集场景下的表单,快速实现信息收集侧的需求。
02 流程管理
方案使用指令集iSysCore FLOW,支持标准的的BPMN流程管理,且与iSysCore FORM打通,将表单填报到流程审批进行一体化配置。
03 数据处理
方案使用指令集iSysCore UDMP,支持标准SQL和自定义函数进行数据处理、清洗开发,同时支持实时、定时不同数据处理机制,让数据指标层建设变得灵活、强大。
04 数据报表
方案使用指令集iSysCore Report,供类EXCEL设计器,通过可拖拽数据式设计,将数据配置可视化,并对数据进行计算分析后动态展,真正做到报表设计的所见即所得。
3 分钟级表单搭建,拖拽设计
基础信息收集、业务填报涉及大量的表单搭建,指令集IIOS中的表单引擎让一名无任何开发背景的实施工程师通过简单的拖拉拽就能搭建满足不同业务场景的表单 ,同时提供表单校验逻辑、数据初始化、级联、业务属性等要求而抽象的一套功能模型,保证了不同的业务规则下的诉求。在不到两天的时间里,就能完成30-40个业务表单的配置和发布。
- 组件库:丰富的基础组件、沉淀业务组件
- 可视化:提供可视化配置功能,实现所见即所得
- 功能模型:支持各种功能函数(日期、数字、文本)、校验器、数据源、业务属性等等
- 集成能力:多API接口集成,也可嵌入业务系统中,保持业务系统融合的连续性
注:图中数据为测试环境数据
4 数据快速接入,加工处理
有了收集的表单数据,还要与即存的业务系统的数据结合,进行数据转换、加工、聚合统计等,才能得到用于分析使用的数据指标。
指令集IIOS中的通用数据管理模块(UDMP)提供了一站式的从数据抽取、数据转换和加工、数据指标建设等数仓体系化建设能力。无论是异构数据源、API接口、非结构化数据文件都能快速接入,用户也可以自行开发自定义函数(UDF)加载到平台进行复杂业务逻辑计算。
注:图中数据为测试环境数据
注:图中数据为测试环境数据
5 报表在线设计,所见即所得
完成了表单数据收集,业务系统数据接入与指标聚合计算后,接下来就是根据不同业务分析诉求,来搭建各种业务分析图表。
IIOS中报表引擎作为一款数据展示分析工具,提供类EXCEL设计器,通过可拖拽数据式设计,将数据配置可视化,并对数据进行计算分析后动态展示,且依托IIOS的一体化优势,报表与表单和UDMP数据集成是直接打通的,在配置报表数据集过程中直接下拉即可选择使用。
1. 复杂布局
如下图,报表中需实现多种样式,包含交叉式、分组式和主子式,多sheet等复杂中国式报表。
注:图中数据为测试环境数据
2. 形态与条件格式化
- 形态:在不改变原有数据格式的情况下,要将数据转化成不同形态进行显示,例如将一个订单号转化成二维条码进行显示。
- 条件格式化:用户可自定义设置条件,将满足条件的单元格数据改变字体颜色、背景、边框等,甚至可以将值替换。
如下图所示,可设置规则,让单元格背景色根据单元格取值变换:
注:图中数据为测试环境数据
规则含义为「当单元格大于等于80%时,背景色设置为绿色」,实际预览效果如下:
注:图中数据为测试环境数据
3. 公式支持
累计提供 90种函数公式计算,支持类似Excel常用函数,涉及数学、文本、逻辑等,报表特别提供数据集函数、格间计算,全力帮助用户统计分析数据。
例如:动态显示列标题为“XX月实际累计”,比如此时是6月,则显示为“6月实际累计”,即可通过字符串拼接函数concatenate(xx,x)实现,设置如下:
Ø 报表设计态:
注:图中数据为测试环境数据
Ø 公式详细编辑:
注:图中数据为测试环境数据
- cx_month为参数面板定义的变量,为用户选择的月份
- Concatenate(cx_month,"实际累计")实现了报表运行时变量字符串拼接,将计算交由前端计算,省去了后端编码
Ø 预览效果:
注:图中数据为测试环境数据
6 总结
注:图中数据为测试环境数据
注:图中数据为测试环境数据
如上图所示,该汽车零部件企业累计搭建了75张报表,原有基于人工线下需要几天甚至几周才能收集汇总形成的报表,现在可以在1天内即可实现收集、汇总形成分析报表,并且不会出现人工处理错误导致的数据准确性问题,极大地提高了市场部的运营管理效率。
指令集产品矩阵中除了数据产品还有很多其他助力企业数字化的产品,比如流程控制,有iSysCore FLOW(工作流引擎)、iSysCore APIX(接口集成服务),我们将在后续持续介绍更多产品及在项目中的实践。
以上是关于Java物联网企业级项目 指标数据采集与断连监控的主要内容,如果未能解决你的问题,请参考以下文章
指令集数据产品如何设计和实现报表协同系统——基于指令集物联网操作系统的工业协同制造项目开发实践