Azure 流分析 - 沿流查询
Posted
技术标签:
【中文标题】Azure 流分析 - 沿流查询【英文标题】:Azure Stream Analytics - Query along the stream 【发布时间】:2017-07-26 18:31:29 【问题描述】:我正在使用 Azure 流分析。我有数据进入事件中心。进来的数据如下:
[
"id": "a2b8bcd8-ff79-4bb7-a86f-615556104c1f",
"mechanism": "geo",
"datetimereporting": "2017-07-23 11:08:00",
"geometry":
"type": "Point",
"coordinates": [-85.78378, 38.68679]
,
"id": "a2b8bcd8-ff79-4bb7-a86f-615556104c1f",
"mechanism": "geo",
"datetimereporting": "2017-07-23 11:09:00",
"geometry":
"type": "Point",
"coordinates": [-85.79378, 38.68679]
,
"id": "a2b8bcd8-ff79-4bb7-a86f-615556104c1f",
"mechanism": "geo",
"datetimereporting": "2017-07-24 14:08:00",
"geometry":
"type": "Point",
"coordinates": [-85.78378, 38.68679]
,
"id": "a2b8bcd8-ff79-4bb7-a86f-615556104c1f",
"mechanism": "geo",
"datetimereporting": "2017-07-24 14:09:00",
"geometry":
"type": "Point",
"coordinates": [-85.79378, 38.68679]
,
"id": "a2b8bcd8-ff79-4bb7-a86f-615556104c1f",
"mechanism": "geo",
"datetimereporting": "2017-07-24 14:10:00",
"geometry":
"type": "Point",
"coordinates": [-85.80378, 38.68679]
,
"id": "8bb76874-5b91-400d-b0cb-04c8e6c48d26",
"mechanism": "geo",
"datetimereporting": "2017-07-24 14:09:00",
"geometry":
"type": "Point",
"coordinates": [-115.17281, 36.11464]
,
"id": "31453016-067f-4664-ade9-244a1d7b769c",
"mechanism": "geo",
"datetimereporting": "2017-07-24 14:10:00",
"geometry":
"type": "Point",
"coordinates": [-85.76477, 38.32873]
]
流分析任务是查看数据并确定传入的坐标是否位于特定多边形中。我已经让 ST_WITHIN 查询工作了。我有一个包含我想要的所有多边形的参考 blob。麻烦就在于此。我需要检测坐标何时在多边形中以及它在多边形中的时间。
数据大约每分钟传输一次。我每分钟都会得到一个新坐标。我知道如何检测它最初何时位于多边形中。我的斗争是我怎么知道它在多边形中已经存在了多长时间?我尝试过 LAST、LAG、ISFIRST,但无济于事。目标如下:
-
数据进来
你在多边形中吗?
是吗?你在多边形里多久了?
我在这里知道我需要了解它何时首次出现在多边形中。但是,正如您从上面的数据中看到的那样,数据可能在 24 小时前就在多边形中,现在它又在那里了。我只是不知道如何构造一个查询来找出我何时在多边形中以及多长时间。有人可以帮忙吗?
【问题讨论】:
【参考方案1】:我只是不知道如何构建查询以了解我何时在多边形中以及停留了多长时间。有人可以帮忙吗?
您需要的是从一组分散的数据中获得线性结果。我建议您查询该人是否在最近期间按日期时间顺序排列的多边形中。数据可能如下。
| id | Is in polygon | date time |
|-----------------------------|---------------|-------------------- |
| a2b8bcd8-ff79-4bb7-a86f-xxx | true | 2017-07-23 11:08:00 |
| a2b8bcd8-ff79-4bb7-a86f-xxx | true | 2017-07-23 11:07:00 |
| a2b8bcd8-ff79-4bb7-a86f-xxx | false | 2017-07-23 11:06:00 |
获取数据后,您可以将其存储在任何地方(Azure Redis 或 Blob 存储)。最后,您可以使用 Azure Function 处理临时数据。以下示例代码供您参考。
var totalMinutes = 0;
foreach (var data in collection)
if (data.IsInPolygon)
totalMinutes++;
else
break;
关于如何将数据输出到 Azure Function,以下链接供您参考。
How to store data from Azure Stream Analytics in an Azure Redis Cache using Azure Functions
【讨论】:
感谢阿莫尔的回复。我明白你在说什么,但你是否建议将所有进入流分析作业的数据输出到某个存储机制中,然后使用 Azure 函数对其进行操作?因此数据进入事件中心,流分析作业启动,将数据输出到存储,Azure 函数作用于数据。这不就是将所有数据从事件中心直接传递到存储中吗? 没关系,阿莫尔,我明白你在说什么。我想我已经完成了一个完整的架构。一旦一切正常,我会尝试发布我的结果。【参考方案2】:您还可以将所有逻辑放入流分析中以实时获取。 我会使用一些类似于我们的“典型查询模式”here 中描述的查询。您的案例类似于“检测条件持续时间”示例。 下面对示例进行一些快速修改以计算多边形的总持续时间:
WITH SelectPreviousEvent AS
(
SELECT
*,
LAG([time]) OVER (LIMIT DURATION(hour, 24)) as previousTime,
LAG(ST_WITHIN(mypoint,mypolygon)) OVER (LIMIT DURATION(hour, 24)) as previousST_WITHIN
FROM input TIMESTAMP BY [time]
)
SELECT
LAG(time) OVER (LIMIT DURATION(hour, 24) WHEN ST_WITHIN(mypoint,mypolygon) = 0 ) [StartFault],
previousTime [EndFault]
FROM SelectPreviousEvent
WHERE
ST_WITHIN(mypoint,mypolygon) = 0
AND previousST_WITHIN = 1
您需要对此查询进行一些自定义。告诉我进展如何。
谢谢,
JS
【讨论】:
这很好,但我实际上需要实时分析。当每个点进来并且是 ST_WITHIN = 1 我需要知道它在那个事件中持续了多长时间。我已经尝试了一整天。我不认为这是可能的。例如。时间线将是 1a, 1b, 1c, 0, 0, 0, 1a, 1b, 0。那里有两个事件,但需要知道与 1b-1a 的时间差,然后是 1c-1a,因为 1 的到来。任何方式做那个?以上是关于Azure 流分析 - 沿流查询的主要内容,如果未能解决你的问题,请参考以下文章