Flink的窗口计算案例
Posted ZL小屁孩
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink的窗口计算案例相关的知识,希望对你有一定的参考价值。
窗口的分类
按照时间生成Window,为TimeWindow,根据窗口实现原理可分为三类:
- 滚动窗口(Tumbling Window):将数据依据固定的窗口长度对数据进行分片。滚动窗口分配器将每个元素分配到一个指定窗口大小的窗口中,滚动窗口有一个固定的大小,并且不会重叠。适用于做每个时间段的聚合计算
- 滑动窗口(Sliding Window):由固定的窗口长度和滑动间隔组成,适用于最近一个时间段内统计,窗口长度固定,可有重叠
- 会话窗口(Session Window):由一系列事件组合一个指定时间长度的timeout间隙组成,也就是一段时间没有接收到新的数据就会生成新的窗口。会话窗口是指一段用户持续活跃周期,由非活跃的间隙分隔开,时间不对齐
按照指定的数据条数生成一个Window,与时间无关,为CountWindow:
- 滚动窗口(Tumbling Window):默认的CountWindow是一个滚动窗口,只需要指定窗口大小即可,当元素数量达到窗口大小时,就会触发窗口的执行
- 滑动窗口(Sliding Window):和滚动窗口的函数名完全一致,只是传入的参数不同,滑动窗口需要传入两个参数,一个是window_size,一个是sliding_size
注意事项:所有窗口是左闭右开
代码实际案例
关于TimeWindow的算子的应用
def time_window_func(sEnv: StreamExecutionEnvironment) =
sEnv.setParallelism(1)
val keyByDS: KeyedStream[(String, Int), String] = sEnv.socketTextStream("localhost", 9999)
.flatMap(_.split(" ")).map((_, 1))
.keyBy(_._1)
/**滚动窗口*/
val tumblingDS = keyByDS.timeWindow(Time.seconds(3))
.sum(1)
tumblingDS.print()
/**滑动窗口*/
val slidingDS = keyByDS.timeWindow(Time.seconds(3), Time.seconds(1))
slidingDS.sum(1).print()
/**会话窗口:固定时间间隔为10s的事件时间会话窗口*/
val sessionDS = keyByDS.window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
sessionDS.sum(1).print()
sEnv.execute()
关于CountWindow的算子应用
def count_window_func(sEnv: StreamExecutionEnvironment) =
val keyByDS = sEnv.socketTextStream("localhost", 9999)
.flatMap(_.split(" ")).map((_, 1))
.keyBy(_._1)
/**滚动函数*/
keyByDS.countWindow(3).reduce((t1, t2) =>
(t1._1, t2._2 + t2._2)
).print()
/**滑动窗口 这里sliding_size为2,表示每收到两个相同的key的数据就计算一次,每一次计算的window范围是3个元素*/
keyByDS.countWindow(3, 2).reduce((t1, t2) =>
(t1._1, t1._2 + t2._2)
).print()
sEnv.execute()
但是在实际生产过程中,会用到很多会话窗口的算子进行处理数据,会话窗口包括固定时间窗口和动态时间窗口的操作,,这里我简述几个简单的关于会话窗口的算子操作
def session_windows(sEnv: StreamExecutionEnvironment) =
sEnv.setParallelism(1)
val keyByDS: KeyedStream[(String, Int), String] = sEnv.socketTextStream("localhost", 9999)
.flatMap(_.split(" ")).map((_, 1))
.keyBy(_._1)
/**固定时间间隔为10s的事件时间会话窗口*/
keyByDS.window(EventTimeSessionWindows.withGap(Time.seconds(10))).sum(1).print()
/**动态时间间隔的事件时间会话窗口*/
keyByDS.window(EventTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[(String, Int)]
override def extract(eles: (String, Int)): Long =
//动态指定并返回Session Gap
eles._2 + 10
))
.allowedLateness(Time.seconds(10)) //迟到生存期,默认是0,即事件时间窗口窗口在水印到来后结束,无需考虑事件迟到的情况
.sum(1).print()
/**固定时间间隔为10s的处理时间会话窗口*/
keyByDS.window(ProcessingTimeSessionWindows.withGap(Time.seconds(10))).sum(1).print()
/**动态时间间隔的处理时间会话窗口*/
keyByDS.window(DynamicProcessingTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[(String, Int)]
override def extract(t: (String, Int)): Long =
//根据事件特征确定会话窗口间隔
t._1.length
)).sum(1).print()
sEnv.execute()
全局窗口案例
/**全窗口函数(full window functions): ProcessWindowFunction 先把窗口所有数据收集起来,等到计算的时候会遍历所有数据*/
/**全局窗口:将相同key的所有元素聚在一个, 但是这种窗口没有起点和终点,因此必须自定义触发器*/
val processDS: DataStream[String] = keyByDS.countWindow(3).process(new ProcessWindowFunction[(String, Int), String, String, GlobalWindow]
override def process(key: String, context: Context, elements: Iterable[(String, Int)], out: Collector[String]): Unit =
out.collect(elements.mkString(","))
)
以上是关于Flink的窗口计算案例的主要内容,如果未能解决你的问题,请参考以下文章