Flink 操作示例 —— 水印

Posted lemos

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink 操作示例 —— 水印相关的知识,希望对你有一定的参考价值。

内置水印生成器

1.有序生成

只需提取事件时间的时间戳作为水印即可。

java

DataStream<MyEvent> stream = ...

DataStream<MyEvent> withTimestampsAndWatermarks =
    stream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<MyEvent>() {

        @Override
        public long extractAscendingTimestamp(MyEvent element) {
            return element.getCreationTime();
        }
});

scala

val stream: DataStream[MyEvent] = ...

val withTimestampsAndWatermarks = stream.assignAscendingTimestamps( _.getCreationTime )

 

2.有界无序生成策略

设置延迟的上限。我们知道每个事件都会延迟一段时间才到达,而这些延迟差异会比较大,所以有些事件会比其他事件延迟更多。一种简单的方法是假设这些延迟不会超过某个最大值。

java

DataStream<MyEvent> stream = ...

DataStream<MyEvent> withTimestampsAndWatermarks =
    stream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<MyEvent>(Time.seconds(10)) {

        @Override
        public long extractTimestamp(MyEvent element) {
            return element.getCreationTime();
        }
});

scala

val stream: DataStream[MyEvent] = ...

val withTimestampsAndWatermarks = stream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[MyEvent](Time.seconds(10))( _.getCreationTime ))

 

自定义水印生成器

1.定期水印

AssignerWithPeriodicWatermarks 分配时间戳并定期生成水印(可能取决于流元素,或纯粹基于处理时间)。
通过 ExecutionConfig.setAutoWatermarkInterval(...)  定义生成水印的间隔(每n毫秒)。 每次都会调用分配者的 getCurrentWatermark() 方法,如果返回的水印非空且大于前一个水印,则将发出一个新的水印。

class PeriodicAssigner extends AssignerWithPeriodicWatermarks[SensorReading] {

  // 1 min in ms
  val bound: Long = 60 * 1000
  // the maximum observed timestamp
  var maxTs: Long = Long.MinValue

  override def getCurrentWatermark: Watermark = {
    new Watermark(maxTs - bound)
  }

  override def extractTimestamp(r: SensorReading, previousTS: Long): Long = {
    // update maximum timestamp
    maxTs = maxTs.max(r.timestamp)
    // return record timestamp
    r.timestamp
  }

 

2.带标记的水印

AssignerWithPunctuatedWatermarks 根据元素的特定标记生成新的水印。 对于此类,Flink将首先调用 extractTimestamp(...) 方法为该元素分配时间戳,然后立即在该元素上调用 checkAndGetNextWatermark(...)方法。
将 checkAndGetNextWatermark(...) 方法传递给 extractTimestamp(...) 方法中分配的时间戳,并可以决定是否要生成水印。 每当 checkAndGetNextWatermark(...) 方法返回非空水印,并且该水印大于最新的先前水印时,就会发出新的水印。

class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[SensorReading] {

  // 1 min in ms
  val bound: Long = 60 * 1000

  override def checkAndGetNextWatermark(r: SensorReading, extractedTS: Long): Watermark = {
    if (r.id == "sensor_1") {
      // emit watermark if reading is from sensor_1
      new Watermark(extractedTS - bound)
    } else {
      // do not emit a watermark
      null
    }
  }

  override def extractTimestamp(r: SensorReading, previousTS: Long): Long = {
    // assign record timestamp
    r.timestamp
  }
}

 

参考文章


https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/event_timestamp_extractors.html

http://wuchong.me/blog/2018/11/18/flink-tips-watermarks-in-apache-flink-made-easy/

https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/event_timestamps_watermarks.html

以上是关于Flink 操作示例 —— 水印的主要内容,如果未能解决你的问题,请参考以下文章

Java itext为pdf 文件添加水印核心功能代码片段

Flink水印机制(watermark)

Flink事件时间和水印详解

Flink 1.8 Generating Timestamps, Watermarks 生成时间戳, 水印

Flink中window 窗口和时间以及watermark水印

h5网页水印SDK的实现代码示例