flink cep 跳过策略 AfterMatchSkipStrategy.skipPastLastEvent() 匹配过的不再匹配 碧坑指南

Posted 辉常努腻

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了flink cep 跳过策略 AfterMatchSkipStrategy.skipPastLastEvent() 匹配过的不再匹配 碧坑指南相关的知识,希望对你有一定的参考价值。

指示匹配过程后的跳过策略
今天讲的是 flink cep 如何实现 多个窗口之间的滚动匹配
即避免以下这种情况出现,当然是否需要避免取决你的工作需求或者要学习什么东西
flink cep pattern 代码

//定义一个匹配模式
val pattern: Pattern[weather, weather] = Pattern
      .begin[weather]("one")
      //一次或多次
      .timesOrMore(12)
      //时间是12小时的范围 相当于窗口
      .within(Time.hours(12))
out -> [1,May 10 15:00:00 ],[1,May 10 16:00:00 ],[1,May 10 17:00:00 ],[1,May 10 18:00:00 ]
out -> [1,May 10 16:00:00 ],[1,May 10 17:00:00 ],[1,May 10 18:00:00 ],[1,May 10 19:00:00 ]
out -> [1,May 10 17:00:00 ],[1,May 10 18:00:00 ],[1,May 10 19:00:00 ],[1,May 10 20:00:00 ]

然而你想要的是这样的匹配规则,则是匹配过一次之后就不再使用这条数据作为其他匹配的数据源

out -> [1,May 10 15:00:00 ],[1,May 10 16:00:00 ],[1,May 10 17:00:00 ],[1,May 10 18:00:00 ]
out -> [1,May 10 19:00:00 ],[1,May 10 20:00:00 ],[1,May 10 21:00:00 ],[1,May 10 22:00:00 ]
out -> [1,May 10 23:00:00 ],[1,May 11 00:00:00 ],[1,May 11 01:00:00 ],[1,May 11 02:00:00 ]

修改匹配模式代码

    /**
     * AfterMatchSkipStrategy 指示匹配过程后的跳过策略
     */
     //定义一个匹配模式
    val pattern: Pattern[weather, weather] = Pattern
    //AfterMatchSkipStrategy 指示匹配过程后的跳过策略
    //skipPastLastEvent  丢弃在发出的匹配结束之前开始的每个部分匹配
      .begin[weather]("one" , AfterMatchSkipStrategy.skipPastLastEvent())
            //一次或多次
      .timesOrMore(1)
      //时间是12小时的范围 相当于窗口
      .within(Time.hours(5))

得到臆想的数据

然后就可以得到我们想要的数据啦 看一下我的实际数据

==== Map(one -> Buffer(weather(Tue May 10 15:00:00 CST 2022,台湾省,2428,台南,9999.0,9999.0,9999.0), weather(Tue May 10 16:00:00 CST 2022,台湾省,2428,台南,9999.0,9999.0,9999.0), weather(Tue May 10 17:00:00 CST 2022,台湾省,2428,台南,9999.0,9999.0,9999.0), weather(Tue May 10 18:00:00 CST 2022,台湾省,2428,台南,9999.0,9999.0,9999.0), weather(Tue May 10 19:00:00 CST 2022,台湾省,2428,台南,9999.0,9999.0,9999.0)))
 ---- Some(Buffer(weather(Tue May 10 15:00:00 CST 2022,台湾省,2428,台南,9999.0,9999.0,9999.0), weather(Tue May 10 16:00:00 CST 2022,台湾省,2428,台南,9999.0,9999.0,9999.0), weather(Tue May 10 17:00:00 CST 2022,台湾省,2428,台南,9999.0,9999.0,9999.0), weather(Tue May 10 18:00:00 CST 2022,台湾省,2428,台南,9999.0,9999.0,9999.0), weather(Tue May 10 19:00:00 CST 2022,台湾省,2428,台南,9999.0,9999.0,9999.0)))
 ==== Map(one -> Buffer(weather(Tue May 10 15:00:00 CST 2022,台湾省,2429,台中,9999.0,9999.0,9999.0), weather(Tue May 10 16:00:00 CST 2022,台湾省,2429,台中,9999.0,9999.0,9999.0), weather(Tue May 10 17:00:00 CST 2022,台湾省,2429,台中,9999.0,9999.0,9999.0), weather(Tue May 10 18:00:00 CST 2022,台湾省,2429,台中,9999.0,9999.0,9999.0), weather(Tue May 10 19:00:00 CST 2022,台湾省,2429,台中,9999.0,9999.0,9999.0)))
 ---- Some(Buffer(weather(Tue May 10 15:00:00 CST 2022,台湾省,2429,台中,9999.0,9999.0,9999.0), weather(Tue May 10 16:00:00 CST 2022,台湾省,2429,台中,9999.0,9999.0,9999.0), weather(Tue May 10 17:00:00 CST 2022,台湾省,2429,台中,9999.0,9999.0,9999.0), weather(Tue May 10 18:00:00 CST 2022,台湾省,2429,台中,9999.0,9999.0,9999.0), weather(Tue May 10 19:00:00 CST 2022,台湾省,2429,台中,9999.0,9999.0,9999.0)))
 ==== Map(one -> Buffer(weather(Tue May 10 15:00:00 CST 2022,台湾省,2426,台北,9999.0,9999.0,9999.0), weather(Tue May 10 16:00:00 CST 2022,台湾省,2426,台北,9999.0,9999.0,9999.0), weather(Tue May 10 17:00:00 CST 2022,台湾省,2426,台北,9999.0,9999.0,9999.0), weather(Tue May 10 18:00:00 CST 2022,台湾省,2426,台北,9999.0,9999.0,9999.0), weather(Tue May 10 19:00:00 CST 2022,台湾省,2426,台北,9999.0,9999.0,9999.0)))
 ---- Some(Buffer(weather(Tue May 10 15:00:00 CST 2022,台湾省,2426,台北,9999.0,9999.0,9999.0), weather(Tue May 10 16:00:00 CST 2022,台湾省,2426,台北,9999.0,9999.0,9999.0), weather(Tue May 10 17:00:00 CST 2022,台湾省,2426,台北,9999.0,9999.0,9999.0), weather(Tue May 10 18:00:00 CST 2022,台湾省,2426,台北,9999.0,9999.0,9999.0), weather(Tue May 10 19:00:00 CST 2022,台湾省,2426,台北,9999.0,9999.0,9999.0)))
 ==== Map(one -> Buffer(weather(Tue May 10 15:00:00 CST 2022,台湾省,2427,台东,9999.0,9999.0,9999.0), weather(Tue May 10 16:00:00 CST 2022,台湾省,2427,台东,9999.0,9999.0,9999.0), weather(Tue May 10 17:00:00 CST 2022,台湾省,2427,台东,9999.0,9999.0,9999.0), weather(Tue May 10 18:00:00 CST 2022,台湾省,2427,台东,9999.0,9999.0,9999.0), weather(Tue May 10 19:00:00 CST 2022,台湾省,2427,台东,9999.0,9999.0,9999.0)))
 ---- Some(Buffer(weather(Tue May 10 15:00:00 CST 2022,台湾省,2427,台东,9999.0,9999.0,9999.0), weather(Tue May 10 16:00:00 CST 2022,台湾省,2427,台东,9999.0,9999.0,9999.0), weather(Tue May 10 17:00:00 CST 2022,台湾省,2427,台东,9999.0,9999.0,9999.0), weather(Tue May 10 18:00:00 CST 2022,台湾省,2427,台东,9999.0,9999.0,9999.0), weather(Tue May 10 19:00:00 CST 2022,台湾省,2427,台东,9999.0,9999.0,9999.0)))

前方有危险

这个方法的应用实际上是有问题的,我们来分析一下
我们的规则是匹配过的数据不可以再次进行匹配,然而当我们设置了.timesOrMore(1)我们会发现数据里每次只有一条数据,这是为什么呢?
我分析.timesOrMore(1)会使模式匹配将所有可以匹配到的数据数据量达到1次时就进行收集,如果我们加上了.begin[weather]("one",AfterMatchSkipStrategy.skipPastLastEvent()),這個我们刚刚学了,他是使将所有匹配过的数据不再进行匹配,这个规则再加上.timesOrMore(1)会默认有一条1条数据的匹配,所以造成了所有的数据匹配都是一条。!?

重点碧坑

如果我们需要用到.begin[weather]("one",AfterMatchSkipStrategy.skipPastLastEvent()).timesOrMore(1)那末我们一定要考虑每次匹配的数据至少有几条,其实这里就已经固定了条数,而不再是或者更更多条数的匹配规则,这里可以考虑动态更改匹配条数,可以做数据库持久化,然后实时动态更新。动态更改所需匹配的固定条数,这里由于才疏学浅,尚未摸索到更好的方法,后续几天如果有解决方案我会更新这个博客。

革命尚未成功-同志仍须努力

跳过策略

接下来我们讨论不同跳过策略对匹配结果的影响:

  • 不跳过(NO_SKIP)
    代码调用 AfterMatchSkipStrategy.noSkip()。这是默认策略,所有可能的匹配都会输出。所以这里会输出完整的 6 个匹配。

  • 跳至下一个(SKIP_TO_NEXT)
    代码调用 AfterMatchSkipStrategy.skipToNext()。找到一个 a1 开始的最大匹配之后,跳过a1 开始的所有其他匹配,直接从下一个 a2 开始匹配起。当然 a2 也是如此跳过其他匹配。最终得到(a1 a2 a3 b),(a2 a3 b),(a3 b)。可以看到,这种跳过策略跟使用.greedy()效果是相同的。

  • 跳过所有子匹配(SKIP_PAST_LAST_EVENT)
    代码调用 AfterMatchSkipStrategy.skipPastLastEvent()。找到 a1 开始的匹配(a1 a2 a3 b)之后,直接跳过所有 a1 直到 a3 开头的匹配,相当于把这些子匹配都跳过了。最终得到(a1 a2 a3 b),这是最为精简的跳过策略。

  • 跳至第一个(SKIP_TO_FIRST[a])
    代码调用 AfterMatchSkipStrategy.skipToFirst(“a”),这里传入一个参数,指明跳至哪个模式的第一个匹配事件。找到 a1 开始的匹配(a1 a2 a3 b)后,跳到以最开始一个 a(也就是 a1)为开始的匹配,相当于只留下 a1 开始的匹配。最终得到(a1 a2 a3 b),(a1 a2 b),(a1 b)。

  • 跳至最后一个(SKIP_TO_LAST[a])
    代码调用 AfterMatchSkipStrategy.skipToLast(“a”),同样传入一个参数,指明跳至哪个模式的最后一个匹配事件。找到 a1 开始的匹配(a1 a2 a3 b)后,跳过所有 a1、a2 开始的匹配,跳到以最后一个 a(也就是 a3)为开始的匹配。最终得到(a1 a2 a3 b),(a3 b)。

以上是关于flink cep 跳过策略 AfterMatchSkipStrategy.skipPastLastEvent() 匹配过的不再匹配 碧坑指南的主要内容,如果未能解决你的问题,请参考以下文章

大数据计算引擎之Flink Flink CEP复杂事件编程

Flink CEP 事件未触发

大数据之CEP报警信息

案例简介flink CEP

“广播状态”为 Flink 的 CEP 库解除了“动态模式”功能的实现是啥意思?

Flink SQL CEP 学习内容