Iterate over nested lists in a JSON message with Stream Analytics (CQL)


Posted: 2018-08-21 14:53:48

Question:

I have a JSON message from IoT Hub like this:

{
    "deviceId": "abc",
    "topic": "data",
    "data": {
        "varname1": [{
            "t": "timestamp1",
            "v": "value1",
            "f": "respondFrame1"
        },
        {
            "t": "timestamp2",
            "v": "value2",
            "f": "respondFrame2"
        }],
        "varname2": [{
            "t": "timestamp1",
            "v": "value1",
            "f": "respondFrame1"
        },
        {
            "t": "timestamp2",
            "v": "value2",
            "f": "respondFrame2"
        }]
    }
}

and I want to store it with an Azure Stream Analytics job in a Transact-SQL table like this:

ID   |   deviceId |  varname  |  timestamp  |  respondFrame  | value
-----+------------+-----------+-------------+----------------+--------
1    |   abc      |  varname1 |  timestamp1 |  respondFrame1 | value1
2    |   abc      |  varname1 |  timestamp2 |  respondFrame2 | value2
3    |   abc      |  varname2 |  timestamp1 |  respondFrame1 | value1
4    |   abc      |  varname2 |  timestamp2 |  respondFrame2 | value2

Does anyone know how to handle this kind of nested iteration and combine the results (CROSS APPLY)?

Something like this "phantom code":

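import json

msg = json.loads(raw_message)  # raw_message: hypothetical name for the JSON string from IoT Hub
deviceId = msg["deviceId"]
for varname, entries in msg["data"].items():  # one key per variable name
    for entry in entries:                        # one {t, v, f} object per sample
        timestamp = entry["t"]
        frame = entry["f"]
        value = entry["v"]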

Update regarding the answer from JS (Azure Stream Analytics):

With the code

WITH datalist AS
(
    SELECT   
        iotHubAlias.deviceId,  
        data.PropertyName as varname,  
        data.PropertyValue as arrayData 
    FROM [iotHub] as iotHubAlias  
    CROSS APPLY GetRecordProperties(iotHubAlias.data) AS data
    WHERE iotHubAlias.topic = 'data'
)
SELECT
    datalist.deviceId,
    datalist.varname,
    arrayElement.ArrayValue.t as [timestamp],
    arrayElement.ArrayValue.f as respondFrame,
    arrayElement.ArrayValue.v as value
INTO [temporary]
FROM datalist 
CROSS APPLY GetArrayElements(datalist.arrayData) AS arrayElement

I always get an error:

{
    "channels": "Operation",
    "correlationId": "f9d4437b-707e-4892-a37b-8ad721eb1bb2",
    "description": "",
    "eventDataId": "ef5a5f2b-8c2f-49c2-91f0-16213aaa959d",
    "eventName": {
        "value": "streamingNode0",
        "localizedValue": "streamingNode0"
    },
    "category": {
        "value": "Administrative",
        "localizedValue": "Administrative"
    },
    "eventTimestamp": "2018-08-21T18:23:39.1804989Z",
    "id": "/subscriptions/46cd2f8f-b46b-4428-8f7b-c7d942ff745d/resourceGroups/fieldtest/providers/Microsoft.StreamAnalytics/streamingjobs/streamAnalytics4fieldtest/events/ef5a5f2b-8c2f-49c2-91f0-16213aaa959d/ticks/636704726191804989",
    "level": "Error",
    "operationId": "7a38a957-1a51-4da1-a679-eae1c7e3a65b",
    "operationName": {
        "value": "Process Events: Processing events Runtime Error",
        "localizedValue": "Process Events: Processing events Runtime Error"
    },
    "resourceGroupName": "fieldtest",
    "resourceProviderName": {
        "value": "Microsoft.StreamAnalytics",
        "localizedValue": "Microsoft.StreamAnalytics"
    },
    "resourceType": {
        "value": "Microsoft.StreamAnalytics/streamingjobs",
        "localizedValue": "Microsoft.StreamAnalytics/streamingjobs"
    },
    "resourceId": "/subscriptions/46cd2f8f-b46b-4428-8f7b-c7d942ff745d/resourceGroups/fieldtest/providers/Microsoft.StreamAnalytics/streamingjobs/streamAnalytics4fieldtest",
    "status": {
        "value": "Failed",
        "localizedValue": "Failed"
    },
    "subStatus": {
        "value": "",
        "localizedValue": ""
    },
    "submissionTimestamp": "2018-08-21T18:24:34.0981187Z",
    "subscriptionId": "46cd2f8f-b46b-4428-8f7b-c7d942ff745d",
    "properties": {
        "Message Time": "2018-08-21 18:23:39Z",
        "Error": "- Unable to cast object of type 'Microsoft.EventProcessing.RuntimeTypes.ValueArray' to type 'Microsoft.EventProcessing.RuntimeTypes.IRecord'.\r\n",
        "Message": "Runtime exception occurred while processing events, - Unable to cast object of type 'Microsoft.EventProcessing.RuntimeTypes.ValueArray' to type 'Microsoft.EventProcessing.RuntimeTypes.IRecord'.\r\n, : OutputSourceAlias:temporary;",
        "Type": "SqlRuntimeError",
        "Correlation ID": "f9d4437b-707e-4892-a37b-8ad721eb1bb2"
    },
    "relatedEvents": []
}

Here is a real JSON message sent by the device:

{
    "topic": "data",
    "data": {
        "ExternalFlowTemperatureSensor": [{
            "t": "2018-08-22T11:00:11.955381",
            "v": 16.64103,
            "f": "Q6ES8KJIN1NX2DRGH36RX1WDT"
        }],
        "AdaStartsP2": [{
            "t": "2018-08-22T11:00:12.863383",
            "v": 382.363138,
            "f": "9IY7B4DFBAMOLH3GNKRUNUQNUX"
        },
        {
            "t": "2018-08-22T11:00:54.172501",
            "v": 104.0,
            "f": "IUJMP20CYQK60B"
        }],
        "s_DriftData[4].c32_ZeitLetzterTest": [{
            "t": "2018-08-22T11:01:01.829568",
            "v": 348.2916,
            "f": "MMTPWQVLL02CA"
        }]
    },
    "deviceId": "test_3c27db"
}

And the (complete) SQL code that creates the table:

create table temporary (
    id int NOT NULL IDENTITY PRIMARY KEY,
    deviceId nvarchar(20) NOT NULL,
    timestamp datetime NOT NULL,
    varname nvarchar(100) NOT NULL,
    value float,
    respondFrame nvarchar(50)
    )
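Before pointing the job at this table, it can help to check locally that each flattened row actually fits the declared columns. The following is a rough Python approximation, assuming rows shaped like the query output (`deviceId`, `varname`, `timestamp`, `respondFrame`, `value`). The helper `row_problems` is hypothetical, the limits are copied from the `CREATE TABLE` statement, and the check does not reproduce SQL Server's own conversion rules (for example, `len` counts Python characters, not `nvarchar` storage).

```python
from datetime import datetime

# Limits copied from the CREATE TABLE statement for `temporary`.
MAX_LEN = {"deviceId": 20, "varname": 100, "respondFrame": 50}
NOT_NULL = ("deviceId", "timestamp", "varname")

def row_problems(row):
    """Return reasons why `row` would probably not fit the `temporary` table."""
    problems = []
    for col in NOT_NULL:
        if row.get(col) is None:
            problems.append(f"{col} is NULL")
    for col, limit in MAX_LEN.items():
        text = row.get(col)
        if text is not None and len(str(text)) > limit:
            problems.append(f"{col} longer than {limit} characters")
    ts = row.get("timestamp")
    if ts is not None:
        try:
            datetime.fromisoformat(ts)  # Python's ISO parser, used as a stand-in for datetime
        except (TypeError, ValueError):
            problems.append("timestamp is not an ISO date/time")
    value = row.get("value")
    if value is not None and (isinstance(value, bool) or not isinstance(value, (int, float))):
        problems.append("value is not numeric")  # the column is declared as float
    return problems

good = {"deviceId": "test_3c27db", "varname": "AdaStartsP2",
        "timestamp": "2018-08-22T11:00:54.172501",
        "respondFrame": "IUJMP20CYQK60B", "value": 104.0}
bad = dict(good, timestamp="timestamp1", value="value1")  # placeholder values from the first sample

print(row_problems(good))  # []
print(row_problems(bad))   # ['timestamp is not an ISO date/time', 'value is not numeric']
```

The placeholder strings in the first sample message ("timestamp1", "value1") would not fit the `datetime` and `float` columns, whereas the real device message does.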


Answer 1:

The following query will give you the expected output:

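WITH step1 AS
(
-- One row per property of "data": PropertyName is the variable name,
-- PropertyValue is that variable's array of {t, v, f} objects
SELECT
    event.deviceId,
    data.PropertyName as varname,
    data.PropertyValue as arrayData
FROM blobtest as event  -- blobtest: the input alias used for this test
CROSS APPLY GetRecordProperties(event.data) AS data
)
-- One row per array element; ArrayValue is a single {t, v, f} object
SELECT
    event.deviceId,
    event.varname,
    arrayElement.ArrayValue.t as [timestamp],
    arrayElement.ArrayValue.f as frame,
    arrayElement.ArrayValue.v as value
FROM step1 as event
CROSS APPLY GetArrayElements(event.arrayData) AS arrayElement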
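To see what the two CROSS APPLY steps produce, here is a minimal local emulation in Python; it is not Stream Analytics itself. It assumes the message has already been parsed into dicts and lists, and the helpers `get_record_properties` and `get_array_elements` are hypothetical stand-ins that only mimic the `PropertyName`/`PropertyValue` and `ArrayIndex`/`ArrayValue` fields returned by `GetRecordProperties` and `GetArrayElements`. The input is the real device message from the question.

```python
import json

def get_record_properties(record):
    # Hypothetical stand-in for GetRecordProperties: one row per property of a record.
    return [{"PropertyName": k, "PropertyValue": v} for k, v in record.items()]

def get_array_elements(array):
    # Hypothetical stand-in for GetArrayElements: one row per array element.
    return [{"ArrayIndex": i, "ArrayValue": v} for i, v in enumerate(array)]

def flatten(event):
    # step1: CROSS APPLY GetRecordProperties(event.data)
    step1 = [
        {"deviceId": event["deviceId"],
         "varname": prop["PropertyName"],
         "arrayData": prop["PropertyValue"]}
        for prop in get_record_properties(event["data"])
    ]
    # final SELECT: CROSS APPLY GetArrayElements(step1.arrayData)
    rows = []
    for row in step1:
        for element in get_array_elements(row["arrayData"]):
            sample = element["ArrayValue"]
            rows.append({
                "deviceId": row["deviceId"],
                "varname": row["varname"],
                "timestamp": sample["t"],
                "frame": sample["f"],
                "value": sample["v"],
            })
    return rows

message = json.loads("""
{
  "topic": "data",
  "data": {
    "ExternalFlowTemperatureSensor": [
      {"t": "2018-08-22T11:00:11.955381", "v": 16.64103, "f": "Q6ES8KJIN1NX2DRGH36RX1WDT"}
    ],
    "AdaStartsP2": [
      {"t": "2018-08-22T11:00:12.863383", "v": 382.363138, "f": "9IY7B4DFBAMOLH3GNKRUNUQNUX"},
      {"t": "2018-08-22T11:00:54.172501", "v": 104.0, "f": "IUJMP20CYQK60B"}
    ],
    "s_DriftData[4].c32_ZeitLetzterTest": [
      {"t": "2018-08-22T11:01:01.829568", "v": 348.2916, "f": "MMTPWQVLL02CA"}
    ]
  },
  "deviceId": "test_3c27db"
}
""")

rows = flatten(message)
for r in rows:
    print(r["deviceId"], r["varname"], r["timestamp"], r["frame"], r["value"])
```

The outer step multiplies out variables, the inner step multiplies out samples, so this message yields 1 + 2 + 1 = 4 rows.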

You can find more information about JSON parsing on our documentation page "Parse JSON and Avro data in Azure Stream Analytics".

Let me know if you have any other questions.

JS (Azure Stream Analytics)

Comments:

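Thanks for your answer! But when I use your code, I always get a runtime error. See the update section in my question above. Do you know what is wrong?

Hi, sorry to hear it doesn't work. Since it works for the sample posted in your original question, I think you may have messages that don't conform to that schema. Can you use the "Sample" feature in the input blade to download some actual input? Then we can see what happens and filter out the messages that don't match your expected schema.

OK, thanks. Please see my question: I added a real message sample sent by the device to IoT Hub.

I tried your exact query against a SQL database with the schema you suggested, and it works. My theory is that you have some events that don't conform to the schema. We can try to make the query more schema-independent with some TRY_CAST. To confirm this theory, could you collect a longer data sample and send it to our team: askasa@microsoft,com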
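The suggestion to filter out messages that don't match the expected schema can be prototyped outside Stream Analytics on a downloaded sample. This Python sketch assumes one JSON document per line; the helpers `conforms` and `split_messages` are hypothetical. A message counts as conforming only if `data` is an object whose values are all arrays of objects carrying `t`, `v` and `f`, and `deviceId` is a string. The second sample line below (with `data` sent as an array) is a made-up counterexample, not a message observed in this question.

```python
import json

def conforms(message):
    """True if `message` has the shape the two CROSS APPLY steps expect."""
    data = message.get("data") if isinstance(message, dict) else None
    if not isinstance(data, dict):            # GetRecordProperties expects a record
        return False
    for readings in data.values():
        if not isinstance(readings, list):    # GetArrayElements expects an array
            return False
        for reading in readings:
            if not isinstance(reading, dict) or not all(k in reading for k in ("t", "v", "f")):
                return False
    return isinstance(message.get("deviceId"), str)

def split_messages(lines):
    """Partition raw JSON lines into (conforming, non_conforming) lists."""
    good, bad = [], []
    for line in lines:
        try:
            message = json.loads(line)
        except json.JSONDecodeError:
            bad.append(line)                  # not valid JSON at all
            continue
        (good if conforms(message) else bad).append(line)
    return good, bad

sample = [
    '{"deviceId": "abc", "topic": "data", "data": {"varname1": [{"t": "timestamp1", "v": "value1", "f": "respondFrame1"}]}}',
    '{"deviceId": "abc", "topic": "data", "data": [{"t": "timestamp1", "v": "value1", "f": "respondFrame1"}]}',
    'not json',
]
good, bad = split_messages(sample)
print(len(good), len(bad))  # 1 2
```

Whatever lands in `bad` is a candidate for the schema mismatch the answerer suspects; inspecting those messages would show which shape the query needs to tolerate.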
