从事件流中查找事件的子序列

Posted

技术标签:

【中文标题】从事件流中查找事件的子序列【英文标题】:Find sub-sequence of events from a stream of events 【发布时间】:2016-06-20 12:06:59 【问题描述】:

我在下面给出我的问题的缩影版

我有 2 个不同的传感器以流的形式发送 1/0 值。我能够使用 Kafka 使用流并将其带入火花进行处理。请注意我在下面给出的示例流。

时间 --------------> 1 2 3 4 5 6 7 8 9 10

传感器名称 --> A A B B B B A B A A

传感器值 ---> 1 0 1 0 1 0 0 1 1 0

我想确定此流中出现的子序列模式。例如,如果 A =0 并且流中的下一个值(基于时间)是 B =1,那么我想推送警报。在上面的示例中,我突出显示了 2 个地方——我想要发出警报的地方。一般来说,它会像

“如果一组传感器事件组合发生在一个时间间隔内, 发出警报”。

我是 spark 新手,不了解 Scala。我目前正在使用 python 进行编码。

我的实际问题包含更多传感器,每个传感器可以有不同的值组合。意思是我的子序列和事件流

我尝试了几个选项都没有成功

窗口函数 - 可用于移动平均累积和 等不适用于此用例 将 spark Dataframes /RDDs 带入本地 python 结构,如列表 和 panda 数据帧并进行子排序 - 这需要很多 经过一些迭代后,随机播放和触发排队的事件流 UpdateStatewithKey – 尝试了几种方法,但无法理解 完全如何工作以及这是否适用于此用途 案例。

【问题讨论】:

您找到最佳解决方案了吗? 【参考方案1】:

任何想解决这个问题的人都可以使用我的解决方案:

1- 为了让它们保持连接,您需要使用 collect_list 收集事件。

2- 最好在 collect_list 上对您的事件进行排序,但要小心,因为它按第一列排列数据,因此将 DateTime 放在该列中很重要。

3- 例如,我从 collect_list 中删除了 DateTime。

4- 最后,您应该联系所有元素以使用包含等字符串函数来探索它以找到您的子序列。

.agg(expr("array_join(TRANSFORM(array_sort(collect_list((Time , Sensor Value))), a -> a.Time ),'')")as "MySequence")

在这个 agg 函数之后,您可以使用任何正则表达式或字符串函数来检测您的模式。

查看此链接以获取有关 collect_list 的更多信息: collect list 检查此链接以获取有关排序 collect_list 的更多信息: sort a collect list

【讨论】:

以上是关于从事件流中查找事件的子序列的主要内容,如果未能解决你的问题,请参考以下文章

大数据(9g)FlinkCEP

大数据(9g)FlinkCEP

《OD学hadoop》20160910某旅游网项目实战

如何比较流中的每个事件,streamController dart

浏览器事件流

Flink CEP - Flink的复杂事件处理