Azure 流分析会话窗口异常行为

Posted

技术标签:

【中文标题】Azure 流分析会话窗口异常行为【英文标题】:Azure Stream Analytics Session window abnormal behavior 【发布时间】:2019-03-12 09:28:25 【问题描述】:

在我的 Azure 流分析作业中,我想为每个设备创建一个窗口(当设备收到数据时)。我正在使用会话窗口为每个设备维护一个单独的窗口。我希望每 5 分钟发出一次温度阈值违规警报,但有时行为会有所不同。以下是我的 ASA JOB 查询

SELECT
t1.DeviceId,MIN(t1.Temperature)MinTemperature,     
MAX(t1.Temperature)MaxTemperature,t2.ThresholdLimit,System.Timestamp as 
EventProcessedUtcTime,t3.EventProcessedUtcTime as LastAlertTime
,MIN(CAST(t1.MessageTime AS DateTime)) AS Window_start,
System.Timestamp AS Window_end,Count(t1.DeviceId)EventCount
INTO
[alertOutputsb]
FROM
[tsfInput] t1
INNER JOIN [device-threashold-Input] t2
ON t1.DeviceId = t2.DeviceId
LEFT OUTER JOIN [temperature-alerts-sql-Input] t3
ON t1.DeviceId = t3.DeviceId   
WHERE t1.Temperature >= Cast(t2.ThresholdLimit as float)      
GROUP BY t1.DeviceId,t2.ThresholdLimit,t3.EventProcessedUtcTime, 
SESSIONWINDOW(minute, 4, 5) OVER (PARTITION BY t1.DeviceId)
HAVING MIN(t1.Temperature) >= Cast(t2.ThresholdLimit as float)
AND MAX(t1.Temperature) >= Cast(t2.ThresholdLimit as float)

输入JSON 我每隔 1 分钟发送一次设备数据。


"DeviceId": "9327848923ABE",
"Temperature": 300.5,
"MessageTime": "2019-03-02T05:50:05.362Z"

输出

对于设备 - 9327848923ABE,我在 10 分钟内收到第一个警报,然后在 5 分钟窗口内收到所有警报。

【问题讨论】:

我认为这是 ASA 会话窗口中的错误。我开始为 3 台设备(间隔 30 秒)和 1 台设备停止 1 分钟后发送新作业发送事件。仍然,我每 5 分钟收到 3 个警报,并且所有结束时间都相同 【参考方案1】:

Azure Stream Analytics 是管理会话窗口的方式:

会话窗口组在相似时间到达的事件,过滤 在没有数据的时间段内。会话窗口功能 具有三个主要参数:超时、最大持续时间和分区 键(可选)。

下图说明了一个包含一系列事件的流和 它们如何映射到 5 分钟超时的会话窗口,以及 最长持续时间为 10 分钟。

当第一个事件发生时,会话窗口开始。如果另一个事件 在最后一个摄取事件的指定超时内发生,然后 窗口扩展以包含新事件。否则如果没有事件 在超时时间内发生,则窗口在超时时关闭。

如果事件在指定的超时时间内持续发生,则会话 窗口将继续延长,直到达到最大持续时间。请 请注意,最大持续时间检查间隔设置为 与指定的最大持续时间相同。例如,如果最大 持续时间为 10,然后检查窗口是否超过最大值 持续时间将发生在 t = 0、10、20、30 等。

因此,在数学上,我们的会话窗口在以下情况下结束 条件满足:

流分析会话窗口 5 分钟超时和最多 10 分钟

当提供分区键时,事件按以下方式分组在一起 密钥和会话窗口独立应用于每个组。 这对于需要不同会话窗口的情况很有用 不同的用户或设备。

这是语法

SESSIONWINDOW(timeunit, timeoutSize, maxDurationSize) [OVER (PARTITION BY partitionKey)]

SESSIONWINDOW(Timeout(timeunit , timeoutSize), MaxDuration(timeunit, maxDurationSize)) [OVER (PARTITION BY partitionKey)]

解释

超时

描述会话窗口间隙大小的大整数。数据 在间隙大小内发生的事件被归为同一个 窗口。

最大持续时间

如果总窗口大小超过指定的 maxDurationSize 检查点,然后关闭窗口并打开一个新窗口 在同一点。目前,检查间隔的大小是 等于 maxDurationSize。

分区键

一个可选参数,指定会话窗口的键 运行结束。如果指定,窗口只会组合在一起 同一键的事件。

示例JSON:

[
  // time: the timestamp when the user clicks on the link
  // user_id: the id of the user
  // url: the url the user clicked on
  
    "time": "2017-01-26T00:00:00.0000000z",
    "user_id": 0,
    "url": "www.example.com/a.html"
  ,
  
    "time": "2017-01-26T00:00:20.0000000z",
    "user_id": 0,
    "url": "www.example.com/b.html"
  ,
  
    "time": "2017-01-26T00:00:55.0000000z",
    "user_id": 1,
    "url": "www.example.com/c.html"
  ,
  // ...
]

要衡量每个用户会话的时长,您可以使用以下查询:

CREATE TABLE localinput(time DATETIME, user_id BIGINT, url NVARCHAR(MAX))
SELECT
    user_id,
    MIN(time) AS window_start,
    System.Timestamp AS window_end,
    DATEDIFF(s, MIN(time), System.Timestamp) AS duration_in_seconds
FROM localinput TIMESTAMP BY time
GROUP BY user_id, SessionWindow(minute, 2, 60) OVER (PARTITION BY user_id)

前面的查询创建了一个超时时间为 2 的会话窗口 分钟,最长持续时间为 60 分钟,分区键为 用户身份。这意味着将创建独立的会话窗口 每个用户 ID。对于每个窗口,此查询将生成输出 包含 user_id、窗口的开始时间 (window_start)、 窗口结束(window_end)和用户的总持续时间 会话(duration_in_seconds)。

这很简单,如果你卡住了,就从头开始。

【讨论】:

我已经浏览了这个文档。问题是,它没有按预期工作。

以上是关于Azure 流分析会话窗口异常行为的主要内容,如果未能解决你的问题,请参考以下文章

Azure Functions ServiceBus 触发器缩放行为

Azure 流分析中的跳跃窗口

如何用SQL分析电商用户行为数据(案例)

Azure 流分析:从滑动窗口计算斜率

带有自定义窗口的 Azure 流分析

Azure 流分析时间窗口查询