flink笔记10 [实验]体验ProcessingTime和指定EventTime下的区别
Posted Aurora1217
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了flink笔记10 [实验]体验ProcessingTime和指定EventTime下的区别相关的知识,希望对你有一定的参考价值。
体验ProcessingTime和指定EventTime下的区别
实验数据
sensor_1,1619492107,36.2
sensor_1,1619492108,36.0
sensor_1,1619492109,36.5
sensor_1,1619492110,34.3
sensor_1,1619492111,34.3
sensor_1,1619492112,34.3
sensor_1,1619492113,34.3
sensor_1,1619492114,34.3
sensor_1,1619492115,34.3
sensor_1,1619492116,34.3
sensor_1,1619492117,34.3
sensor_1,1619492118,34.3
sensor_1,1619492119,34.3
sensor_1,1619492120,36.2
sensor_1,1619492121,36.1
sensor_1,1619492122,36.5
sensor_1,1619492123,35.2
sensor_1,1619492124,36.2
sensor_1,1619492125,36.1
sensor_1,1619492126,36.5
sensor_1,1619492127,36.2
sensor_1,1619492128,36.1
sensor_1,1619492129,36.3
sensor_1,1619492130,36.2
sensor_1,1619492131,36.2
sensor_1,1619492132,36.2
sensor_1,1619492170,36.2
sensor_1,1619492171,36.5
sensor_1,1619492172,37.3
sensor_1,1619492173,36.2
sensor_1,1619492174,36.3
sensor_1,1619492175,36.1
sensor_1,1619492176,36.6
sensor_1,1619492177,36.5
sensor_1,1619492178,36.1
sensor_1,1619492179,36.8
sensor_1,1619492180,36.1
sensor_1,1619492209,36.5
实验代码
(一) EventTime
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.streaming.api.windowing.time.Time
import org.tysy.window_api.sensorReading
import java.util.Date
object window_eventtime {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val inputStream = env.socketTextStream("localhost",7777)
val resultStream = inputStream
.map(data =>{
val arr = data.split(",")
sensorReading(arr(0),arr(1).toLong,arr(2).toDouble)
})
.assignAscendingTimestamps(_.timestamp * 1000)
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[sensorReading](Time.seconds(3)) {
override def extractTimestamp(element: sensorReading): Long = element.timestamp * 1000L
} )
.keyBy(_.id)
.timeWindow(Time.seconds(15))
.reduce((data1,data2)=>sensorReading(data1.id,data2.timestamp,data1.temperature.min(data2.temperature)))
.map(data =>(data,new Date().getTime))
resultStream.print()
env.execute("window_eventtime test")
}
}
(二) ProcessingTime
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.streaming.api.windowing.time.Time
import org.tysy.window_api.sensorReading
import java.util.Date
object window_processingtime {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val inputStream = env.socketTextStream("localhost",7777)
val resultStream = inputStream
.map(data =>{
val arr = data.split(",")
sensorReading(arr(0),arr(1).toLong,arr(2).toDouble)
})
.keyBy(_.id)
.timeWindow(Time.seconds(15))
.reduce((data1,data2)=>sensorReading(data1.id,data2.timestamp,data1.temperature.min(data2.temperature)))
.map(data =>(data,new Date().getTime))
resultStream.print()
env.execute("window_processingtime test")
}
}
实验结果
实验分析
①数据的最后一列是系统的时间戳。
②当使用的是ProcessingTime时,在同一个window里有多少条数据,和测试代码时在该window的时间内输入的代码条数有关,最后一列的时间戳可看出窗口大小15秒。
③当使用的是EventTime时,和输入数据的速度没有关系,只和实验中指定的时间戳大小有关。
以上是关于flink笔记10 [实验]体验ProcessingTime和指定EventTime下的区别的主要内容,如果未能解决你的问题,请参考以下文章
flink笔记10 [实验]体验ProcessingTime和指定EventTime下的区别
flink笔记9 [实验]体验窗口开启时间和关闭时间(Eventtime)
flink笔记9 [实验]体验窗口开启时间和关闭时间(Eventtime)