Flink:窗口不会在流结束时处理数据

Posted

技术标签:

【中文标题】Flink:窗口不会在流结束时处理数据【英文标题】:Flink: Window does not process data at end of stream 【发布时间】:2018-03-08 00:38:05 【问题描述】:

我有一个带有 flink kafka 消费者的流(kafka msgs 正在流式传输到一个主题),我注意到我正在寻找解决一个有趣的行为。

当数据流入时,如果它在一个窗口“完成”之前停止,或者如果数据结束(在几个窗口之后)并且没有到达窗口的末尾,则管道的其余部分不会触发。

示例流程:

    env.addSource(kafkaConsumer)
       .flatMap(new TokenMapper())
       .keyBy("word")
       .window(TumblingEventTimeWindows.of(Time.seconds(10L)))
       .reduce(new CountTokens())
       .flatMap(new ConvertToString())
       .addSink(producer);

我正在使用 FlinkKafkaConsumer010,并将 env TimeCharacteristic 设置为 EventTime。和 consumer.assignTimestampsAndWatermarks(new PeriodicWatermarks())

private static class PeriodicWatermarks implements   AssignerWithPeriodicWatermarks<String>

    private long currentMaxTimestamp;
    private final long maxOutOfOrderness;

    public PeriodicWatermarksAuto(long maxOutOfOrderness)
        this.maxOutOfOrderness = maxOutOfOrderness;
    

    @Override
    public Watermark getCurrentWatermark() 
         return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
    

    @Override
    public long extractTimestamp(String t, long l) 
        // this should be the event timestamp
        currentMaxTimestamp = l;
        logger.info("TIMESTAMP: " + l);
        return l;
    

如果我的窗口是 10 秒,而我的数据流仅包含 8 秒的数据(然后停止流式传输一段时间),则 flatMap->sink 不会处理,直到新的后续数据流入。

示例数据流处理问题:(每个 x 是每秒一条数据 )

      xxxxxxxx(8secs)------(gap)--(later more data)xxxxx
      ^(not processed)           (until I get here)^

类似地,例如,如果我有 35 秒的流数据(我的窗口也是 10 秒)只有 3 个窗口的数据触发,其余 5 秒的数据从不处理。

     ...xxxxxxxxxx(10secs)xxxxx(5secods)------(gap)--(later more data)xxxxx
         (processed)        ^(not processed)          (until I get here)^

最后,如果我的窗口是 10 秒,而我只有有 5 秒的流数据,那么 flatmap->sink 永远不会发生。

我的问题是,如果我们在一段时间后看不到数据,有没有办法触发窗口数据处理?

如果我的数据正在实时流式传输,我可以看到有一段无数据,并且不希望最后一个窗口(比如说只有 5 秒的数据)不得不等待一些不确定的时间,直到新数据进来了,我想要窗口时间过去后最后一个窗口的结果。

大声思考,这似乎是由于使用 EventTime 而不是 ProcessingTime,或者,我的水印没有正确生成以使最后一个窗口实际触发......不确定可能两者兼而有之?如果您的流结束最后一位未触发,我认为这对任何人来说都是一个问题。我会说我可能会发送一个流结束消息,但是如果由于源中断上游而导致流结束,这将无济于事。

编辑:所以我更改为处理时间,它确实正确处理了最后一个窗口中的数据,所以我猜 EventTime 毕竟是罪魁祸首,我认为自定义触发器或正确的窗口水印可能是答案......

感谢您的帮助!

【问题讨论】:

【参考方案1】:

我将把它留给后代,因为问题与我想的一样,与水印有关。时间戳和水印(来自assignTimestampsAndWatermarks)调用“getCurrentWatermark()”,并且由于我将基于传入实体的水印设置为固定数字(它们的时间戳 - 最大偏移量)它不会更新,直到它看到一个新实体。

如果在可配置的时间内没有看到数据,我的解决方案是使用某种计时器最终将水印推进到下一个窗口。我将无法处理非常潜在的数据,但我不认为这应该是一个问题。这是 EventTime 处理的预期行为。

【讨论】:

您可能希望考虑将自定义触发器添加到窗口以缩短处理时间。正是由于这个原因,eventTime 窗口通常与自定义处理时间触发器结合使用。 是的,我就是这么想的。我得到了它的解决方法,但我同意自定义触发器是最好的。 但是,如果没有新元素进入您的自定义触发器,则不会被调用。关键是水印会前进。

以上是关于Flink:窗口不会在流结束时处理数据的主要内容,如果未能解决你的问题,请参考以下文章

学习笔记Flink—— Flink数据流模型时间窗口和核心概念

Flink 源码解读系列 DataStream 窗口 Window 实现

Flink 源码解读系列 DataStream 窗口 Window 实现

Flink实战 - Time & Windows编程

Flink Window窗口开始结束时间分析-源码探索

Flink Window窗口开始结束时间分析-源码探索