Flink 操作示例 —— 水印
Posted lemos
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink 操作示例 —— 水印相关的知识,希望对你有一定的参考价值。
内置水印生成器
1.有序生成
只需提取事件时间的时间戳作为水印即可。
java
DataStream<MyEvent> stream = ... DataStream<MyEvent> withTimestampsAndWatermarks = stream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<MyEvent>() { @Override public long extractAscendingTimestamp(MyEvent element) { return element.getCreationTime(); } });
scala
val stream: DataStream[MyEvent] = ... val withTimestampsAndWatermarks = stream.assignAscendingTimestamps( _.getCreationTime )
2.有界无序生成策略
设置延迟的上限。我们知道每个事件都会延迟一段时间才到达,而这些延迟差异会比较大,所以有些事件会比其他事件延迟更多。一种简单的方法是假设这些延迟不会超过某个最大值。
java
DataStream<MyEvent> stream = ... DataStream<MyEvent> withTimestampsAndWatermarks = stream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<MyEvent>(Time.seconds(10)) { @Override public long extractTimestamp(MyEvent element) { return element.getCreationTime(); } });
scala
val stream: DataStream[MyEvent] = ... val withTimestampsAndWatermarks = stream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[MyEvent](Time.seconds(10))( _.getCreationTime ))
自定义水印生成器
1.定期水印
AssignerWithPeriodicWatermarks 分配时间戳并定期生成水印(可能取决于流元素,或纯粹基于处理时间)。
通过 ExecutionConfig.setAutoWatermarkInterval(...) 定义生成水印的间隔(每n毫秒)。 每次都会调用分配者的 getCurrentWatermark() 方法,如果返回的水印非空且大于前一个水印,则将发出一个新的水印。
class PeriodicAssigner extends AssignerWithPeriodicWatermarks[SensorReading] { // 1 min in ms val bound: Long = 60 * 1000 // the maximum observed timestamp var maxTs: Long = Long.MinValue override def getCurrentWatermark: Watermark = { new Watermark(maxTs - bound) } override def extractTimestamp(r: SensorReading, previousTS: Long): Long = { // update maximum timestamp maxTs = maxTs.max(r.timestamp) // return record timestamp r.timestamp }
2.带标记的水印
AssignerWithPunctuatedWatermarks 根据元素的特定标记生成新的水印。 对于此类,Flink将首先调用 extractTimestamp(...) 方法为该元素分配时间戳,然后立即在该元素上调用 checkAndGetNextWatermark(...)方法。
将 checkAndGetNextWatermark(...) 方法传递给 extractTimestamp(...) 方法中分配的时间戳,并可以决定是否要生成水印。 每当 checkAndGetNextWatermark(...) 方法返回非空水印,并且该水印大于最新的先前水印时,就会发出新的水印。
class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[SensorReading] { // 1 min in ms val bound: Long = 60 * 1000 override def checkAndGetNextWatermark(r: SensorReading, extractedTS: Long): Watermark = { if (r.id == "sensor_1") { // emit watermark if reading is from sensor_1 new Watermark(extractedTS - bound) } else { // do not emit a watermark null } } override def extractTimestamp(r: SensorReading, previousTS: Long): Long = { // assign record timestamp r.timestamp } }
参考文章
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/event_timestamp_extractors.html
http://wuchong.me/blog/2018/11/18/flink-tips-watermarks-in-apache-flink-made-easy/
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/event_timestamps_watermarks.html
以上是关于Flink 操作示例 —— 水印的主要内容,如果未能解决你的问题,请参考以下文章
Flink 1.8 Generating Timestamps, Watermarks 生成时间戳, 水印