Apache Flink - 事件时间

Posted

技术标签:

【中文标题】Apache Flink - 事件时间【英文标题】:Apache Flink - Event time 【发布时间】:2018-09-03 06:05:15 【问题描述】:

我想在 Apache flink 中为我的事件创建一个事件时钟。我正在按照以下方式进行操作

public class TimeStampAssigner implements AssignerWithPeriodicWatermarks<Tuple2<String, String>> 


    private final long maxOutOfOrderness = 0; // 3.5 

    private long currentMaxTimestamp;

    @Override
    public long extractTimestamp(Tuple2<String, String> element, long previousElementTimestamp) 

        currentMaxTimestamp = new  Date().getTime();

        return currentMaxTimestamp;
    



    @Override
    public Watermark getCurrentWatermark() 

        return new Watermark(currentMaxTimestamp - maxOutOfOrderness);


    


请检查上面的代码并告诉我是否正确。在事件时间和水印分配之后,我想处理正在处理的流函数,在该函数中,我将为不同的键收集 10 分钟的流数据。

【问题讨论】:

【参考方案1】:

不,这不是一个合适的实现。事件时间时间戳应该是确定性的(即可重现的),并且应该基于事件流中的数据。相反,如果您打算使用 Date().getTime,那么您或多或少会使用处理时间。

通常在进行事件时间处理时,您的事件会有一个时间戳字段,时间戳提取器将返回该字段的值。

您展示的实现将失去使用事件时间带来的大部分好处,例如重新处理历史数据以重现历史结果的能力。

【讨论】:

【参考方案2】:

您的实现是在 Flink 系统中实现摄取时间,而不是事件时间。例如,如果您从 Kafka 消费,previousElementTimestamp 通常应该指向 Kafka 产生事件的时间(如果 Kafka 生产者没有说其他的话),这将使您的流处理可重现。

如果你想在 Flink 中实现事件时间处理,你应该使用一些与你的元素相关联的时间戳。这可能是元素本身或元素本身内部(这对时间序列有意义)或存储在 Kafka 中并在 previousElementTimestamp 下可用。

关于maxOutOfOrderness,您可能还想考虑 Flink 的 side output 功能,该功能可以在窗口创建后获取后期元素并更新 Flink 作业的输出。

如果您从 Kafka 消费,并且想要简单的一些数据丢失事件时间处理实现,请使用 AscendingTimestampExtractor。 AscendingTimestampExtractor 存在一些潜在问题,如果您的数据未在分区内排序,或者您在运算符之后而不是直接在 KafkaSource 之后应用此提取器,则可能会出现这些问题。 对于强大的工业用例,您应该将 Watermark Ingestion 实现到持久性日志存储中,如 Google DataFlow model 中所述。

【讨论】:

以上是关于Apache Flink - 事件时间的主要内容,如果未能解决你的问题,请参考以下文章

Apache Flink:使用事件时间方式处理工业数据和延迟数据

Apache Flink - 如果在 x 分钟内没有收到数据,则发送事件

Flink 复杂事件处理

Apache Flink CEP如何检测事件是不是在x秒内没有发生?

「Flink」事件时间与水印

无论窗口时间如何,都可以在Apache Flink中组合两个流