过滤掉流分析中的重复项
Posted
技术标签:
【中文标题】过滤掉流分析中的重复项【英文标题】:Filter out duplicates in Stream Analytics 【发布时间】:2020-03-13 08:42:02 【问题描述】:我通过几个不同的桥梁接收来自一些传感器的数据。我收到的数据包含很多重复项。具有相同的序列号、值、(几乎)相同的日期时间等,但来自不同的网桥。数据不包括某种唯一的 eventId,只是每个事件唯一的时间戳,即使是重复的。因此我无法过滤它们。
这是一个例子:
"dsType":"WMBUS","mrfCuId":"B827EBE84EEB","timeStamp":1583750353969,"dateTime":"2020-03-09T10:39:13Z","serialNo":"02001703","manufacturer":"Lansen","modelNo":"LAN_WMBUS_G2_TH","battLvl":0,"bridgeId":"AE8B2FC5","rssi":-25,"hopCnt":1,"latCnt":0,"dpCnt":2,"datapoint":["type":"FLOAT","name":"Temperature","size":32,"dataType":"BCD_DIGIT","unit":"C","res":0.1,"resUnit":"Degrees","valueType":"CSV","value":15.8,"scale":1.0,"min":"-20","max":"55","low":" ","high":" ","type":"NUMBER","name":"Humidity","size":8,"dataType":"UINT8","unit":"%","res":1.0,"resUnit":"%","valueType":"CSV","value":39,"scale":1.0,"min":" ","max":" ","low":" ","high":" "],"uniqueId":"LAS02001703","vif":7,"dif":27,"rssiWmbus":-94,"EventProcessedUtcTime":"2020-03-09T11:54:07.5197619Z","PartitionId":0,"EventEnqueuedUtcTime":"2020-03-09T10:39:14.0440000Z"
"dsType":"WMBUS","mrfCuId":"B827EBE84EEB","timeStamp":1583750354377,"dateTime":"2020-03-09T10:39:14Z","serialNo":"02001703","manufacturer":"Lansen","modelNo":"LAN_WMBUS_G2_TH","battLvl":0,"bridgeId":"01000000","rssi":-35,"hopCnt":1,"latCnt":0,"dpCnt":2,"datapoint":["type":"FLOAT","name":"Temperature","size":32,"dataType":"BCD_DIGIT","unit":"C","res":0.1,"resUnit":"Degrees","valueType":"CSV","value":15.8,"scale":1.0,"min":"-20","max":"55","low":" ","high":" ","type":"NUMBER","name":"Humidity","size":8,"dataType":"UINT8","unit":"%","res":1.0,"resUnit":"%","valueType":"CSV","value":39,"scale":1.0,"min":" ","max":" ","low":" ","high":" "],"uniqueId":"LAS02001703","vif":7,"dif":27,"rssiWmbus":-80,"EventProcessedUtcTime":"2020-03-09T11:54:07.5197619Z","PartitionId":0,"EventEnqueuedUtcTime":"2020-03-09T10:39:14.4190000Z"
是否有某种方法可以?如果有可能的话,数据最终也会进入 Power BI。但是在 Power Bi 中使用“删除重复项”时,您需要一种不同于其他所有内容的 EventId,但对于重复的数据也是如此。
提前致谢!
【问题讨论】:
【参考方案1】:根据您的描述,您只想实现distinct
功能,该功能类似于关系数据库功能,以便您可以根据某些列过滤某些行。
实际上,这可以在 ASA 中得到支持,但存在一些限制。主要思想是使用COUNT and GROUP BY关键词。
例如,我的测试数据如下:
SQL:
从 blobstream 中选择计数(DISTINCT b.timestamp)、b.dsType、b.mrfCuId b GROUP BY b.dsType,b.mrfCuId,TumblingWindow(minute, 5)
输出:
我从这个official example得到了一些线索。
【讨论】:
谢谢!因此,如果我对您的理解正确,通过此查询,您可以在 5 分钟的窗口内选择具有相同 mrfcuid 和 ds 类型的行之一?因为几乎所有数据都有 dstype = "WMBUS" 和 Mrfcuid ="b827EBE84EEB"。我有兴趣区分序列号相同的位置,所以我可以按此分组并有一个只需 2-3 秒的管道窗口? 我尝试将查询更改为使用 serialNo 和 tubling 窗口 2 秒,然后得到“null”作为回报。另一个问题是我不太清楚如何合并我所做的其他查询,它将数组中的不同元素分开:SELECT event.serialNo, CAST(event.dateTime as datetime) as TimeAndDate, DataP.ArrayValue.name, event。 bridgeId,DataP.ArrayValue.value,DataP.ArrayValue.valueTYPE INTO TestOuput FROM eventthubInput AS 事件 CROSS APPLY GetArrayElements(event.datapoint) AS DataP WHERE DataP.ArrayValue.valueType = 'CSV' @skh 你好,我很困惑你从 Azure 门户 UI 上的作业执行或测试过程中得到空值? @skh 或者,也许您可以使用您在问题的最后一条评论中提供的 ASQL 发布一些示例数据,以便我可以在我这边进行测试。 非常感谢!我已经在下面的答案中发布了示例数据,因为我无法在此评论中发布图片:)【参考方案2】:我无法在评论中发布图片,所以请在此处写下我的答案。
这是运行我在评论中发布的查询时的输出结果。在这里你可以看到我已经从每一行的数组中提取了一些想要的值。正如您在此处看到的,第 3 行和第 4 行与第 1 和 2 行完全相同,只是来自不同的桥梁。与第 7&8 行和第 9&10 行相同。因此,如果您理解的话,理想情况下,我只想要一个正确数据的样本,而不是像本例中那样重复。
如果您想测试自己,这里还有一些示例数据:
"dsType":"WMBUS","mrfCuId":"B827EBE84EEB","timeStamp":1584355883141,"dateTime":"2020-03-16T10:51:23Z","serialNo":"02001771","manufacturer":"Lansen","modelNo":"LAN_WMBUS_G2_TH","battLvl":0,"bridgeId":"5D410D00","rssi":-67,"hopCnt":1,"latCnt":0,"dpCnt":2,"datapoint":["type":"FLOAT","name":"Temperature","size":32,"dataType":"BCD_DIGIT","unit":"C","res":0.1,"resUnit":"Degrees","valueType":"CSV","value":18.2,"scale":1.0,"min":"-20","max":"55","low":" ","high":" ","type":"NUMBER","name":"Humidity","size":8,"dataType":"UINT8","unit":"%","res":1.0,"resUnit":"%","valueType":"CSV","value":28,"scale":1.0,"min":" ","max":" ","low":" ","high":" "],"uniqueId":"LAS02001771","vif":7,"dif":27,"rssiWmbus":-16,"EventProcessedUtcTime":"2020-03-16T10:51:23.2682714Z","PartitionId":0,"EventEnqueuedUtcTime":"2020-03-16T10:51:23.2420000Z"
"dsType":"WMBUS","mrfCuId":"B827EBE84EEB","timeStamp":1584355898659,"dateTime":"2020-03-16T10:51:38Z","serialNo":"02001596","manufacturer":"Lansen","modelNo":"LAN_WMBUS_G2_TH","battLvl":0,"bridgeId":"AE8B2FC5","rssi":-24,"hopCnt":1,"latCnt":0,"dpCnt":2,"datapoint":["type":"FLOAT","name":"Temperature","size":32,"dataType":"BCD_DIGIT","unit":"C","res":0.1,"resUnit":"Degrees","valueType":"CSV","value":13.1,"scale":1.0,"min":"-20","max":"55","low":" ","high":" ","type":"NUMBER","name":"Humidity","size":8,"dataType":"UINT8","unit":"%","res":1.0,"resUnit":"%","valueType":"CSV","value":35,"scale":1.0,"min":" ","max":" ","low":" ","high":" "],"uniqueId":"LAS02001596","vif":7,"dif":27,"rssiWmbus":-45,"EventProcessedUtcTime":"2020-03-16T10:51:38.8337473Z","PartitionId":0,"EventEnqueuedUtcTime":"2020-03-16T10:51:38.7330000Z"
"dsType":"WMBUS","mrfCuId":"B827EBE84EEB","timeStamp":1584355898715,"dateTime":"2020-03-16T10:51:38Z","serialNo":"02001596","manufacturer":"Lansen","modelNo":"LAN_WMBUS_G2_TH","battLvl":0,"bridgeId":"5D410D00","rssi":-67,"hopCnt":1,"latCnt":0,"dpCnt":2,"datapoint":["type":"FLOAT","name":"Temperature","size":32,"dataType":"BCD_DIGIT","unit":"C","res":0.1,"resUnit":"Degrees","valueType":"CSV","value":13.1,"scale":1.0,"min":"-20","max":"55","low":" ","high":" ","type":"NUMBER","name":"Humidity","size":8,"dataType":"UINT8","unit":"%","res":1.0,"resUnit":"%","valueType":"CSV","value":35,"scale":1.0,"min":" ","max":" ","low":" ","high":" "],"uniqueId":"LAS02001596","vif":7,"dif":27,"rssiWmbus":-16,"EventProcessedUtcTime":"2020-03-16T10:51:38.8337473Z","PartitionId":0,"EventEnqueuedUtcTime":"2020-03-16T10:51:38.8110000Z"
"dsType":"WMBUS","mrfCuId":"B827EBE84EEB","timeStamp":1584355904394,"dateTime":"2020-03-16T10:51:44Z","serialNo":"02001704","manufacturer":"Lansen","modelNo":"LAN_WMBUS_G2_TH","battLvl":0,"bridgeId":"AE8B2FC5","rssi":-24,"hopCnt":1,"latCnt":0,"dpCnt":2,"datapoint":["type":"FLOAT","name":"Temperature","size":32,"dataType":"BCD_DIGIT","unit":"C","res":0.1,"resUnit":"Degrees","valueType":"CSV","value":19.2,"scale":1.0,"min":"-20","max":"55","low":" ","high":" ","type":"NUMBER","name":"Humidity","size":8,"dataType":"UINT8","unit":"%","res":1.0,"resUnit":"%","valueType":"CSV","value":26,"scale":1.0,"min":" ","max":" ","low":" ","high":" "],"uniqueId":"LAS02001704","vif":7,"dif":27,"rssiWmbus":-58,"EventProcessedUtcTime":"2020-03-16T10:51:44.5783305Z","PartitionId":0,"EventEnqueuedUtcTime":"2020-03-16T10:51:44.4680000Z"
"dsType":"WMBUS","mrfCuId":"B827EBE84EEB","timeStamp":1584355904737,"dateTime":"2020-03-16T10:51:44Z","serialNo":"02001704","manufacturer":"Lansen","modelNo":"LAN_WMBUS_G2_TH","battLvl":0,"bridgeId":"5D410D00","rssi":-67,"hopCnt":1,"latCnt":0,"dpCnt":2,"datapoint":["type":"FLOAT","name":"Temperature","size":32,"dataType":"BCD_DIGIT","unit":"C","res":0.1,"resUnit":"Degrees","valueType":"CSV","value":19.2,"scale":1.0,"min":"-20","max":"55","low":" ","high":" ","type":"NUMBER","name":"Humidity","size":8,"dataType":"UINT8","unit":"%","res":1.0,"resUnit":"%","valueType":"CSV","value":26,"scale":1.0,"min":" ","max":" ","low":" ","high":" "],"uniqueId":"LAS02001704","vif":7,"dif":27,"rssiWmbus":-16,"EventProcessedUtcTime":"2020-03-16T10:51:44.9080895Z","PartitionId":0,"EventEnqueuedUtcTime":"2020-03-16T10:51:44.7960000Z"
"dsType":"WMBUS","mrfCuId":"B827EBE84EEB","timeStamp":1584355907295,"dateTime":"2020-03-16T10:51:47Z","serialNo":"02001701","manufacturer":"Lansen","modelNo":"LAN_WMBUS_G2_TH","battLvl":0,"bridgeId":"AE8B2FC5","rssi":-24,"hopCnt":1,"latCnt":0,"dpCnt":2,"datapoint":["type":"FLOAT","name":"Temperature","size":32,"dataType":"BCD_DIGIT","unit":"C","res":0.1,"resUnit":"Degrees","valueType":"CSV","value":16.2,"scale":1.0,"min":"-20","max":"55","low":" ","high":" ","type":"NUMBER","name":"Humidity","size":8,"dataType":"UINT8","unit":"%","res":1.0,"resUnit":"%","valueType":"CSV","value":28,"scale":1.0,"min":" ","max":" ","low":" ","high":" "],"uniqueId":"LAS02001701","vif":7,"dif":27,"rssiWmbus":-86,"EventProcessedUtcTime":"2020-03-16T10:51:47.4262897Z","PartitionId":0,"EventEnqueuedUtcTime":"2020-03-16T10:51:47.3750000Z"
"dsType":"WMBUS","mrfCuId":"B827EBE84EEB","timeStamp":1584355908044,"dateTime":"2020-03-16T10:51:48Z","serialNo":"02001701","manufacturer":"Lansen","modelNo":"LAN_WMBUS_G2_TH","battLvl":0,"bridgeId":"5D410D00","rssi":-67,"hopCnt":1,"latCnt":0,"dpCnt":2,"datapoint":["type":"FLOAT","name":"Temperature","size":32,"dataType":"BCD_DIGIT","unit":"C","res":0.1,"resUnit":"Degrees","valueType":"CSV","value":16.2,"scale":1.0,"min":"-20","max":"55","low":" ","high":" ","type":"NUMBER","name":"Humidity","size":8,"dataType":"UINT8","unit":"%","res":1.0,"resUnit":"%","valueType":"CSV","value":28,"scale":1.0,"min":" ","max":" ","low":" ","high":" "],"uniqueId":"LAS02001701","vif":7,"dif":27,"rssiWmbus":-16,"EventProcessedUtcTime":"2020-03-16T10:51:48.1936261Z","PartitionId":0,"EventEnqueuedUtcTime":"2020-03-16T10:51:48.1250000Z"
"dsType":"WMBUS","mrfCuId":"B827EBE84EEB","timeStamp":1584355918798,"dateTime":"2020-03-16T10:51:58Z","serialNo":"02001701","manufacturer":"Lansen","modelNo":"LAN_WMBUS_G2_TH","battLvl":0,"bridgeId":"AE8B2FC5","rssi":-24,"hopCnt":1,"latCnt":0,"dpCnt":2,"datapoint":["type":"FLOAT","name":"Temperature","size":32,"dataType":"BCD_DIGIT","unit":"C","res":0.1,"resUnit":"Degrees","valueType":"CSV","value":16.2,"scale":1.0,"min":"-20","max":"55","low":" ","high":" ","type":"NUMBER","name":"Humidity","size":8,"dataType":"UINT8","unit":"%","res":1.0,"resUnit":"%","valueType":"CSV","value":28,"scale":1.0,"min":" ","max":" ","low":" ","high":" "],"uniqueId":"LAS02001701","vif":7,"dif":27,"rssiWmbus":-92,"EventProcessedUtcTime":"2020-03-16T10:51:58.9619079Z","PartitionId":0,"EventEnqueuedUtcTime":"2020-03-16T10:51:58.8610000Z"
【讨论】:
以上是关于过滤掉流分析中的重复项的主要内容,如果未能解决你的问题,请参考以下文章
Swift:在表格视图中过滤结果,因此它不会从 JSON 返回重复项
为啥我们不能使用 rank() 分析函数来删除表中的重复项?