Flink:窗口不会在流结束时处理数据
Posted
技术标签:
【中文标题】Flink:窗口不会在流结束时处理数据【英文标题】:Flink: Window does not process data at end of stream 【发布时间】:2018-03-08 00:38:05 【问题描述】:我有一个带有 flink kafka 消费者的流(kafka msgs 正在流式传输到一个主题),我注意到我正在寻找解决一个有趣的行为。
当数据流入时,如果它在一个窗口“完成”之前停止,或者如果数据结束(在几个窗口之后)并且没有到达窗口的末尾,则管道的其余部分不会触发。
示例流程:
env.addSource(kafkaConsumer)
.flatMap(new TokenMapper())
.keyBy("word")
.window(TumblingEventTimeWindows.of(Time.seconds(10L)))
.reduce(new CountTokens())
.flatMap(new ConvertToString())
.addSink(producer);
我正在使用 FlinkKafkaConsumer010,并将 env TimeCharacteristic 设置为 EventTime。和 consumer.assignTimestampsAndWatermarks(new PeriodicWatermarks())
private static class PeriodicWatermarks implements AssignerWithPeriodicWatermarks<String>
private long currentMaxTimestamp;
private final long maxOutOfOrderness;
public PeriodicWatermarksAuto(long maxOutOfOrderness)
this.maxOutOfOrderness = maxOutOfOrderness;
@Override
public Watermark getCurrentWatermark()
return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
@Override
public long extractTimestamp(String t, long l)
// this should be the event timestamp
currentMaxTimestamp = l;
logger.info("TIMESTAMP: " + l);
return l;
如果我的窗口是 10 秒,而我的数据流仅包含 8 秒的数据(然后停止流式传输一段时间),则 flatMap->sink 不会处理,直到新的后续数据流入。
示例数据流处理问题:(每个 x 是每秒一条数据 )
xxxxxxxx(8secs)------(gap)--(later more data)xxxxx
^(not processed) (until I get here)^
类似地,例如,如果我有 35 秒的流数据(我的窗口也是 10 秒)只有 3 个窗口的数据触发,其余 5 秒的数据从不处理。
...xxxxxxxxxx(10secs)xxxxx(5secods)------(gap)--(later more data)xxxxx
(processed) ^(not processed) (until I get here)^
最后,如果我的窗口是 10 秒,而我只有有 5 秒的流数据,那么 flatmap->sink 永远不会发生。
我的问题是,如果我们在一段时间后看不到数据,有没有办法触发窗口数据处理?
如果我的数据正在实时流式传输,我可以看到有一段无数据,并且不希望最后一个窗口(比如说只有 5 秒的数据)不得不等待一些不确定的时间,直到新数据进来了,我想要窗口时间过去后最后一个窗口的结果。
大声思考,这似乎是由于使用 EventTime 而不是 ProcessingTime,或者,我的水印没有正确生成以使最后一个窗口实际触发......不确定可能两者兼而有之?如果您的流结束最后一位未触发,我认为这对任何人来说都是一个问题。我会说我可能会发送一个流结束消息,但是如果由于源中断上游而导致流结束,这将无济于事。
编辑:所以我更改为处理时间,它确实正确处理了最后一个窗口中的数据,所以我猜 EventTime 毕竟是罪魁祸首,我认为自定义触发器或正确的窗口水印可能是答案......
感谢您的帮助!
【问题讨论】:
【参考方案1】:我将把它留给后代,因为问题与我想的一样,与水印有关。时间戳和水印(来自assignTimestampsAndWatermarks)调用“getCurrentWatermark()”,并且由于我将基于传入实体的水印设置为固定数字(它们的时间戳 - 最大偏移量)它不会更新,直到它看到一个新实体。
如果在可配置的时间内没有看到数据,我的解决方案是使用某种计时器最终将水印推进到下一个窗口。我将无法处理非常潜在的数据,但我不认为这应该是一个问题。这是 EventTime 处理的预期行为。
【讨论】:
您可能希望考虑将自定义触发器添加到窗口以缩短处理时间。正是由于这个原因,eventTime 窗口通常与自定义处理时间触发器结合使用。 是的,我就是这么想的。我得到了它的解决方法,但我同意自定义触发器是最好的。 但是,如果没有新元素进入您的自定义触发器,则不会被调用。关键是水印会前进。以上是关于Flink:窗口不会在流结束时处理数据的主要内容,如果未能解决你的问题,请参考以下文章
学习笔记Flink—— Flink数据流模型时间窗口和核心概念
Flink 源码解读系列 DataStream 窗口 Window 实现