Flink的窗口计算案例

Posted ZL小屁孩

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink的窗口计算案例相关的知识,希望对你有一定的参考价值。

窗口的分类

按照时间生成Window,为TimeWindow,根据窗口实现原理可分为三类:

  1. 滚动窗口(Tumbling Window):将数据依据固定的窗口长度对数据进行分片。滚动窗口分配器将每个元素分配到一个指定窗口大小的窗口中,滚动窗口有一个固定的大小,并且不会重叠。适用于做每个时间段的聚合计算
  2. 滑动窗口(Sliding Window):由固定的窗口长度和滑动间隔组成,适用于最近一个时间段内统计,窗口长度固定,可有重叠
  3. 会话窗口(Session Window):由一系列事件组合一个指定时间长度的timeout间隙组成,也就是一段时间没有接收到新的数据就会生成新的窗口。会话窗口是指一段用户持续活跃周期,由非活跃的间隙分隔开,时间不对齐

按照指定的数据条数生成一个Window,与时间无关,为CountWindow:

  1. 滚动窗口(Tumbling Window):默认的CountWindow是一个滚动窗口,只需要指定窗口大小即可,当元素数量达到窗口大小时,就会触发窗口的执行
  2. 滑动窗口(Sliding Window):和滚动窗口的函数名完全一致,只是传入的参数不同,滑动窗口需要传入两个参数,一个是window_size,一个是sliding_size

注意事项:所有窗口是左闭右开

代码实际案例

关于TimeWindow的算子的应用

def time_window_func(sEnv: StreamExecutionEnvironment) = 
    sEnv.setParallelism(1)
    val keyByDS: KeyedStream[(String, Int), String] = sEnv.socketTextStream("localhost", 9999)
      .flatMap(_.split(" ")).map((_, 1))
      .keyBy(_._1)

      /**滚动窗口*/
      val tumblingDS = keyByDS.timeWindow(Time.seconds(3))
      .sum(1)
    tumblingDS.print()

    /**滑动窗口*/
    val slidingDS = keyByDS.timeWindow(Time.seconds(3), Time.seconds(1))
    slidingDS.sum(1).print()

    /**会话窗口:固定时间间隔为10s的事件时间会话窗口*/
    val sessionDS = keyByDS.window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
    sessionDS.sum(1).print()

    sEnv.execute()
  

关于CountWindow的算子应用

  def count_window_func(sEnv: StreamExecutionEnvironment) = 
    val keyByDS = sEnv.socketTextStream("localhost", 9999)
      .flatMap(_.split(" ")).map((_, 1))
      .keyBy(_._1)

    /**滚动函数*/
    keyByDS.countWindow(3).reduce((t1, t2) =>
      (t1._1, t2._2 + t2._2)
    ).print()

    /**滑动窗口 这里sliding_size为2,表示每收到两个相同的key的数据就计算一次,每一次计算的window范围是3个元素*/
    keyByDS.countWindow(3, 2).reduce((t1, t2) =>
      (t1._1, t1._2 + t2._2)
    ).print()

    sEnv.execute()
  

但是在实际生产过程中,会用到很多会话窗口的算子进行处理数据,会话窗口包括固定时间窗口和动态时间窗口的操作,,这里我简述几个简单的关于会话窗口的算子操作

def session_windows(sEnv: StreamExecutionEnvironment) = 
    sEnv.setParallelism(1)
    val keyByDS: KeyedStream[(String, Int), String] = sEnv.socketTextStream("localhost", 9999)
      .flatMap(_.split(" ")).map((_, 1))
      .keyBy(_._1)

    /**固定时间间隔为10s的事件时间会话窗口*/
    keyByDS.window(EventTimeSessionWindows.withGap(Time.seconds(10))).sum(1).print()

    /**动态时间间隔的事件时间会话窗口*/
    keyByDS.window(EventTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[(String, Int)] 
      override def extract(eles: (String, Int)): Long = 
        //动态指定并返回Session Gap
        eles._2 + 10
      
    ))
      .allowedLateness(Time.seconds(10))  //迟到生存期,默认是0,即事件时间窗口窗口在水印到来后结束,无需考虑事件迟到的情况
      .sum(1).print()


    /**固定时间间隔为10s的处理时间会话窗口*/
    keyByDS.window(ProcessingTimeSessionWindows.withGap(Time.seconds(10))).sum(1).print()

    /**动态时间间隔的处理时间会话窗口*/
    keyByDS.window(DynamicProcessingTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[(String, Int)] 
      override def extract(t: (String, Int)): Long = 
        //根据事件特征确定会话窗口间隔
        t._1.length
      
    )).sum(1).print()

    sEnv.execute()
  

全局窗口案例

    /**全窗口函数(full window functions): ProcessWindowFunction 先把窗口所有数据收集起来,等到计算的时候会遍历所有数据*/
    /**全局窗口:将相同key的所有元素聚在一个, 但是这种窗口没有起点和终点,因此必须自定义触发器*/
    val processDS: DataStream[String] = keyByDS.countWindow(3).process(new ProcessWindowFunction[(String, Int), String, String, GlobalWindow] 
      override def process(key: String, context: Context, elements: Iterable[(String, Int)], out: Collector[String]): Unit = 
        out.collect(elements.mkString(","))
      
    )

以上是关于Flink的窗口计算案例的主要内容,如果未能解决你的问题,请参考以下文章

Flink的窗口计算案例

Flink之Watermark滑动窗口案例

Flink实战系列Flink SQL 之 Session Window 的用法

从0到1Flink的成长之路(二十)-案例:时间会话窗口

从0到1Flink的成长之路(十九)-案例:计数窗口

Flink窗口聚合案例(增量聚合全量聚合)