Flink 窗口 window
Posted noyouth
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink 窗口 window相关的知识,希望对你有一定的参考价值。
一、基本概念
1.窗口分类
TimeWindow:按照时间生成 Window。对于 TimeWindow,可以根据窗口实现原理的不同分成三类:滚动窗口(TumblingWindow)、滑动窗口(Sliding Window)和会话窗口(Session Window)。
2.时间分类
Event Time:是事件创建的时间。它通常由事件中的时间戳描述,例如采集的日志数据中,每一条日志都会记录自己的生成时间,Flink 通过时间戳分配器访问事件时间戳。
Ingestion Time:是数据进入 Flink 的时间。
二、案例演示
案例1:按Processing Time划分滚动时间窗口
import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.windowing.time.Time object WindowTest { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) val socketStream = env.socketTextStream("hadoop102",7777) val dataStream: DataStream[SensorReading] = socketStream.map(d => { val arr = d.split(",") SensorReading(arr(0).trim, arr(1).trim.toLong, arr(2).toDouble) }) //统计10秒内的最小温度 val minTemperatureStream = dataStream.map(data=>(data.id,data.temperature)) .keyBy(_._1) .timeWindow(Time.seconds(10)) //10秒滚动窗口,不指定时间特性,默认为ProcessingTime .reduce((data1, data2)=>(data1._1,data1._2.min(data2._2))) //打印原始的dataStream dataStream.print("data stream") //打印窗口数据流 minTemperatureStream.print("min temperature") env.execute("window test") } }
测试:
连续输入两条数据
[atguigu@hadoop102 ~]$ nc -lk 7777 sensor_1, 1547718200, 30.8 sensor_1, 1547718201, 40.8
在一个10秒的滚动窗口内,窗口流minTemperatureStream 只输出了一条数据。此时触发TimeWindow去计算的时机就是第一条数据来的10秒过后。
data stream> SensorReading(sensor_1,1547718200,30.8) data stream> SensorReading(sensor_1,1547718201,40.8) min temperature> (sensor_1,30.8)
案例2:带水位的滚动时间窗口
代码分析:
①通过env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)设定窗口的时间特性为事件时间。
②在assignTimestampsAndWatermarks()方法中,传递一个BoundedOutOfOrdernessTimestampExtractor类实现对象,构造器参数就是容忍的延迟时间,实现方法,指明时间戳用哪个字段。
object WindowTest { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val socketStream = env.socketTextStream("hadoop102",7777) val dataStream: DataStream[SensorReading] = socketStream .map(d => { val arr = d.split(",") SensorReading(arr(0).trim, arr(1).trim.toLong, arr(2).toDouble) }) .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(2)) { override def extractTimestamp(t: SensorReading): Long = t.timestamp * 1000 }) //.assignAscendingTimestamps(_.timestamp) //升序数据添指定时间戳 //统计5秒内的最小温度 val minTemperatureStream = dataStream.map(data=>(data.id,data.temperature)) .keyBy(_._1) .timeWindow(Time.seconds(5)) //5秒滚动窗口 .reduce((data1, data2)=>(data1._1,data1._2.min(data2._2))) //打印原始的dataStream dataStream.print("data stream") //打印窗口数据流 minTemperatureStream.print("min temperature") env.execute("window test") } }
测试:
当输入第一条数据时,时间戳是1547718200(单位秒),因为窗口的长度为5,所以理论上当时间戳为1547718205的数据来后,窗口会打印输出,但是由于设定了延迟2秒,所以此时水位才到1547718203,所以只有当时间戳为1547718207或之后的数据到来,水位线涨到大于等于1547718205时,窗口才会触发计算并关闭。
sockt输入数据如下
[atguigu@hadoop102 ~]$ nc -lk 7777 sensor_1, 1547718200, 30.8 sensor_1, 1547718201, 31 sensor_1, 1547718202, 32 sensor_1, 1547718203, 33 sensor_1, 1547718204, 34 sensor_1, 1547718205, 35 sensor_1, 1547718206, 36 sensor_1, 1547718207, 37 sensor_1, 1547718208,38
控制台打印如下:
data stream> SensorReading(sensor_1,1547718200,30.8) data stream> SensorReading(sensor_1,1547718201,31.0) data stream> SensorReading(sensor_1,1547718202,32.0) data stream> SensorReading(sensor_1,1547718203,33.0) data stream> SensorReading(sensor_1,1547718204,34.0) data stream> SensorReading(sensor_1,1547718205,35.0) data stream> SensorReading(sensor_1,1547718206,36.0) data stream> SensorReading(sensor_1,1547718207,37.0) min temperature> (sensor_1,30.8) data stream> SensorReading(sensor_1,1547718208,38.0)
案例3:滑动时间窗口
滑动窗口和滚动窗口特性类似,滚动窗口可以看作一种特殊的滑动窗口,其窗口长度与滑动长度一样。在.timeWindow(Time.seconds(10),Time.seconds(5)) 方法中,设定了窗口的长度为10,滑动长度为5。窗口长度决定了窗口计算的数据的范围有多大,而滑动长度决定了窗口计算并关闭的时机。
//统计10秒内的最小温度,5秒输出一次 val minTemperatureStream = dataStream.map(data=>(data.id,data.temperature)) .keyBy(_._1) .timeWindow(Time.seconds(10),Time.seconds(5)) //滑动窗口 .reduce((data1, data2)=>(data1._1,data1._2.min(data2._2)))
以上是关于Flink 窗口 window的主要内容,如果未能解决你的问题,请参考以下文章
11.Flink四大基石Window窗口的分类Flink提供了很多各种场景用的WindowAssigner基于时间的滚动和滑动基于时间的滚动和滑动窗口基于数量的滚动和滑动
11-flink-1.10.1- Flink window API
11-flink-1.10.1- Flink window API
1.22.FLINK WatermarkFlink窗口(Window)watermark有什么用?如何使用Watermarks处理乱序的数据流?机制及实例详解生成方式代码实例