Azure 流分析过滤多个输入列以输出到 SQL 表,卡在 5 个接收器的限制
Posted
技术标签:
【中文标题】Azure 流分析过滤多个输入列以输出到 SQL 表,卡在 5 个接收器的限制【英文标题】:Azure Stream Analytics filtering multiple input colums to output to SQL tables, getting stuck at limit of 5 receivers 【发布时间】:2021-02-27 09:16:07 【问题描述】:我正在尝试使用 Azure 流分析对来自 IoT 中心的数据进行排序,来自 IoT 中心的传入数据位于多个列中,我的目标是将它们排序到 SQL 表中。
到目前为止,我必须从五列中选择数据,而不是在我的流输入上抛出和警告(超出最大事件中心接收器)并且没有数据得到处理,这是查询代码:
WITH SD_input AS
(
SELECT *
FROM [SDEwoninput]
),
cycleTimes AS
(
SELECT
"1_Cycle_time_10" AS cycle_time,
time AS cycle_timestamp,
1 AS cycle_name_id -- 10 Zone Cycle
FROM
SD_input
WHERE
LAG("1_Cycle_time_10", 1) OVER (LIMIT DURATION(minute, 1)) <> "1_Cycle_time_10"
UNION
SELECT
"1_Cycle_time_20" AS cycle_time,
time AS cycle_timestamp,
2 AS cycle_name_id -- 20 Zone Cycle
FROM
SD_input
WHERE
LAG("1_Cycle_time_20", 1) OVER (LIMIT DURATION(minute, 1)) <> "1_Cycle_time_20"
UNION
SELECT
"1_Cycle_time_30" AS cycle_time,
time AS cycle_timestamp,
3 AS cycle_name_id -- 30 Zone Cycle
FROM
SD_input
WHERE
LAG("1_Cycle_time_30", 1) OVER (LIMIT DURATION(minute, 1)) <> "1_Cycle_time_30"
UNION
SELECT
"1_Cycle_time_40" AS cycle_time,
time AS cycle_timestamp,
4 AS cycle_name_id -- 40 Zone Cycle
FROM
SD_input
WHERE
LAG("1_Cycle_time_40", 1) OVER (LIMIT DURATION(minute, 1)) <> "1_Cycle_time_40"
UNION
SELECT
"1_Cycle_time_50" AS cycle_time,
time AS cycle_timestamp,
5 AS cycle_name_id -- 50 Zone Cycle
FROM
SD_input
WHERE
LAG("1_Cycle_time_50", 1) OVER (LIMIT DURATION(minute, 1)) <> "1_Cycle_time_50"
UNION
SELECT
"1_Cycle_time_60" AS cycle_time,
time AS cycle_timestamp,
6 AS cycle_name_id -- 60 Zone cycle
FROM
SD_input
WHERE
LAG("1_Cycle_time_60", 1) OVER (LIMIT DURATION(minute, 1)) <> "1_Cycle_time_60"
UNION
SELECT
"1_Cycle_time_70" AS cycle_time,
time AS cycle_timestamp,
7 AS cycle_name_id -- 70 Zone Cycle
FROM
SD_input
WHERE
LAG("1_Cycle_time_70", 1) OVER (LIMIT DURATION(minute, 1)) <> "1_Cycle_time_70"
)
SELECT *
INTO [sd-cycle-times]
FROM cycleTimes
这是供参考的输入:
"1_Cycle_time_10": "10.95",
"1_Cycle_time_20": "10.67",
"1_Cycle_time_30": "11.57",
"1_Cycle_time_40": "12.02",
"1_Cycle_time_50": "7.98",
"1_Cycle_time_60": "7.83",
"1_Cycle_time_70": "8.46",
"1_Tot_lid_count": "600680",
"1_vak_pump_h": "1278",
"1_Run_time_H": "0",
"1_Run_time_M": "2",
"1_Run_time_S": "35",
"1_P_Run_time_S": "12",
"1_P_Run_time_M": "48",
"1_P_Run_time_H": "0",
"1_Fault40_1": "333",
"1_Fault40_2": "167",
"1_Fault40_3": "65",
"1_Fault40_4": "16",
"1_FaultSum40": "581",
"1_Fault50_1": "140",
"1_Fault50_2": "0",
"1_Fault50_3": "5",
"1_Fault50_4": "3",
"1_FaultSum50": "148",
"1_Fault60_1": "8",
"1_Fault60_2": "1",
"1_Fault60_3": "3",
"1_Fault60_4": "0",
"1_FaultSum60": "12",
"1_Fault70_1": "4767",
"1_Fault70_2": "4417",
"1_Fault70_3": "4132",
"1_Fault70_4": "5548",
"1_FaultSum70": "18864",
"1_P_kastes_time_S": "29",
"1_P_kastes_time_M": "6",
"1_P_kastes_time_H": "0",
"1_AUTO": "1",
"time": "2021-02-27 09:59:42",
"EventProcessedUtcTime": "2021-02-27T08:57:58.0443121Z",
"PartitionId": 1,
"EventEnqueuedUtcTime": "2021-02-27T07:59:37.8820000Z",
"IoTHub":
"MessageId": null,
"CorrelationId": null,
"ConnectionDeviceId": "SD_EWON",
"ConnectionDeviceGenerationId": "637455362146855668",
"EnqueuedTime": "2021-02-27T07:59:37.8710000Z",
"StreamId": null
解决这个问题的方法是什么,UDF 函数?
【问题讨论】:
【参考方案1】:使用事件中心的最佳做法是使用多个使用者组来实现作业可伸缩性。要解决此错误,请在 IoT 中心创建一个新的使用者组。然后在您的流分析作业中编辑输入以使用新的消费者组。
查看这些文档:
-
https://docs.microsoft.com/en-us/azure/stream-analytics/stream-analytics-troubleshoot-input
https://docs.microsoft.com/en-us/rest/api/iothub/iothubresource/createeventhubconsumergroup
【讨论】:
以上是关于Azure 流分析过滤多个输入列以输出到 SQL 表,卡在 5 个接收器的限制的主要内容,如果未能解决你的问题,请参考以下文章
将流分析作业中的输出数据流式传输到 Azure Synapse Analytics sql 池表?
如何使用 Azure 流分析将本地 SQL Server 用作输出?
Azure 流分析作业在输入格式错误 (JSON) 后被阻止