Flink Notes 10 [Experiment]: Comparing ProcessingTime with an Explicitly Assigned EventTime

Posted Aurora1217


Comparing ProcessingTime with an explicitly assigned EventTime

 

Experimental data

Experimental code

Experimental results

Analysis


Experimental data

Each record is: sensor id, event timestamp (epoch seconds), temperature.

sensor_1,1619492107,36.2
sensor_1,1619492108,36.0
sensor_1,1619492109,36.5
sensor_1,1619492110,34.3
sensor_1,1619492111,34.3
sensor_1,1619492112,34.3
sensor_1,1619492113,34.3
sensor_1,1619492114,34.3
sensor_1,1619492115,34.3
sensor_1,1619492116,34.3
sensor_1,1619492117,34.3
sensor_1,1619492118,34.3
sensor_1,1619492119,34.3
sensor_1,1619492120,36.2
sensor_1,1619492121,36.1
sensor_1,1619492122,36.5
sensor_1,1619492123,35.2
sensor_1,1619492124,36.2
sensor_1,1619492125,36.1
sensor_1,1619492126,36.5
sensor_1,1619492127,36.2
sensor_1,1619492128,36.1
sensor_1,1619492129,36.3
sensor_1,1619492130,36.2
sensor_1,1619492131,36.2
sensor_1,1619492132,36.2
sensor_1,1619492170,36.2
sensor_1,1619492171,36.5
sensor_1,1619492172,37.3
sensor_1,1619492173,36.2
sensor_1,1619492174,36.3
sensor_1,1619492175,36.1
sensor_1,1619492176,36.6
sensor_1,1619492177,36.5
sensor_1,1619492178,36.1
sensor_1,1619492179,36.8
sensor_1,1619492180,36.1
sensor_1,1619492209,36.5

Experimental code

(1) EventTime

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.streaming.api.windowing.time.Time
import org.tysy.window_api.sensorReading

import java.util.Date

object window_eventtime {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    // Drive windows by the timestamps carried in the records
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val inputStream = env.socketTextStream("localhost", 7777)

    val resultStream = inputStream
      // Parse "id,timestamp(seconds),temperature" into a sensorReading
      .map(data => {
        val arr = data.split(",")
        sensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
      })
      // Event time = record timestamp in milliseconds; tolerate 3 s of out-of-orderness
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[sensorReading](Time.seconds(3)) {
        override def extractTimestamp(element: sensorReading): Long = element.timestamp * 1000L
      })
      .keyBy(_.id)
      // 15-second tumbling window
      .timeWindow(Time.seconds(15))
      // Keep the minimum temperature and the latest timestamp seen in the window
      .reduce((data1, data2) => sensorReading(data1.id, data2.timestamp, data1.temperature.min(data2.temperature)))
      // Attach the wall-clock time (ms) at which the window result is emitted
      .map(data => (data, new Date().getTime))

    resultStream.print()

    env.execute("window_eventtime test")
  }
}
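The EventTime job above allows 3 seconds of out-of-orderness, so a 15-second window does not emit its result as soon as its last record arrives. The following is a minimal sketch in plain Scala (no Flink dependency) of when a window can fire, assuming the watermark is the largest timestamp seen so far minus the allowed out-of-orderness, and that a window [start, end) fires once the watermark reaches end - 1. The object and function names are hypothetical and exist only for this illustration; in a real job watermarks are emitted periodically, so the actual firing happens shortly after the triggering record arrives.

```scala
object WatermarkFiringSketch {
  val maxOutOfOrdernessMs: Long = 3 * 1000L

  // Watermark implied by the largest event timestamp seen so far (assumed scheme)
  def watermark(maxTimestampMs: Long): Long = maxTimestampMs - maxOutOfOrdernessMs

  // A window ending at windowEndMs (exclusive) can fire once the watermark reaches end - 1
  def canFire(windowEndMs: Long, maxTimestampMs: Long): Boolean =
    watermark(maxTimestampMs) >= windowEndMs - 1

  // First record (epoch seconds) whose arrival lets the window ending at windowEndSec fire
  def firstTrigger(windowEndSec: Long, timestampsSec: Seq[Long]): Option[Long] = {
    // Running maximum of the timestamps, so out-of-order input is handled too
    val runningMax = timestampsSec.scanLeft(Long.MinValue)((a, b) => math.max(a, b)).tail
    timestampsSec.zip(runningMax).collectFirst {
      case (ts, maxTs) if canFire(windowEndSec * 1000L, maxTs * 1000L) => ts
    }
  }

  def main(args: Array[String]): Unit = {
    // Timestamps 1619492107 .. 1619492120, as in the first rows of the experimental data
    val timestamps = (1619492107L to 1619492120L).toSeq
    // Window [1619492100, 1619492115)
    println(firstTrigger(1619492115L, timestamps)) // Some(1619492118)
  }
}
```

Under these assumptions the first window is released by the record stamped 1619492118, three seconds of event time after the window end, no matter how quickly or slowly the records are typed into the socket.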

(2) ProcessingTime

import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.streaming.api.windowing.time.Time
import org.tysy.window_api.sensorReading

import java.util.Date

object window_processingtime {
  def main(args: Array[String]): Unit = {
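    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    // No time characteristic is set: in Flink versions before 1.12 the default is ProcessingTime

    val inputStream = env.socketTextStream("localhost", 7777)

    val resultStream = inputStream
      // Parse "id,timestamp(seconds),temperature" into a sensorReading
      .map(data => {
        val arr = data.split(",")
        sensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
      })
      .keyBy(_.id)
      // 15-second tumbling window measured on the machine's wall clock
      .timeWindow(Time.seconds(15))
      // Keep the minimum temperature and the latest timestamp seen in the window
      .reduce((data1, data2) => sensorReading(data1.id, data2.timestamp, data1.temperature.min(data2.temperature)))
      // Attach the wall-clock time (ms) at which the window result is emitted
      .map(data => (data, new Date().getTime))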

    resultStream.print()

    env.execute("window_processingtime test")
  }
}

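Experimental results

Analysis

① The last column of each output record is the system timestamp (wall-clock time in milliseconds) at which the result was emitted.
② With ProcessingTime, the number of records that land in one window depends on how many records were entered during that window's span of wall-clock time while testing. The system timestamps in the last column show the 15-second window size.
③ With EventTime, the grouping does not depend on how fast the records are entered. It depends only on the timestamps carried in the experimental data.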
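Point ③ can be checked by hand, because the window a record belongs to is a pure function of its timestamp. The sketch below is plain Scala with no Flink dependency. It assumes 15-second tumbling windows aligned to the epoch with no offset, and uses a hypothetical `Reading` case class as a stand-in for the author's `sensorReading`. It groups a few rows of the experimental data by window and takes the minimum temperature, mirroring the reduce step of the job.

```scala
object EventTimeWindowSketch {
  // Hypothetical stand-in for the author's sensorReading case class
  final case class Reading(id: String, timestampSec: Long, temperature: Double)

  val windowSizeMs: Long = 15 * 1000L

  // Start (ms) of the tumbling window that contains the given event-time timestamp (ms)
  def windowStart(timestampMs: Long): Long = timestampMs - (timestampMs % windowSizeMs)

  // Minimum temperature per window, keyed by the window start in epoch seconds
  def minPerWindow(readings: Seq[Reading]): Map[Long, Double] =
    readings
      .groupBy(r => windowStart(r.timestampSec * 1000L) / 1000L)
      .map { case (start, rs) => start -> rs.map(_.temperature).min }

  def main(args: Array[String]): Unit = {
    // A few rows taken from the experimental data
    val sample = Seq(
      Reading("sensor_1", 1619492107L, 36.2),
      Reading("sensor_1", 1619492108L, 36.0),
      Reading("sensor_1", 1619492114L, 34.3),
      Reading("sensor_1", 1619492115L, 34.3),
      Reading("sensor_1", 1619492120L, 36.2)
    )
    minPerWindow(sample).toSeq.sortBy(_._1).foreach { case (start, min) =>
      println(s"window [$start, ${start + 15}) min temperature = $min")
    }
  }
}
```

The records stamped 1619492107 through 1619492114 fall into the window starting at 1619492100, and the record stamped 1619492115 opens the next window, regardless of when each line is sent to the socket.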

 

 
