Azure 流分析 - 您可以查询最新的非空值吗?

Posted

技术标签:

【中文标题】Azure 流分析 - 您可以查询最新的非空值吗?【英文标题】:Azure stream analytics - can you query for most recent non null values? 【发布时间】:2021-05-27 19:42:38 【问题描述】:

我有一个 Azure 流分析作业,其中包含来自 IoT 中心的输入数据,并且我正在将此数据发送到 Power BI。数据正在使用 pub/sub 更新,因此每条消息只更新一个值。因此,我的输入数据看起来像这样(MessageID 5 是最新消息):

MessageID RPM Temperature Pressure
5 800 null null
4 null 50 null
3 null null 4
2 null 23 null
1 900 null null

我想在 power bi 仪表板和实时固定到仪表板的报表上显示最新的非空值。在这个例子中,我想要一张卡片或仪表来显示每个变量的最新非空值,所以

RPM = 800,
Temperature = 50, and
Pressure = 4.

默认情况下,Power BI 仪表板会读取最新的值,因此如果该值不是刚刚更新(并且在表中为 null),则仪表板会将 (Blank) 显示为值。

是否有要从我的 Azure 流分析作业中写入的 SQL 查询来获取每个变量的最新非空值?或者其他一些解决方法?

谢谢

【问题讨论】:

流分析的一些功能可以帮助您解决该要求,主要是关于时间管理。您当前是否在查询中使用时间窗口?您能否确定 3 个传感器都以可靠方式获取值的时间窗口? 此外,您是否仅从单个 IoT 中心读取数据,其有效负载会有所不同并且可以省略传感器。还是您在查询中加入的多个 IoT 中心流? @FlorianEiden 请参阅下面链接的相关帖子,了解我正在使用的查询。我总共有三个设备向单个 IoT 中心发送消息。每个设备将数据输出到单独的 Power Bi 表。我不确定您在查询中所说的time window 是什么意思,但我希望这能回答您的问题。如果我可以提供任何进一步的信息来帮助您更好地理解问题,请告诉我。 ***.com/questions/67730501/… 【参考方案1】:

ASA 的一个好处是您可以在 time window 上对事件进行分组:

在这里,如果我理解正确,当您一次查看事件一时,您只会获得 1 个数据点。解决此问题的方法是查看时间窗口,并将所有数据点投影到单个输出事件中。

假设您应该每 10 秒收到 1 个数据点。然后,您可以编写一个查询,对该时间窗口上的所有事件进行 GROUP BY,并输出包含所有 4 个事件的结果:


WITH dataPrep AS (
SELECT
    CAST(GetMetadataPropertyValue(IoTHub, 'IoTHub.EnqueuedTime') AS datetime) AS eventDateTime,
    GetMetadataPropertyValue(IoTHub, 'IoTHub.ConnectionDeviceId')  AS DeviceId,
    *
FROM IoTHub
)

SELECT 
    System.Timestamp() AS DateTime,
    DeviceId,
    LAST(RPM) OVER (PARTITION BY DeviceId LIMIT DURATION(second, 20) WHEN RPM IS NOT NULL),
    LAST(FuelRate) OVER (PARTITION BY DeviceId LIMIT DURATION(second, 20) WHEN FuelRate IS NOT NULL),
    LAST(DischargePressure) OVER (PARTITION BY DeviceId LIMIT DURATION(second, 20) WHEN DischargePressureIS NOT NULL),  
    LAST(SystemPressure) OVER (PARTITION BY DeviceId LIMIT DURATION(second, 20) WHEN SystemPressure IS NOT NULL)   

INTO
    powerbioutput
FROM
    dataPrep TIMESTAMP BY eventDateTime
GROUP BY
    DeviceId,
    TumblingWindow(second,10)
WHERE
    DeviceId = 'rpi1'

这里我使用了LAST,语法可能看起来有点复杂但很简单。由于它需要 PARTITION BY,因此我必须明确说明,如果您将来想要扩展,这无论如何都是一个好习惯。

根据您的要求,您可以使用 MAX、MIN 或 AVG 使其更简单。

你可以看here for other ASA query patterns。

【讨论】:

【参考方案2】:

在这种情况下,您可以使用 Coalesce。它按顺序计算参数并返回最初不计算为 NULL 的第一个表达式的值。

参考下面的链接-

https://docs.microsoft.com/en-us/stream-analytics-query/coalesce-azure-stream-analytics

【讨论】:

嗨 Neeraj,如果所有值都出现在单个事件或事件窗口中,这将起作用。这里我的猜测是每个事件只有 1 个数据点,而不是全部 3 个。

以上是关于Azure 流分析 - 您可以查询最新的非空值吗?的主要内容,如果未能解决你的问题,请参考以下文章

在除前两列之外的每列上前向填充具有最新非空值的空值

计算Spark DataFrame中的非空值的数量

Kotlin 中的非空值产生空指针异常

返回 MIN 和 MAX 值并忽略空值 - 使用前面的非空值填充空值

在 Azure 流分析中访问数组元素

Azure 流分析查询 - 进行合并