Flink WaterMark原理与实现

Posted zfwwdz

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink WaterMark原理与实现相关的知识,希望对你有一定的参考价值。

一、WaterMark作用

在使用 EventTime 处理 Stream 数据的时候会遇到数据乱序的问题,流处理从 Event(事 件)产生,流经 Source,再到 Operator,这中间需要一定的时间。虽然大部分情况下,传输到 Operator 的数据都是按照事件产生的时间顺序来的,但是也不排除由于网络延迟等原因而导致乱序的产生,特别是使用 Kafka 的时候,多个分区之间的数据无法保证有序。因此, 在进行 Window 计算的时候,不能无限期地等下去,必须要有个机制来保证在特定的时间后, 必须触发 Window 进行计算,这个特别的机制就是 Watermark(水位线)。Watermark 是用于 处理乱序事件的。

 

二、原理

在 Flink 的窗口处理过程中,如果确定全部数据到达,就可以对 Window 的所有数据做 窗口计算操作(如汇总、分组等),如果数据没有全部到达,则继续等待该窗口中的数据全 部到达才开始处理。这种情况下就需要用到水位线(WaterMarks)机制,它能够衡量数据处 理进度(表达数据到达的完整性),保证事件数据(全部)到达 Flink 系统,或者在乱序及 延迟到达时,也能够像预期一样计算出正确并且连续的结果。当任何 Event 进入到 Flink 系统时,会根据当前最大事件时间产生 Watermarks 时间戳

如何计算WaterMark的值?

Watermark = 进入 Flink 的最大的事件时间(mxtEventTime)— 指定的延迟时间(t)

有Watermark 的 Window 是怎么触发窗口函数?

如果有窗口的停止时间等于或者小于maxEventTime – t(当时的 warkmark),那么 这个窗口被触发执行

 

Watermark 的使用存在三种情况:

1. 本来有序的 Stream 中的 Watermark

如果数据元素的事件时间是有序的,Watermark 时间戳会随着数据元素的事件时间按顺 序生成,此时水位线的变化和事件时间保持一直(因为既然是有序的时间,就不需要设置延 迟了,那么 t 就是 0。所以 watermark=maxtime-0 = maxtime),也就是理想状态下的水位 线。当 Watermark 时间大于 Windows 结束时间就会触发对 Windows 的数据计算,以此类推,下一个 Window 也是一样。

技术图片

 

2.乱序事件中的 Watermark

现实情况下数据元素往往并不是按照其产生顺序接入到 Flink 系统中进行处理,而频繁出现乱序或迟到的情况,这种情况就需要使用 Watermarks 来应对。比如下图,设置延迟时 间t为2

技术图片

 

 3.并行数据流中的 Watermark

 在多并行度的情况下,Watermark 会有一个对齐机制,这个对齐机制会取所有 Channel中最小的 Watermark。

技术图片

 

 

三、Watermark 运用

1.有序的Watermark

object WaterMark1 {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 使用eventTime
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val stream= env.socketTextStream("flink101", 8888)
      .map(line => {
        var arr = line.split(",")
        Log(arr(0).trim,arr(1).trim, arr(2).trim, arr(3).trim, arr(4).trim.toLong, arr(5).trim.toLong)
      }).assignAscendingTimestamps(_.callTime) // 数据有序的升序watermark
      .filter(_.callType.equals("success"))
        .keyBy(_.sid)
        .timeWindow(Time.seconds(10), Time.seconds(5))
        .reduce(new MyReduceFunction(), new ReturnMaxTimeWindowFunction)

    env.execute("assignAscendingTimestampsDemo")
  }

2.无序的Watermark

object Watermark2 {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 使用eventTime
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // 周期引入watermart的设置,默认是100毫秒
    env.getConfig.setAutoWatermarkInterval(100L)

    val stream= env.socketTextStream("flink101", 8888)
      .map(line => {
        var arr = line.split(",")
        Log(arr(0).trim,arr(1).trim, arr(2).trim, arr(3).trim, arr(4).trim.toLong, arr(5).trim.toLong)
      })
    // 数据是乱序的,延迟时间为3秒,周期性watermark

    /**
      * 第一种实现
      */
    val ds = stream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[Log](Time.seconds(3)){
      override def extractTimestamp(element: Log): Long = {
        element.callTime  // EventTime
      }
    })

    /**
      * 第二种实现
      */
    val ds2 = stream.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[Log] {

      var maxEventTime: Long =_

      override def getCurrentWatermark: Watermark = {
        // 周期性生成watermark
        new Watermark(maxEventTime - 3000L)
      }

      // 设定EventTime是哪个属性
      override def extractTimestamp(element: Log, previousElementTimestamp: Long): Long = {
        maxEventTime = maxEventTime.max(element.callTime)
        element.callTime
      }
    })

    env.execute("assignTimestampsAndWatermarksDemo")
  }

 

3.With Punctuated(间断性的) Watermark

val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 使用eventTime
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

//读取文件数据
val data = env.socketTextStream("flink101",8888)
.map(line=>{
var arr =line.split(",") new
StationLog(arr(0).trim,arr(1).trim,arr(2).trim,arr(3).trim,arr(4).trim.toLong,arr(5).trim.to Long)
})

// 生成watermark
data.assignTimestampsAndWatermarks(
new MyCustomerPunctuatedWatermarks(3000L)) //自定义延迟
}
class MyCustomerPunctuatedWatermarks(delay:Long) extends AssignerWithPunctuatedWatermarks[StationLog]{
var maxTime :Long=0
override def checkAndGetNextWatermark(element: StationLog, extractedTimestamp: Long): Watermark = {
if(element.sid.equals("station_1")){//当ID为:station_1 才生成水位线 maxTime =maxTime.max(extractedTimestamp)
new Watermark(maxTime-delay)
}else{
return null //其他情况下不返回水位线
} }
override def extractTimestamp(element: StationLog, previousElementTimestamp: Long): Long = {
element.callTime //抽取EventTime的值 }
}

 

以上三种Watermark的实现,根据数据的事件时间是否有延迟和业务需求选择相应的生成WaterMark的方法。

 

 

以上是关于Flink WaterMark原理与实现的主要内容,如果未能解决你的问题,请参考以下文章

12-flink-1.10.1-Flink中的时间语义和watermark

flink窗口与水位线watermark例子

Flink 1.11+ 版本如何生成 Watermark

Flink 1.11+ 版本如何生成 Watermark

Flink - watermark

Flink——Event Time 与 Watermark