flink学习day04:对Event Time 与 Watermark的理解

Posted 黑马程序员官方

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了flink学习day04:对Event Time 与 Watermark的理解相关的知识,希望对你有一定的参考价值。

☑ 带大家零基础入门flink:

flink学习day01:Flink基础入门(含案例)
flink学习day02::datasource、transforma和sink
flink学习day03:flink datastream 开发


流式计算中时间的分类:

1 eventTime:数据、事件产生的时间,

2 ingestionTime:进入flink/spark的时间

3 processingTime:进入到具体计算的operator的系统时间

分析:

spark streaming中的窗口计算使用的就是processingtime,与事件、数据真实发生的时间无关,就取决于什么到达处理节点;

flink中引入了eventtime机制,就是flink中可以指定窗口计算的时候按照事件时间(事件真实发生的时间)来进行计算。使用eventtime进行计算才是正确,符合数据发生的时间。

需要在env进行设置,同时你需要保证数据(事件)中要有eventtime时间字段:

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) //设置使用事件时间

watermark

按照eventtime进行是不是就高枕无忧了?

数据有可能会延迟产生 使用eventtime进行计算才是合理的;

问题 二

数据乱序到达的问题,晚(eventime)的数据先到达,早(eventime)的数据后达到

flink watermark原理参考画图

flink watermark案例

api介绍

我们一般选择使用周期性方式生成水印

案例

需求:添加水印统计信号灯通过的汽车数量。

参考代码:

package cn.itcast.flink.watermark


import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.scala.DataStream, StreamExecutionEnvironment, WindowedStream
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

/*
演示使用周期性方式生成水印
 */
/*
需求:
编写代码, 计算5秒内(滚动时间窗口),每个信号灯汽车数量
信号灯数据(信号ID(String)、通过汽车数量、时间戳(事件时间)),要求添加水印来解决网络延迟问题。
 */
//3. 定义CarWc 样例类
case class CarWc(id: String, num: Int, ts: Long)

object WatermarkDemo 
  /*
  1. 创建流处理运行环境
2. 设置处理时间为EventTime ,设置水印的周期间隔,定期生成水印的时间
3. 定义CarWc 样例类
4. 使用socketstream发送数据
5. 添加水印
   - 允许延迟2秒
   - 在获取水印方法中,打印水印时间、事件时间和当前系统时间
6. 按照用户进行分流
7. 设置5秒的时间窗口
8. 进行聚合计算
9. 打印结果数据
10. 启动执行流处理
   */
  def main(args: Array[String]): Unit = 
    //1 创建流处理运行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //2 设置处理时间为事件时间,
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    //3 生成水印的周期 默认200ms
    env.getConfig.setAutoWatermarkInterval(200)

    // 默认程序并行度是机器的核数,8个并行度,注意在flink程序中如果是多并行度,水印时间是每个并行度比较最小的值作为当前流的watermark
    env.setParallelism(1)

    //4 添加socketsource
    val socketDs: DataStream[String] = env.socketTextStream("node1", 9999)
    // 5 数据处理之后添加水印
    val carWcDs: DataStream[CarWc] = socketDs.map(
      line => 
        //按照逗号切分数据组成carwc
        val arr = line.split(",")
        CarWc(arr(0), arr(1).trim.toInt, arr(2).trim.toLong)
      
    )
    // 添加水印 周期性  AssignerWithPeriodicWatermarks 使用其子类 ,构造参数:水印允许的延迟时间,泛型是stream中的数据类型
    val watermarkDs: DataStream[CarWc] = carWcDs.assignTimestampsAndWatermarks(
      new BoundedOutOfOrdernessTimestampExtractor[CarWc](Time.seconds(2)) 
        // 水印机制是在eventtime基础之上减去一段时间,就是flink允许数据延迟的范围;eventtime是来自数据,flink是不知道eventtime是多少,以及是哪个字段
        //这个方法就是告诉flink你的数据哪个字段是eventime
        override def extractTimestamp(element: CarWc): Long = 
          element.ts
        
      )
    // 6 设置窗口 5s的滚动窗口
    val windowStream: WindowedStream[CarWc, Tuple, TimeWindow] = watermarkDs.keyBy(0).
      window(TumblingEventTimeWindows.of(Time.seconds(5)))
    // 7 使用apply方法对窗口进行计算
    val windowDs: DataStream[CarWc] = windowStream.apply(
      //泛型:1 carwc,2 carwc,3 tuple,4 timewindow
      new WindowFunction[CarWc, CarWc, Tuple, TimeWindow] 
        //key:tuple,window:当前触发计算的window对象,input:当前窗口的数据,out:计算结果收集器
        override def apply(key: Tuple, window: TimeWindow, input: Iterable[CarWc], out: Collector[CarWc]): Unit = 

          val wc: CarWc = input.reduce(
            (c1, c2) => 
              CarWc(c1.id, c1.num + c2.num, c2.ts) //累加出通过的汽车数量,关于时间在这里我们不关心
            
          )
          //发送计算结果
          out.collect(wc)
          //获取到窗口开始和结束时间
          println("窗口开始时间》》" + window.getStart + "=====;窗口结束时间》》" + window.getEnd + ";窗口中的数据》》" +
            input.iterator.mkString(","))
        
      
    )
    // 打印结果
    windowDs.print()
    // 启动
    env.execute()


  


总结:

注意:

(1)基于事件时间进行计算的时候,判断数据是属于哪个窗口

判断标准:

eventtime >= window -starttime , enenttime <window-endtime

窗口是左闭右开的

[)

(2)窗口开始时间和结束时间的确定:源码解读

窗口开始时间取决于第一条数据的eventtime的值!!

(3)添加水印方式选择的是周期性添加AssignerWithPeriodicWatermarks,传入的是的AssignerWithPeriodicWatermarks子类,BoundedOutOfOrdernessTimestampExtractor。

复杂版本–手动实现

package cn.itcast.flink.watermark

import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.scala.DataStream, StreamExecutionEnvironment, WindowedStream
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

/*
演示使用周期性方式生成水印 -复杂版本--手动实现watermark机制
 */
/*
需求:
编写代码, 计算5秒内(滚动时间窗口),每个信号灯汽车数量
信号灯数据(信号ID(String)、通过汽车数量、时间戳(事件时间)),要求添加水印来解决网络延迟问题。
 */
//3. 定义CarWc 样例类
case class CarWc(id: String, num: Int, ts: Long)

object WatermarkDemo2 
  /*
  1. 创建流处理运行环境
2. 设置处理时间为EventTime ,设置水印的周期间隔,定期生成水印的时间
3. 定义CarWc 样例类
4. 使用socketstream发送数据
5. 添加水印
   - 允许延迟2秒
   - 在获取水印方法中,打印水印时间、事件时间和当前系统时间
6. 按照用户进行分流
7. 设置5秒的时间窗口
8. 进行聚合计算
9. 打印结果数据
10. 启动执行流处理
   */
  def main(args: Array[String]): Unit = 
    //1 创建流处理运行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //2 设置处理时间为事件时间,
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    //3 生成水印的周期 默认200ms
    env.getConfig.setAutoWatermarkInterval(200)

    // 默认程序并行度是机器的核数,8个并行度,注意在flink程序中如果是多并行度,水印时间是每个并行度比较最小的值作为当前流的watermark
    env.setParallelism(1)

    //4 添加socketsource
    val socketDs: DataStream[String] = env.socketTextStream("node1", 9999)
    // 5 数据处理之后添加水印
    val carWcDs: DataStream[CarWc] = socketDs.map(
      line => 
        //按照逗号切分数据组成carwc
        val arr = line.split(",")
        CarWc(arr(0), arr(1).trim.toInt, arr(2).trim.toLong)
      
    )
    // 5.2 添加水印 周期性  new AssignerWithPeriodicWatermarks
    val watermarkDs: DataStream[CarWc] = carWcDs.assignTimestampsAndWatermarks(
      new AssignerWithPeriodicWatermarks[CarWc] 
        // watermark=eventtime -延迟时间
        // 5.2.1 定义允许延迟的时间 2s
        val delayTime=2000

        //定义当前最大的时间戳
        var currentMaxTimestamp=0L
        /** The timestamp of the last emitted watermark. */
         var lastEmittedWatermark = Long.MinValue
        // todo 获取watermark时间  实现watermark不会倒退
        override def getCurrentWatermark: Watermark = 
          // 计算watermark
          val watermarkTime: Long = currentMaxTimestamp - delayTime
          if (watermarkTime >lastEmittedWatermark)
            lastEmittedWatermark =watermarkTime
          
          new Watermark(lastEmittedWatermark)
        
        //todo 抽取时间戳 element:新到达的元素,previousElementTimestamp:之前元素的时间戳
        // 5.2.2 抽取时间戳 计算watermark
        override def extractTimestamp(element: CarWc, previousElementTimestamp: Long): Long = 
          //获取到时间
          //注意的问题:时间倒退的问题:消息过来是乱序的,每次新来的消息时间戳不是一定变大的,所以会导致水印有可能倒退
          var eventTime = element.ts
          if (eventTime >currentMaxTimestamp)  //比较与之前最大的时间戳进行比较
            currentMaxTimestamp =eventTime
          
          eventTime
        
      
      )
    // 6 设置窗口 5s的滚动窗口
    val windowStream: WindowedStream[CarWc, Tuple, TimeWindow] = watermarkDs.keyBy(0).
      window(TumblingEventTimeWindows.of(Time.seconds(5)))
    // 7 使用apply方法对窗口进行计算
    val windowDs: DataStream[CarWc] = windowStream.apply(
      //泛型:1 carwc,2 carwc,3 tuple,4 timewindow
      new WindowFunction[CarWc, CarWc, Tuple, TimeWindow] 
        //key:tuple,window:当前触发计算的window对象,input:当前窗口的数据,out:计算结果收集器
        override def apply(key: Tuple, window: TimeWindow, input: Iterable[CarWc], out: Collector[CarWc]): Unit = 

          val wc: CarWc = input.reduce(
            (c1, c2) => 
              CarWc(c1.id, c1.num + c2.num, c2.ts) //累加出通过的汽车数量,关于时间在这里我们不关心
            
          )
          //发送计算结果
          out.collect(wc)
          //获取到窗口开始和结束时间
          println("窗口开始时间》》" + window.getStart + "=====;窗口结束时间》》" + window.getEnd + ";窗口中的数据》》" +
            input.iterator.mkString(","))
        
      
    )
    // 打印结果
    windowDs.print()
    // 启动
    env.execute()


  


注意:

建议工作中就使用子类方式实现水印即可!

再次理解watermark

基于watermark+eventime只能解决部分数据延迟问题,不能完全解决,对于watermark无法解决的延迟数据,flink默认是丢弃的,如果我们需要保证数据完全不丢失可以再使用allowedlateness+侧道输出来保证。

allowedlateness+侧道输出 API

(1)allowedLateness(lateness: Time) 这种方式设置的允许延迟时间与水印的延迟时间是一个累加的效果,

但是注意这个时间并不会影响窗口触发计算的标准,watermark >=window-endtime就会触发计算,

只是如果这设置了这个时间,窗口不会关闭和销毁而是继续等待我们这种方式设置的时间。

(2)侧道输出 保存极端延迟数据

sideOutputLateData(outputTag: OutputTag[T]):设置侧道输出保存延迟的数据

DataStream.getSideOutput(tag: OutputTag[X]) :获取其中保存的延迟数据

侧道输出+allowedlateness方案

设置侧道输出与allowedlateness

获取侧道输出数据

参考代码:

package cn.itcast.flink.watermark

import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.scala.DataStream, OutputTag, StreamExecutionEnvironment, WindowedStream
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

/*
演示使用周期性方式生成水印 -复杂版本--手动实现watermark机制,使用侧道输出+allowedlaten保证数据不丢失
 */
/*
需求:
编写代码, 计算5秒内(滚动时间窗口),每个信号灯汽车数量
信号灯数据(信号ID(String)、通过汽车数量、时间戳(事件时间)),要求添加水印来解决网络延迟问题。
 */
//3. 定义CarWc 样例类
case class CarWc(id: String, num: Int, ts: Long)

object SideOutputWKDemo 
  /*
  1. 创建流处理运行环境
2. 设置处理时间为EventTime ,设置水印的周期间隔,定期生成水印的时间
3. 定义CarWc 样例类
4. 使用socketstream发送数据
5. 添加水印
   - 允许延迟2秒
   - 在获取水印方法中,打印水印时间、事件时间和当前系统时间
6. 按照用户进行分流
7. 设置5秒的时间窗口
8. 进行聚合计算
9. 打印结果数据
10. 启动执行流处理
   */
  def main(args: Array[String]): Unit = 
    //1 创建流处理运行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //2 设置处理时间为事件时间,
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    //3 生成水印的周期 默认200ms
    env.getConfig.setAutoWatermarkInterval(200)

    // 默认程序并行度是机器的核数,8个并行度,注意在flink程序中如果是多并行度,水印时间是每个并行度比较最小的值作为当前流的watermark
    env.setParallelism(1)

    //4 添加socketsource
    val socketDs: DataStream[String] = env.socketTextStream("node1", 9999)
    // 5 数据处理之后添加水印
    val carWcDs: DataStream[CarWc] = socketDs.map(
      line => 
        //按照逗号切分数据组成carwc
        val arr = line.split(",")
        CarWc(arr(0), arr(1).trim.toInt, arr(2).trim.toLong)
      
    )
    // 5.2 添加水印 周期性  new AssignerWithPeriodicWatermarks
    val watermarkDs: DataStream[CarWc] = carWcDs.assignTimestampsAndWatermarks(
      new AssignerWithPeriodicWatermarks[CarWc] 
        // watermark=eventtime -延迟时间
        // 5.2.1 定义允许延迟的时间 2s
        val delayTime=2000

        //定义当前最大的时间戳
        var currentMaxTimestamp=0L
        /** The timestamp of the last emitted watermark. */
         var lastEmittedWatermark = Long.MinValue
        // todo 获取watermark时间  实现watermark不会倒退
        override def getCurrentWatermark: Watermark = 
          // 计算watermark
          val watermarkTime: Long = currentMaxTimestamp - delayTime
          if (watermarkTime >lastEmittedWatermark)
            lastEmittedWatermark =watermarkTime
          
          new Watermark(lastEmittedWatermark)
        
        //todo 抽取时间戳 element:新到达的元素,previousElementTimestamp:之前元素的时间戳
        // 5.2.2 抽取时间戳 计算watermark
        override def extractTimestamp(element: CarWc, previousElementTimestamp: Long): Long = 
          //获取到时间
          //注意的问题:时间倒退的问题:消息过来是乱序的,每次新来的消息时间戳不是一定变大的,所以会导致水印有可能倒退
          var eventTime = element.ts
          if (eventTime >currentMaxTimestamp)  //比较与之前最大的时间戳进行比较
            currentMaxTimestamp =eventTime
          
          eventTime
        
      
      )
    // 6 设置窗口 5s的滚动窗口
    //准备一个侧道输出对象
    val outputTag: OutputTag[CarWc] = new OutputTag[CarWc]("lateCarwc")
    val windowStream: WindowedStream[CarWc, Tuple, TimeWindow] = watermarkDs.keyBy(0).
      window(TumblingEventTimeWindows.of(Time.seconds(5)))
    //设置允许延迟时间  --》在水印基础上再次增加延迟允许延迟时间
      .allowedLateness(Time.seconds(5))
    //设置侧道输出
      .sideOutputLateData(outputTag)
    // 7 使用apply方法对窗口进行计算
    val windowDs: DataStream[CarWc] = windowStream.apply(
      //泛型:1 carwc,2 carwc,3 tuple,4 timewindow
      new WindowFunction[CarWc, CarWc, Tuple, TimeWindow] 
        //key:tuple,window:当前触发计算的window对象,input:当前窗口的数据,out:计算结果收集器
        override def apply(key: Tuple, window: TimeWindow, input: Iterable[CarWc], out: Collector[CarWc]): Unit = 

          val wc: CarWc = input.reduce(
            (c1, c2) => 
              CarWc(c1.id, c1.num + c2.num, c2.ts) //累加出通过的汽车数量,关于时间在这里我们不关心
            
          )
          //发送计算结果
          out.collect(wc)
          //获取到窗口开始和结束时间
          println("窗口开始时间》》" + window.getStart + "=====;窗口结束时间》》" + window.getEnd + ";窗口中的数据》》" +
            input.iterator.mkString(","))
        
      
    )
    // 打印结果
    windowDs.print()
    //获取侧道输出的数据
    val lateCarWc: DataStream[CarWc] = windowDs.getSideOutput(outputTag)
    lateCarWc.printToErr("侧道输出数据》》")
    // 启动
    env.execute()


  


flink state

state是什么?

state就是task/operator计算的中间结果,

无状态计算:相同的输入得到相同的结果,只根据输入的数据无需借助其他数据就可以计算出我想要的结果,

有状态计算:相同的输入得到的可能是不同的结果,计算过程会需要中间结果或者历史结果进行联合处理。

flink的wordcount 可以轻松实现累加效果,就是因为使用了state(keyed state)

flink中state的分类

从是否被flink框架管理:

1 manage state:flink框架帮我们管理,自动内存,序列化等,支持数据结构:value,list,map等 ,manage state 推荐使用

2 raw state:需要自己管理,序列化;byte[], 除非自定义operator时可以考虑使用该种state

从是否与key相关分类:

manage state :

​ keyed state:都是基于keyedstream,只有keyedstream可以使用,与key绑定;

​ operator state:是非keystream时候使用,与operator绑定

raw state:operator state

keyedstate

访问:getruntimecontext访问,要求operator是RichFunction;

数据结构:

valuestate:单一值, 是与key对应,我们无需关注key与state之间的映射关系,kv映射关系flink维护;

liststate:是一个列表结构, add:添加值;,遍历其中数据,get获取值

mapstate:状态值就是一个map结构,put,get,putall等

aggregationstate与reducingstate:存储的都是单值,但是需要提供一个reducefunciton aggregate策略,add添加数据之后其实得到的是之前定义function的计算结果。

operator state

访问:无需上下文

数据结构:

liststate,如果不能满足需求可以考虑自己定义一个operator state;

常见keyedstate 的api

value state案例

总结:使用valuestate可以帮助我们存储keyedstream中key对象的value数据,而且无需关心kv之间的映射。

状态使用:

1 定义一个描述器

2 获取一个状态

代码:

package cn.itcast.flink.state.keyedstate

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.common.state.ValueState, ValueStateDescriptor
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.scala.DataStream, KeyedStream, StreamExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration

/*
 使用ValueState保存中间结果对下面数据求出最大值
 */

object ValueStateDemo 
  def main(args: Array[String]): Unit = 
    /*
     1.获取流处理执行环境
      2.加载数据源 socke数据:k,v
      3.数据分组
      4.数据转换,定义ValueState,保存中间结果
      5.数据打印
      6.触发执行
     */
    //1 创建流处理运行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 默认程序并行度是机器的核数
    env.setParallelism(1)
    // 2 使用socket source
    val socketDs: DataStream[String] = env.socketTextStream("node1", 9999)
    // 3 keyed stream 使用keyby进行分组
    val tupleDs: DataStream[(String, Int)] = socketDs.map(line => 
      val arr: Array[String] = line.split(",")
      //转为tuple类型
      (arr(0), arr(1).trim.toInt)
    )
    // 3.1 分组
    val keyStream: KeyedStream[(String, Int), Tuple] = tupleDs.keyBy(0)
    //    keyStream.maxBy(1)
    // 使用valuestate来存储两两比较之后的最大值,新数据到来之后如果比原来的最大值还大则把该值更新为状态值,保证状态中一直存储的是最大值
    //需要通过上下文件来获取keyedstate valuestate
    val maxDs: DataStream[(String, Int)] = keyStream.map(
      // 3.2 使用richfunction操作,需要通过上下文
      new RichMapFunction[(String, Int), (String, Int)] 
        // 3.2.1 声明一个valuestate (不是创建) ,value state无需关心key是谁以及kv之间的映射,flink维护
        var maxValueState: ValueState[Int] = _
        // 3.3 通过上下文才能获取真正的state,上下文件这种操作在执行一次的方法中使用并获取真正的状态对象
        override def open(parameters: Configuration): Unit = 
          // 3.3.1 定义一个state描述器  参数:state的名称,数据类型的字节码文件
          val maxValueDesc: ValueStateDescriptor[Int] = new ValueStateDescriptor[Int]("maxValue", classOf[Int])
          // 3.3.2 根据上下文基于描述器获取state
          maxValueState = getRuntimeContext.getState<

以上是关于flink学习day04:对Event Time 与 Watermark的理解的主要内容,如果未能解决你的问题,请参考以下文章

Flink开启流处理技术新潮流:解决流处理event time和消息乱序

day04_Flink高级API

flink学习day03:flink datastream 开发

Flink 学习 — Flink 中几种 Time 详解

《从0到1学习Flink》—— Flink 中几种 Time 详解

Flink学习入门教程之Event-driven Applications