Azure 流分析 - 应用窗口翻转时自定义“时间戳”出错
Posted
技术标签:
【中文标题】Azure 流分析 - 应用窗口翻转时自定义“时间戳”出错【英文标题】:Azure Stream Analytics - Error with customized "timestamp by" while applying window tumbling 【发布时间】:2017-11-06 16:03:52 【问题描述】:我有一个json文件如下:
"imei": "imei": "358174069248418F", "imeiBinary": "NYF0BpJIQY8=","imeiNotEncoded": "358174069248418","valid": 1,"dataPackets": [["msy. mxp.datapacket.AlarmNotification","version": 1, "id": 21, "op": 2,"sizeDynamic": 0, "alarmStatus": 4],["msy.mxp.datapacket.iostatus" ,"version": 1,"id": 15, "op": 2,"sizeDynamic": 0,"ioStatus": 135,"ioDirections": 120], ["msy.mxp.datapacket.LogicalStatus" ,"version": 1,"id": 16, "op": 2,"sizeDynamic": 0,"logicalStatus": 5 ],[ "msy.mxp.datapacket.Position", "version": 1,"id": 19,"op": 2,"latitude": 40.835243,"longitude": 14.246057,"altitude": 40,"speed": 0, "course": 68, "gpsNumSatellite": 5, “glonassNumSatellite”:1,“fixValid”:1,“timeValid”:1,“wgs84degMinFormat”:1,“glonass”:1,“fixMode”:3,“timestamp”:“timeSecFrom1Gen2000”:925560202,“time” : 1490648755000 , "sizeDynamic": 0 ] ]
我正在阅读以下查询:
WITH Datapackets AS
(
SELECT imei.imei as imei,
persistent as persistent,
[timestamp].[time] as input_time,
compressed as compressed,
GetArrayElement(dataPackets, 3) as position
FROM h24
), one as(
SELECT *,
GetRecordPropertyValue (GetArrayElement(position,1), 'timestamp') as position_timestamp --1st
from Datapackets
), two as (
select
imei,
GetRecordPropertyValue (GetArrayElement(position,1), 'op') as position_OP,
[position_timestamp].[time] as position_time,
dateadd(S, [position_timestamp].[timeSecFrom1Gen2000], '1970-01-01') as timing,
GetRecordPropertyValue (GetArrayElement(position,1), 'latitude') as position_latitude,
GetRecordPropertyValue (GetArrayElement(position,1), 'longitude') as position_longitude,
GetRecordPropertyValue (GetArrayElement(position,1), 'altitude') as position_altitude,
GetRecordPropertyValue (GetArrayElement(position,1), 'speed') as position_speed
from one) SELECT * from two
现在我想让窗口翻滚组如下 30 秒,但我有一个问题告诉我输入文件“两个”不允许时间戳属性,这里是我使用的查询
WITH Datapackets AS
(
SELECT imei.imei as imei,
persistent as persistent,
[timestamp].[time] as input_time,
compressed as compressed,
GetArrayElement(dataPackets, 3) as position
FROM h24
), one as(
SELECT *,
GetRecordPropertyValue (GetArrayElement(position,1), 'timestamp') as position_timestamp --1st
from Datapackets
), two as (
select
imei,
GetRecordPropertyValue (GetArrayElement(position,1), 'op') as position_OP,
[position_timestamp].[time] as position_time,
dateadd(S, [position_timestamp].[timeSecFrom1Gen2000], '1970-01-01') as timing,
GetRecordPropertyValue (GetArrayElement(position,1), 'latitude') as position_latitude,
GetRecordPropertyValue (GetArrayElement(position,1), 'longitude') as position_longitude,
GetRecordPropertyValue (GetArrayElement(position,1), 'altitude') as position_altitude,
GetRecordPropertyValue (GetArrayElement(position,1), 'speed') as position_speed
from one) SELECT imei, System.TimeStamp AS 'start', Avg(position_speed), max(position_latitude)
FROM two TIMESTAMP BY TIMING GROUP BY imei, TumblingWindow(duration(second, 30))
错误出现在最后两行(FROM two TIMESTAMP BY TIMING),
****************** 更新,,查多了,发现只能在输入中使用选项timestamp by,而且只有在make事件的自定义时间戳。通常,它们默认以到达时间为时间戳 (https://msdn.microsoft.com/en-us/library/mt573293.aspx)
现在我的问题是如何使用记录在 Json 文件的第三级数组中的时间字段为我的事件添加时间戳,以便能够进行聚合。
任何关于我如何处理这个问题的建议,谢谢
【问题讨论】:
【参考方案1】:根据https://msdn.microsoft.com/en-us/library/mt598501.aspx TIMESTAMP BY
只能用于输入源,因此您可能希望将其作为第一步的一部分:
WITH Datapackets AS
...
FROM h24 TIMESTAMP BY (expression)
...
此外,来自同一来源,当在窗口上与 GROUP BY
一起使用时,指的是 System.TimeStamp
:
聚合结果的时间戳是这个结果对应的时间窗口的结束。
所以当你在最后的SELECT
语句中写System.TimeStamp
时,它指的是当前窗口的结束。
【讨论】:
感谢 runemoennike 的评论,它帮助我理解。经过更多调查,我发现我只能在输入中使用选项 timestamp by,并且只有在为事件制作自定义时间戳时才必须使用它。通常它们默认为到达时间(msdn.microsoft.com/en-us/library/mt573293.aspx),现在我的问题是如何使用记录在 Json 文件中的第三级数组中的时间字段为我的事件加上时间戳,以便能够进行我的聚合。跨度> 你试过FROM h24 TIMESTAMP BY GetRecordPropertyValue(GetArrayElement(GetArrayElement(dataPackets, 3) ,1), 'timestamp')
吗?虽然我必须补充一点,这是一个非常不稳定的解决方案,因为它依赖于数组的顺序总是相同的。更好的方法可能是设置一个流,其中 Function App 从输入事件/IoT 中心读取,重新格式化为更合适的安排,然后将事件推送到另一个中心,您的流分析查询从中提取事件。
确实如你所说,由于传入数组的结构不同,使用“***”的时间戳表示将没有用。但是我会考虑另一种选择。非常感谢
解析此类复杂事件的最佳方法是自定义 javascript 函数。不幸的是,TIMESTAMP BY 表达式中不允许使用 JavaScript 函数。您可能需要考虑将其拆分为两个查询 - 一个没有 TIMESTAMP BY 来“规范化”数据并提取所需字段,第二个查询使用 TIMESTAMP BY 并执行窗口聚合
这可能是一个好主意康斯坦丁,你的意思是将查询分成两个,你的意思是有另一个流分析作业,其查询从第一个接收规范化数据和然后做时间戳,如果你提到那个,我已经尝试过这个选项并且它正在工作,但我遇到了一个新问题,即第二个流作业接收例如 100K 事件但只给出 5 或 6 个输出,并且没有报错!!如果你的意思是别的,请告诉我,我是这个主题的新手。谢谢你:)以上是关于Azure 流分析 - 应用窗口翻转时自定义“时间戳”出错的主要内容,如果未能解决你的问题,请参考以下文章