Azure 流分析过滤多个输入列以输出到 SQL 表,卡在 5 个接收器的限制

Posted

技术标签:

【中文标题】Azure 流分析过滤多个输入列以输出到 SQL 表,卡在 5 个接收器的限制【英文标题】:Azure Stream Analytics filtering multiple input colums to output to SQL tables, getting stuck at limit of 5 receivers 【发布时间】:2021-02-27 09:16:07 【问题描述】:

我正在尝试使用 Azure 流分析对来自 IoT 中心的数据进行排序,来自 IoT 中心的传入数据位于多个列中,我的目标是将它们排序到 SQL 表中。

到目前为止,我必须从五列中选择数据,而不是在我的流输入上抛出和警告(超出最大事件中心接收器)并且没有数据得到处理,这是查询代码:

WITH SD_input AS 
(
    SELECT *
    FROM [SDEwoninput]
),
cycleTimes AS 
(
    SELECT
        "1_Cycle_time_10" AS cycle_time,
        time AS cycle_timestamp,
        1 AS cycle_name_id -- 10 Zone Cycle
    FROM
        SD_input
    WHERE
        LAG("1_Cycle_time_10", 1) OVER (LIMIT DURATION(minute, 1)) <> "1_Cycle_time_10"
    UNION
    SELECT
        "1_Cycle_time_20" AS cycle_time,
        time AS cycle_timestamp,
        2 AS cycle_name_id -- 20 Zone Cycle
    FROM
        SD_input
    WHERE
        LAG("1_Cycle_time_20", 1) OVER (LIMIT DURATION(minute, 1)) <> "1_Cycle_time_20"
    UNION
    SELECT
        "1_Cycle_time_30" AS cycle_time,
        time AS cycle_timestamp,
        3 AS cycle_name_id -- 30 Zone Cycle
    FROM
        SD_input
    WHERE
        LAG("1_Cycle_time_30", 1) OVER (LIMIT DURATION(minute, 1)) <> "1_Cycle_time_30"
    UNION
    SELECT
        "1_Cycle_time_40" AS cycle_time,
        time AS cycle_timestamp,
        4 AS cycle_name_id -- 40 Zone Cycle
    FROM
        SD_input
    WHERE
        LAG("1_Cycle_time_40", 1) OVER (LIMIT DURATION(minute, 1)) <> "1_Cycle_time_40"
    UNION
    SELECT
        "1_Cycle_time_50" AS cycle_time,
        time AS cycle_timestamp,
        5 AS cycle_name_id -- 50 Zone Cycle
    FROM
        SD_input
    WHERE
        LAG("1_Cycle_time_50", 1) OVER (LIMIT DURATION(minute, 1)) <> "1_Cycle_time_50"
    UNION
    SELECT
        "1_Cycle_time_60" AS cycle_time,
        time AS cycle_timestamp,
        6 AS cycle_name_id -- 60 Zone cycle
    FROM
        SD_input
    WHERE
        LAG("1_Cycle_time_60", 1) OVER (LIMIT DURATION(minute, 1)) <> "1_Cycle_time_60"
        UNION
    SELECT
        "1_Cycle_time_70" AS cycle_time,
        time AS cycle_timestamp,
        7 AS cycle_name_id -- 70 Zone Cycle
    FROM
        SD_input
    WHERE
        LAG("1_Cycle_time_70", 1) OVER (LIMIT DURATION(minute, 1)) <> "1_Cycle_time_70"
)
SELECT *
INTO [sd-cycle-times]
FROM cycleTimes

这是供参考的输入:

 
    "1_Cycle_time_10": "10.95",
    "1_Cycle_time_20": "10.67",
    "1_Cycle_time_30": "11.57",
    "1_Cycle_time_40": "12.02",
    "1_Cycle_time_50": "7.98",
    "1_Cycle_time_60": "7.83",
    "1_Cycle_time_70": "8.46",
    "1_Tot_lid_count": "600680",
    "1_vak_pump_h": "1278",
    "1_Run_time_H": "0",
    "1_Run_time_M": "2",
    "1_Run_time_S": "35",
    "1_P_Run_time_S": "12",
    "1_P_Run_time_M": "48",
    "1_P_Run_time_H": "0",
    "1_Fault40_1": "333",
    "1_Fault40_2": "167",
    "1_Fault40_3": "65",
    "1_Fault40_4": "16",
    "1_FaultSum40": "581",
    "1_Fault50_1": "140",
    "1_Fault50_2": "0",
    "1_Fault50_3": "5",
    "1_Fault50_4": "3",
    "1_FaultSum50": "148",
    "1_Fault60_1": "8",
    "1_Fault60_2": "1",
    "1_Fault60_3": "3",
    "1_Fault60_4": "0",
    "1_FaultSum60": "12",
    "1_Fault70_1": "4767",
    "1_Fault70_2": "4417",
    "1_Fault70_3": "4132",
    "1_Fault70_4": "5548",
    "1_FaultSum70": "18864",
    "1_P_kastes_time_S": "29",
    "1_P_kastes_time_M": "6",
    "1_P_kastes_time_H": "0",
    "1_AUTO": "1",
    "time": "2021-02-27 09:59:42",
    "EventProcessedUtcTime": "2021-02-27T08:57:58.0443121Z",
    "PartitionId": 1,
    "EventEnqueuedUtcTime": "2021-02-27T07:59:37.8820000Z",
    "IoTHub": 
      "MessageId": null,
      "CorrelationId": null,
      "ConnectionDeviceId": "SD_EWON",
      "ConnectionDeviceGenerationId": "637455362146855668",
      "EnqueuedTime": "2021-02-27T07:59:37.8710000Z",
      "StreamId": null
    

解决这个问题的方法是什么,UDF 函数?

【问题讨论】:

【参考方案1】:

使用事件中心的最佳做法是使用多个使用者组来实现作业可伸缩性。要解决此错误,请在 IoT 中心创建一个新的使用者组。然后在您的流分析作业中编辑输入以使用新的消费者组。

查看这些文档:

    https://docs.microsoft.com/en-us/azure/stream-analytics/stream-analytics-troubleshoot-input https://docs.microsoft.com/en-us/rest/api/iothub/iothubresource/createeventhubconsumergroup

【讨论】:

以上是关于Azure 流分析过滤多个输入列以输出到 SQL 表,卡在 5 个接收器的限制的主要内容,如果未能解决你的问题,请参考以下文章

Azure 流分析:SQL 输出不起作用

将流分析作业中的输出数据流式传输到 Azure Synapse Analytics sql 池表?

如何使用 Azure 流分析将本地 SQL Server 用作输出?

Azure 流分析作业在输入格式错误 (JSON) 后被阻止

使用流分析作业查询从 EventHub 中过滤 Azure 事件

Azure 流分析:多个 Windows + JOINS