flink笔记9 [实验]体验窗口开启时间和关闭时间(Eventtime)
Posted Aurora1217
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了flink笔记9 [实验]体验窗口开启时间和关闭时间(Eventtime)相关的知识,希望对你有一定的参考价值。
体验窗口开启时间和关闭时间
实验数据
sensor_1,1619492107,36.2
sensor_1,1619492108,36.0
sensor_1,1619492109,36.5
sensor_1,1619492110,34.3
sensor_1,1619492111,34.3
sensor_1,1619492112,34.3
sensor_1,1619492113,34.3
sensor_1,1619492114,34.3
sensor_1,1619492115,34.3
sensor_1,1619492116,34.3
sensor_1,1619492117,34.3
sensor_1,1619492118,34.3
sensor_1,1619492119,34.3
sensor_1,1619492120,36.2
sensor_1,1619492121,36.1
sensor_1,1619492122,36.5
sensor_1,1619492123,35.2
sensor_1,1619492124,36.2
sensor_1,1619492125,36.1
sensor_1,1619492126,36.5
sensor_1,1619492127,36.2
sensor_1,1619492128,36.1
sensor_1,1619492129,36.3
sensor_1,1619492130,36.2
sensor_1,1619492131,36.2
sensor_1,1619492132,36.2
sensor_1,1619492170,36.2
sensor_1,1619492171,36.5
sensor_1,1619492172,37.3
sensor_1,1619492173,36.2
sensor_1,1619492174,36.3
sensor_1,1619492175,36.1
sensor_1,1619492176,36.6
sensor_1,1619492177,36.5
sensor_1,1619492178,36.1
sensor_1,1619492179,36.8
sensor_1,1619492180,36.1
sensor_1,1619492209,36.5
实验代码
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.streaming.api.windowing.time.Time
case class sensorReading(id:String,timestamp:Long,temperature:Double)
object window_eventtime {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val inputStream = env.socketTextStream("localhost",7777)
val resultStream = inputStream
.map(data =>{
val arr = data.split(",")
sensorReading(arr(0),arr(1).toLong,arr(2).toDouble)
})
.assignAscendingTimestamps(_.timestamp * 1000)
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[sensorReading](Time.seconds(3)) {
override def extractTimestamp(element: sensorReading): Long = element.timestamp * 1000L
} )
.keyBy(_.id)
.timeWindow(Time.seconds(15))
.reduce((data1,data2)=>sensorReading(data1.id,data2.timestamp,data1.temperature.min(data2.temperature)))
resultStream.print()
env.execute("window_eventtime test")
}
}
实验结果
实验分析
窗口开始时间 = TimeStamp - (TimeStamp - offset + windowSize) % windowSize
第一个窗口的开始时间:1619492107 - (1619492107 - 0 + 15) % 15 = 1619492100
第一个窗口就是 [1619492100 , 1619492115)
由实验结果(第一个窗口中最新的时间戳为1619492114)可知:正好在结束时间产生的数据1619492115不在第一个窗口里
窗口开始时间公式
可以在底层源码中找到
以上是关于flink笔记9 [实验]体验窗口开启时间和关闭时间(Eventtime)的主要内容,如果未能解决你的问题,请参考以下文章
flink笔记10 [实验]体验ProcessingTime和指定EventTime下的区别
flink笔记10 [实验]体验ProcessingTime和指定EventTime下的区别
flink笔记10 [实验]体验ProcessingTime和指定EventTime下的区别
学习笔记Flink—— Flink数据流模型时间窗口和核心概念