Flink CEP 事件未触发

Posted

技术标签:

【中文标题】Flink CEP 事件未触发【英文标题】:Flink CEP Event Not triggering 【发布时间】:2020-07-06 07:01:59 【问题描述】:

我已经在 Flink 中实现了 CEP 模式,它按预期工作,连接到本地 Kafka 代理。但是当我连接到基于集群的云 kafka 设置时,Flink CEP 没有触发。

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    //saves checkpoint
    env.getCheckpointConfig().enableExternalizedCheckpoints(
            CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

我正在使用 AscendingTimestampExtractor,

consumer.assignTimestampsAndWatermarks(
    new AscendingTimestampExtractor<ObjectNode>() 
      @Override
      public long extractAscendingTimestamp(ObjectNode objectNode) 
        long timestamp;
        Instant instant = Instant.parse(objectNode.get("value").get("timestamp").asText());
        timestamp = instant.toEpochMilli();
        return timestamp;
      
    );

我也收到了警告信息,

AscendingTimestampExtractor:140 - 违反时间戳单调性:1594017872227

我还尝试使用 AssignerWithPeriodicWatermarks 和 AssignerWithPunctuatedWatermarks 都没有工作

我附上了未分配 Watermark 的 Flink 控制台屏幕截图。 Updated flink console screenshot

有人可以帮忙吗?

【问题讨论】:

由于并行度为1,所以问题不在于空闲源。我们能否看到整个作业图——CEP 之后会发生什么? @DavidAnderson 感谢您的回复。在 CEP 之后,我们只是以 json 格式记录匹配的事件。我将 currentOutputWatermark 视为一些数值,例如 -9223372036854776000。我已经更新了 Question session 中的 flink 控制台截图。但是,当它将所有东西移动到集群时,本地设置工作的同一个 jar 就会出现问题。 我怀疑您的问题是由于在空闲分区上使用了每个分区的水印造成的。 @DavidAnderson 谢谢..!现在可以了。 kafka 集群中的理想分区问题导致此问题 【参考方案1】:

CEP 必须首先根据水印对输入流进行排序。所以 问题可能出在水印上,但您没有向我们展示足够的信息来调试原因。一个常见问题是idle source,它会阻止水印前进。

但还有其他可能的原因。为了调试这种情况,我建议您查看一些指标,无论是在 Flink Web UI 中,还是在指标系统中(如果您已连接)。首先,通过查看管道不同阶段的numRecordsInnumRecordsOutnumRecordsInPerSecondnumRecordsOutPerSecond,检查记录是否在流动。

如果有事件,那么在你工作的不同任务中查看currentOutputWatermark,看看事件时间是否提前。

更新:

看来您可能在 Kafka 消费者上调用 assignTimestampsAndWatermarks,这将导致每个分区的水印。在这种情况下,如果您有一个空闲分区,则该分区不会产生任何水印,这将阻止整个水印。尝试在源生成的 DataStream 上调用 assignTimestampsAndWatermarks,以查看是否可以解决问题。 (当然,如果没有每个分区的水印,您将无法使用 AscendingTimestampExtractor,因为流不会按顺序排列。)

【讨论】:

以上是关于Flink CEP 事件未触发的主要内容,如果未能解决你的问题,请参考以下文章

大数据计算引擎之Flink Flink CEP复杂事件编程

Flink CEP - Flink的复杂事件处理

Flink CEP 处理匹配事件

案例简介flink CEP

Flink CEP:哪种方法可以为不同类型的事件加入数据流?

Watermark 在 Flink CEP 中远远落后