通过 cql 流分析在 json msg 中迭代嵌套列表
Posted
技术标签:
【中文标题】通过 cql 流分析在 json msg 中迭代嵌套列表【英文标题】:iterate nested list in json msg by cql stream analytics 【发布时间】:2018-08-21 14:53:48 【问题描述】:我有一个来自 iotHub 的 json 消息,例如:
"deviceId": "abc",
"topic": "data",
"data":
"varname1": [
"t": "timestamp1",
"v": "value1",
"f": "respondFrame1"
,
"t": "timestamp2",
"v": "value2",
"f": "respondFrame2"
],
"varname2": [
"t": "timestamp1",
"v": "value1",
"f": "respondFrame1"
,
"t": "timestamp2",
"v": "value2",
"f": "respondFrame2"
]
并希望通过 azure 流分析作业将其存储到这样的事务 sql 中:
ID | deviceId | varname | timestamp | respondFrame | value
-----+------------+-----------+-------------+----------------+--------
1 | abc | varname1 | timestamp1 | respondFrame1 | value1
2 | abc | varname1 | timestamp2 | respondFrame2 | value2
3 | abc | varname2 | timestamp1 | respondFrame1 | value1
4 | abc | varname2 | timestamp2 | respondFrame2 | value2
有人知道如何处理这种堆叠的迭代并将其组合(交叉应用)吗?
类似“phantomCode”的东西:
deviceId = msg.deviceId
for d in msg.data:
for key in d:
varname = key.name
timestamp = key[varname].t
frame = key[varname].f
value = key[varname].v
更新关于 JS Azure 的回答:
用代码
WITH datalist AS
(
SELECT
iotHubAlias.deviceId,
data.PropertyName as varname,
data.PropertyValue as arrayData
FROM [iotHub] as iotHubAlias
CROSS APPLY GetRecordProperties(iotHubAlias.data) AS data
WHERE iotHubAlias.topic = 'data'
)
SELECT
datalist.deviceId,
datalist.varname,
arrayElement.ArrayValue.t as [timestamp],
arrayElement.ArrayValue.f as respondFrame,
arrayElement.ArrayValue.v as value
INTO [temporary]
FROM datalist
CROSS APPLY GetArrayElements(datalist.arrayData) AS arrayElement
我总是得到一个错误:
"channels": "Operation",
"correlationId": "f9d4437b-707e-4892-a37b-8ad721eb1bb2",
"description": "",
"eventDataId": "ef5a5f2b-8c2f-49c2-91f0-16213aaa959d",
"eventName":
"value": "streamingNode0",
"localizedValue": "streamingNode0"
,
"category":
"value": "Administrative",
"localizedValue": "Administrative"
,
"eventTimestamp": "2018-08-21T18:23:39.1804989Z",
"id": "/subscriptions/46cd2f8f-b46b-4428-8f7b-c7d942ff745d/resourceGroups/fieldtest/providers/Microsoft.StreamAnalytics/streamingjobs/streamAnalytics4fieldtest/events/ef5a5f2b-8c2f-49c2-91f0-16213aaa959d/ticks/636704726191804989",
"level": "Error",
"operationId": "7a38a957-1a51-4da1-a679-eae1c7e3a65b",
"operationName":
"value": "Process Events: Processing events Runtime Error",
"localizedValue": "Process Events: Processing events Runtime Error"
,
"resourceGroupName": "fieldtest",
"resourceProviderName":
"value": "Microsoft.StreamAnalytics",
"localizedValue": "Microsoft.StreamAnalytics"
,
"resourceType":
"value": "Microsoft.StreamAnalytics/streamingjobs",
"localizedValue": "Microsoft.StreamAnalytics/streamingjobs"
,
"resourceId": "/subscriptions/46cd2f8f-b46b-4428-8f7b-c7d942ff745d/resourceGroups/fieldtest/providers/Microsoft.StreamAnalytics/streamingjobs/streamAnalytics4fieldtest",
"status":
"value": "Failed",
"localizedValue": "Failed"
,
"subStatus":
"value": "",
"localizedValue": ""
,
"submissionTimestamp": "2018-08-21T18:24:34.0981187Z",
"subscriptionId": "46cd2f8f-b46b-4428-8f7b-c7d942ff745d",
"properties":
"Message Time": "2018-08-21 18:23:39Z",
"Error": "- Unable to cast object of type 'Microsoft.EventProcessing.RuntimeTypes.ValueArray' to type 'Microsoft.EventProcessing.RuntimeTypes.IRecord'.\r\n",
"Message": "Runtime exception occurred while processing events, - Unable to cast object of type 'Microsoft.EventProcessing.RuntimeTypes.ValueArray' to type 'Microsoft.EventProcessing.RuntimeTypes.IRecord'.\r\n, : OutputSourceAlias:temporary;",
"Type": "SqlRuntimeError",
"Correlation ID": "f9d4437b-707e-4892-a37b-8ad721eb1bb2"
,
"relatedEvents": []
这里是来自设备的真实 json msg 示例:
"topic": "data",
"data":
"ExternalFlowTemperatureSensor": [
"t": "2018-08-22T11:00:11.955381",
"v": 16.64103,
"f": "Q6ES8KJIN1NX2DRGH36RX1WDT"
],
"AdaStartsP2": [
"t": "2018-08-22T11:00:12.863383",
"v": 382.363138,
"f": "9IY7B4DFBAMOLH3GNKRUNUQNUX"
,
"t": "2018-08-22T11:00:54.172501",
"v": 104.0,
"f": "IUJMP20CYQK60B"
],
"s_DriftData[4].c32_ZeitLetzterTest": [
"t": "2018-08-22T11:01:01.829568",
"v": 348.2916,
"f": "MMTPWQVLL02CA"
]
,
"deviceId": "test_3c27db"
以及(完成)sql表的创建代码:
create table temporary (
id int NOT NULL IDENTITY PRIMARY KEY,
deviceId nvarchar(20) NOT NULL,
timestamp datetime NOT NULL,
varname nvarchar(100) NOT NULL,
value float,
respondFrame nvarchar(50)
)
【问题讨论】:
【参考方案1】:以下查询将为您提供预期的输出
WITH step1 AS
(
SELECT
event.deviceID,
data.PropertyName as varname,
data.PropertyValue as arrayData
FROM blobtest as event
CROSS APPLY GetRecordProperties(event.data) AS data
)
SELECT
event.deviceId,
event.varname,
arrayElement.ArrayValue.t as [timestamp],
arrayElement.ArrayValue.f as frame,
arrayElement.ArrayValue.v as value
FROM step1 as event
CROSS APPLY GetArrayElements(event.arrayData) AS arrayElement
您可以在我们的文档页面“Parse JSON and Avro data in Azure Stream Analytics”上找到有关 JSON 解析的更多信息
如果您有任何其他问题,请告诉我。
JS(Azure 流分析)
【讨论】:
感谢您的回答!但是当我喜欢你的代码时,我总是会得到一个 runtimeError。请参阅上面我的问题中的更新部分。你知道出了什么问题吗? 您好,很抱歉听到它不起作用。由于它适用于您最初问题中发布的示例,因此我认为您可能有不符合该架构的消息。您可以使用输入刀片中的“示例”功能下载一些实际输入吗?然后我们可以看看会发生什么,并过滤掉不符合您预期架构的消息。 好的,谢谢。请查看我的问题:我添加了一个由设备发送到 iotHub 的真实 msg 示例 我使用您建议的相同架构使用 SQL 数据库尝试了您的确切查询,并且它有效。我的理论是你有一些不符合模式的事件。我们可以尝试使用一些 TRY_CAST 使查询更加独立于模式。为了证实这一理论,是否可以收集更长的数据样本并将其发送给我们?然后,您可以将其发送给我们的团队:askasa@microsoft,com以上是关于通过 cql 流分析在 json msg 中迭代嵌套列表的主要内容,如果未能解决你的问题,请参考以下文章
有没有办法通过流分析作业将原始 JSON 数据传输到 Azure SQL 中?