Flink CEP 事件未触发
Posted
技术标签:
【中文标题】Flink CEP 事件未触发【英文标题】:Flink CEP Event Not triggering 【发布时间】:2020-07-06 07:01:59 【问题描述】:我已经在 Flink 中实现了 CEP 模式,它按预期工作,连接到本地 Kafka 代理。但是当我连接到基于集群的云 kafka 设置时,Flink CEP 没有触发。
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//saves checkpoint
env.getCheckpointConfig().enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
我正在使用 AscendingTimestampExtractor,
consumer.assignTimestampsAndWatermarks(
new AscendingTimestampExtractor<ObjectNode>()
@Override
public long extractAscendingTimestamp(ObjectNode objectNode)
long timestamp;
Instant instant = Instant.parse(objectNode.get("value").get("timestamp").asText());
timestamp = instant.toEpochMilli();
return timestamp;
);
我也收到了警告信息,
AscendingTimestampExtractor:140 - 违反时间戳单调性:1594017872227
我还尝试使用 AssignerWithPeriodicWatermarks 和 AssignerWithPunctuatedWatermarks 都没有工作
我附上了未分配 Watermark 的 Flink 控制台屏幕截图。 Updated flink console screenshot
有人可以帮忙吗?
【问题讨论】:
由于并行度为1,所以问题不在于空闲源。我们能否看到整个作业图——CEP 之后会发生什么? @DavidAnderson 感谢您的回复。在 CEP 之后,我们只是以 json 格式记录匹配的事件。我将 currentOutputWatermark 视为一些数值,例如 -9223372036854776000。我已经更新了 Question session 中的 flink 控制台截图。但是,当它将所有东西移动到集群时,本地设置工作的同一个 jar 就会出现问题。 我怀疑您的问题是由于在空闲分区上使用了每个分区的水印造成的。 @DavidAnderson 谢谢..!现在可以了。 kafka 集群中的理想分区问题导致此问题 【参考方案1】:CEP 必须首先根据水印对输入流进行排序。所以 问题可能出在水印上,但您没有向我们展示足够的信息来调试原因。一个常见问题是idle source,它会阻止水印前进。
但还有其他可能的原因。为了调试这种情况,我建议您查看一些指标,无论是在 Flink Web UI 中,还是在指标系统中(如果您已连接)。首先,通过查看管道不同阶段的numRecordsIn
、numRecordsOut
或numRecordsInPerSecond
和numRecordsOutPerSecond
,检查记录是否在流动。
如果有事件,那么在你工作的不同任务中查看currentOutputWatermark
,看看事件时间是否提前。
更新:
看来您可能在 Kafka 消费者上调用 assignTimestampsAndWatermarks
,这将导致每个分区的水印。在这种情况下,如果您有一个空闲分区,则该分区不会产生任何水印,这将阻止整个水印。尝试在源生成的 DataStream 上调用 assignTimestampsAndWatermarks
,以查看是否可以解决问题。 (当然,如果没有每个分区的水印,您将无法使用 AscendingTimestampExtractor,因为流不会按顺序排列。)
【讨论】:
以上是关于Flink CEP 事件未触发的主要内容,如果未能解决你的问题,请参考以下文章