flink笔记9 [实验]体验窗口开启时间和关闭时间(Eventtime)

Posted Aurora1217

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了flink笔记9 [实验]体验窗口开启时间和关闭时间(Eventtime)相关的知识,希望对你有一定的参考价值。

体验窗口开启时间和关闭时间

 

实验数据

实验代码

实验结果

实验分析

窗口开始时间公式


实验数据

sensor_1,1619492107,36.2
sensor_1,1619492108,36.0
sensor_1,1619492109,36.5
sensor_1,1619492110,34.3
sensor_1,1619492111,34.3
sensor_1,1619492112,34.3
sensor_1,1619492113,34.3
sensor_1,1619492114,34.3
sensor_1,1619492115,34.3
sensor_1,1619492116,34.3
sensor_1,1619492117,34.3
sensor_1,1619492118,34.3
sensor_1,1619492119,34.3
sensor_1,1619492120,36.2
sensor_1,1619492121,36.1
sensor_1,1619492122,36.5
sensor_1,1619492123,35.2
sensor_1,1619492124,36.2
sensor_1,1619492125,36.1
sensor_1,1619492126,36.5
sensor_1,1619492127,36.2
sensor_1,1619492128,36.1
sensor_1,1619492129,36.3
sensor_1,1619492130,36.2
sensor_1,1619492131,36.2
sensor_1,1619492132,36.2
sensor_1,1619492170,36.2
sensor_1,1619492171,36.5
sensor_1,1619492172,37.3
sensor_1,1619492173,36.2
sensor_1,1619492174,36.3
sensor_1,1619492175,36.1
sensor_1,1619492176,36.6
sensor_1,1619492177,36.5
sensor_1,1619492178,36.1
sensor_1,1619492179,36.8
sensor_1,1619492180,36.1
sensor_1,1619492209,36.5

实验代码

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.streaming.api.windowing.time.Time

case class sensorReading(id:String,timestamp:Long,temperature:Double)

object window_eventtime {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val inputStream = env.socketTextStream("localhost",7777)

    val resultStream = inputStream
      .map(data =>{
        val arr = data.split(",")
        sensorReading(arr(0),arr(1).toLong,arr(2).toDouble)
      })
      .assignAscendingTimestamps(_.timestamp * 1000)
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[sensorReading](Time.seconds(3)) {
        override def extractTimestamp(element: sensorReading): Long = element.timestamp * 1000L
      } )
      .keyBy(_.id)
      .timeWindow(Time.seconds(15))
      .reduce((data1,data2)=>sensorReading(data1.id,data2.timestamp,data1.temperature.min(data2.temperature)))

    resultStream.print()

    env.execute("window_eventtime test")
  }
}

实验结果

实验分析

窗口开始时间 = TimeStamp - (TimeStamp - offset + windowSize) % windowSize

第一个窗口的开始时间:1619492107 - (1619492107 - 0 + 15) % 15 = 1619492100
第一个窗口就是 [1619492100 , 1619492115)
由实验结果(第一个窗口中最新的时间戳为1619492114)可知:正好在结束时间产生的数据1619492115不在第一个窗口里

窗口开始时间公式

可以在底层源码中找到

 

以上是关于flink笔记9 [实验]体验窗口开启时间和关闭时间(Eventtime)的主要内容,如果未能解决你的问题,请参考以下文章

flink笔记10 [实验]体验ProcessingTime和指定EventTime下的区别

flink笔记10 [实验]体验ProcessingTime和指定EventTime下的区别

flink笔记10 [实验]体验ProcessingTime和指定EventTime下的区别

学习笔记Flink—— Flink数据流模型时间窗口和核心概念

学习笔记Flink—— Flink数据流模型时间窗口和核心概念

flink笔记15 flink table表的时间属性