流分析 - 将传入的 JSON 转换为 PowerBI 流数据集
Posted
技术标签:
【中文标题】流分析 - 将传入的 JSON 转换为 PowerBI 流数据集【英文标题】:stream analytics - converting incoming JSON to PowerBI streaming dataset 【发布时间】:2018-05-15 13:46:47 【问题描述】:我有一个传感器以以下格式(通过 Modbus 服务器)向 IoTHub 报告数据:
15/05/2018 14:56:56> Device: [dev], Data:[["DisplayName":"Temperature","HwId":"PI-1","Address":"400002","Value":"192","SourceTimestamp":"2018-05-15 13:56:52","DisplayName":"Humidity","HwId":"PI-1","Address":"400001","Value":"397","SourceTimestamp":"2018-05-15 13:56:52"]]Properties:
'content-type': 'application/edge-modbus-json'
15/05/2018 14:57:00> Device: [dev], Data:[["DisplayName":"Temperature","HwId":"PI-1","Address":"400002","Value":"201","SourceTimestamp":"2018-05-15 13:56:57","DisplayName":"Humidity","HwId":"PI-1","Address":"400001","Value":"397","SourceTimestamp":"2018-05-15 13:56:57"]]Properties:
'content-type': 'application/edge-modbus-json'
15/05/2018 14:57:06> Device: [dev], Data:[["DisplayName":"Temperature","HwId":"PI-1","Address":"400002","Value":"201","SourceTimestamp":"2018-05-15 13:57:02","DisplayName":"Humidity","HwId":"PI-1","Address":"400001","Value":"397","SourceTimestamp":"2018-05-15 13:57:02"]]Properties:
'content-type': 'application/edge-modbus-json'
15/05/2018 14:57:10> Device: [dev], Data:[["DisplayName":"Temperature","HwId":"PI-1","Address":"400002","Value":"195","SourceTimestamp":"2018-05-15 13:57:07","DisplayName":"Humidity","HwId":"PI-1","Address":"400001","Value":"397","SourceTimestamp":"2018-05-15 13:57:07"]]Properties:
'content-type': 'application/edge-modbus-json'
每个传感器都在一个单独的数组条目中报告,并在传感器的显示名称和值之间进行拆分。
我想要的是一个 JSON 有效负载,我可以将它输入 PowerBI,因此需要按顺序排列:
时间戳:时间,湿度:湿度值,温度:温度值
如何构建合适的流分析查询来执行此操作?这种输入格式是典型的 Modbus 或 OPC-UA 类型设备,因此可能会遇到几次。
我尝试使用 GetArrayElement/(s),但该数组在 JSON 中没有名称,因此没有可引用的内容。
【问题讨论】:
【参考方案1】:据我了解,您可以利用 Azure Stream Analytics javascript user-defined functions 来整理您的数据。
假设您的数据如下所示:
Device:"dev01",
Data:[
[
"DisplayName":"Temperature","HwId":"PI-1","Address":"400002","Value":"192","SourceTimestamp":"2018-05-15 13:56:52"
,"DisplayName":"Humidity","HwId":"PI-1","Address":"400001","Value":"397","SourceTimestamp":"2018-05-15 13:56:52"
]
]
您可以创建以下 UDF:
GetValueByPropertyName:
function main(arrs,propertyname)
for(var i=0;i<arrs.length;i++)
var item=arrs[i];
if(item.hasOwnProperty(propertyname))
return item[propertyname];
return '';
GetValueByDisplayName:
function main(dataArr,displayname)
if(dataArr)
for(var j=0;j<dataArr.length;j++)
var subArr=dataArr[j];
for(var i=0;i<subArr.length;i++)
var obj=subArr[i];
if(obj.DisplayName.toLowerCase()==displayname.toLowerCase())
return obj.Value;
return '';
示例查询:
select
input.device as DeviceName,
UDF.GetValueByDisplayName(input.Data,'Temperature') as Temperature,
UDF.GetValueByDisplayName(input.Data,'Humidity') as Humidity,
UDF.GetValueByPropertyName(GetArrayElement(input.Data,0),'SourceTimestamp') as [Timestamp]
from input
测试:
【讨论】:
【参考方案2】:实际上找到了一种更简单的方法来直接处理这个没有函数。将 CASE 语句与 LAST 结合使用。
SELECT System.Timestamp as timestamp,
CASE Address
WHEN '400001' THEN cast(Value as float)/10 ELSE last(cast(Value as float)/10) over (partition by HwId limit duration(day,1) when Value is not null and Address like '400001')
END
AS Humidity,
CASE Address
WHEN '400002' THEN cast(Value as float)/10 ELSE last(cast(Value as float)/10) over (partition by HwId limit duration(day,1) when Value is not null and Address like '400002')
END
AS Temperature,
CASE Address
WHEN '400003' THEN cast(Value as float)/10 ELSE last(cast(Value as float)/10) over (partition by HwId limit duration(day,1) when Value is not null and Address like '400003')
END
AS Pressure
INTO PowerBI
FROM IoTHub as event
【讨论】:
以上是关于流分析 - 将传入的 JSON 转换为 PowerBI 流数据集的主要内容,如果未能解决你的问题,请参考以下文章
IoT 中心/流分析 - SQL - 将传入时间戳转换为日期时间
来自 Data Lake Gen2 的 Power BI 流数据集