Watermark 在 Flink CEP 中远远落后

Posted

技术标签:

【中文标题】Watermark 在 Flink CEP 中远远落后【英文标题】:Watermark fell far behind in Flink CEP 【发布时间】:2020-10-21 02:40:08 【问题描述】:

我正在使用 Flink CEP 来检测来自 Kafka 的事件的模式。为简单起见,事件只有一种类型。我正在尝试检测连续事件流中字段值的变化。代码如下所示

val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
streamEnv.addSource(new FlinkKafkaConsumer[..]())
          .filter(...)
          .map(...)
          .assignTimestampsAndWatermarks(
            WatermarkStrategy.forMonotonousTimestamps[Event]().withTimestampAssigner(..)
          )
          .keyBy(...)(TypeInformation.of(classOf[...]))
    
val pattern: Pattern[Event, _] = 
          Pattern.begin[Event]("start", AfterMatchSkipStrategy.skipPastLastEvent()).times(1)
          .next("middle")
          .oneOrMore()
          .optional()
          .where(new IterativeCondition[Event] 
             override def filter(event: Event, ctx:...): Boolean = 
                 val startTrafficEvent = ctx.getEventsForPattern("start").iterator().next()
                 startTrafficEvent.getFieldValue().equals(event.getFieldValue())
             
          )
          .next("end").times(1)
          .where(new IterativeCondition[Event] 
             override def filter(event: Event, ctx:...): Boolean = 
                  val startTrafficEvent = ctx.getEventsForPattern("start").iterator().next()
                  !startTrafficEvent.getFieldValue().equals(event.getFieldValue())
            
          )
          .within(Time.seconds(30))

Kafka 主题有 104 个分区,事件均匀分布在各个分区上。当我提交作业时,parallelism 设置为 104。

在 Web UI 中,有 2 个任务:第一个是 Source->filter->map->timestamp/watermark;第二个是CepOperator->sink。每个任务有 104 个并行度。

子任务的工作量不均衡,应该来自keyBy。子任务之间的水印不一样,但是开始卡在一个数值上,很长一段时间没有变化。从日志中,我可以看到 CEP 不断评估事件,并将匹配的结果推送到下游接收器。

事件速率为 10k/s,第一个任务的背压保持high,第二个任务保持ok

请帮助解释 CEP 中发生了什么以及如何解决该问题

谢谢

【问题讨论】:

【参考方案1】:

在更仔细地考虑了您的问题后,我正在修改我的答案。

听起来 CEP 正在继续产生匹配并且它们被推送到接收器,但 CEP+接收器任务正在产生高背压。找出背压的原因会有所帮助。

如果事件可以从所有分区读取,但水印只是勉强推进,听起来背压严重到足以阻止事件被摄取。

我怀疑

    CEP 引擎的工作量组合爆炸,和/或 足够的匹配,接收器无法跟上

可能的原因。

获得更多洞察力的一些想法:

(1) 尝试使用分析器来确定 CepOperator 是否是瓶颈,并可能确定它在做什么。

(2) 禁用 CepOperator 和接收器之间的运算符链接以隔离 CEP——仅作为调试步骤。这将使您(通过指标和背压监控)更好地了解 CEP 和接收器各自在做什么。

(3) 在较小的设置中对此进行测试,然后扩展 CEP 日志记录。

【讨论】:

感谢您的回答大卫。当我检查带有延迟水印的子任务时,我发现该子任务曾经收到过数据。您是否认为 CEP 可能会花费太多时间来处理该分区中的数据并最终减慢整个工作的速度? 您的意思是“从未收到”还是“仍然收到”? 我的意思是“仍然收到” 实际上我们将事件平均分配到 kafka 分区。 keyBy调用后,每个key下的事件数不均匀,KeyedStream上的水印卡住了 我启用了 15 分钟间隔和 10 分钟超时的检查点,但由于超时而无法写出状态

以上是关于Watermark 在 Flink CEP 中远远落后的主要内容,如果未能解决你的问题,请参考以下文章

案例简介flink CEP

Flink CEP - Flink的复杂事件处理

是否可以在 apache flink CEP 中处理多个流?

风控系统之CEP - Esper还是Flink?

Flink CEP 事件未触发

Flink CEP 实现恶意登录检测