flink笔记15 flink table表的时间属性
Posted Aurora1217
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了flink笔记15 flink table表的时间属性相关的知识,希望对你有一定的参考价值。
表的时间属性
1.时间属性介绍
像窗口(在 Table API 和 SQL )这种基于时间的操作,需要有时间信息。因此,Table API 中的表就需要提供逻辑时间属性来表示时间,以及支持时间相关的操作。
每种类型的表都可以有时间属性,可以在用CREATE TABLE DDL创建表的时候指定、也可以在DataStream中指定、也可以在定义TableSource时指定。一旦时间属性定义好,它就可以像普通列一样使用,也可以在时间相关的操作中使用。
时间属性可以像普通的时间戳的列一样被使用和计算。一旦时间属性被用在了计算中,它就会被物化,进而变成一个普通的时间戳。普通的时间戳是无法跟 Flink 的时间以及watermark等一起使用的,所以普通的时间戳就无法用在时间相关的操作中。
Table API 程序需要在 streaming environment 中指定时间属性
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) // 默认ProcessingTime
// 或者:
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
2.处理时间(ProcessingTime)
处理时间是基于机器的本地时间来处理数据,它是最简单的一种时间概念,但是它不能提供确定性。它既不需要从数据里获取时间,也不需要生成 watermark。
三种方法定义处理时间
在创建表的DDL中定义
在创建表的 DDL 中用计算列的方式定义,用PROCTIME()就可以定义处理时间
//以接收文件数据建表为例
val tableEnv = StreamTableEnvironment.create(env)
tableEnv
.connect(new FileSystem().path("src/main/resources/sensor.txt"))
.withFormat(new Csv())
.withSchema(
new Schema()
.field("id",DataTypes.STRING())
.field("timestamp",DataTypes.BIGINT())
.field("temperature",DataTypes.DOUBLE())
.field("pt",DataTypes.TIMESTAMP(3)).proctime()
)
.createTemporaryTable("inputtable")
val dataTable = tableEnv.from("inputtable")
在 DataStream 到 Table 转换时定义
处理时间属性可以在 schema 定义的时候用.proctime后缀来定义。时间属性一定不能定义在一个已有字段上,所以它只能定义在 schema 定义的最后。
val tableEnv = StreamTableEnvironment.create(env)
val dataTable = tableEnv.fromDataStream(dataStream,'id,'timestamp,'temperature,'pt.proctime)
使用 TableSource 定义
处理时间属性可以在实现了 DefinedProctimeAttribute 的 TableSource 中定义。逻辑的时间属性会放在 TableSource 已有物理字段的最后
全部代码参考:
import org.apache.flink.table.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment, createTypeInformation
import org.apache.flink.table.descriptors.Csv, FileSystem, Schema
import org.apache.flink.table.api.DataTypes
import org.apache.flink.table.api.scala.StreamTableEnvironment, UnresolvedFieldExpression
import org.apache.flink.types.Row
case class sensorReading(id:String,timestamp:Long,temperature:Double)
object processing
def main(args: Array[String]): Unit =
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
// 1.流转换成表时 + processingtime
// val inputStream = env.readTextFile("src/main/resources/sensor.txt")
// val dataStream = inputStream
// .map(data =>
// val arr = data.split(",")
// sensorReading(arr(0),arr(1).toLong,arr(2).toDouble)
// )
// .assignAscendingTimestamps(_.timestamp * 1000)
// val tableEnv = StreamTableEnvironment.create(env)
//
// val dataTable = tableEnv.fromDataStream(dataStream,'id,'timestamp,'temperature,'pt.proctime)
// 2.建表时 + processingtime
val tableEnv = StreamTableEnvironment.create(env)
tableEnv
.connect(new FileSystem().path("src/main/resources/sensor.txt"))
.withFormat(new Csv())
.withSchema(
new Schema()
.field("id",DataTypes.STRING())
.field("timestamp",DataTypes.BIGINT())
.field("temperature",DataTypes.DOUBLE())
.field("pt",DataTypes.TIMESTAMP(3)).proctime()
)
.createTemporaryTable("inputtable")
val dataTable = tableEnv.from("inputtable")
// 打印
dataTable.printSchema()
结果:
3.事件时间(ProcessingTime)
事件时间允许程序按照数据中包含的时间来处理,这样可以在有乱序或者晚到的数据的情况下产生一致的处理结果。它可以保证从外部存储读取数据后产生可以复现(replayable)的结果。
为了能够处理乱序的事件,并且区分正常到达和晚到的事件,Flink 需要从事件中获取事件时间并且产生 watermark
事件时间属性也有类似于处理时间的三种定义方式:在DDL中定义、在 DataStream 到 Table 转换时定义、用 TableSource 定义。
在DataStream转换成Table时定义
val tableEnv = StreamTableEnvironment.create(env)
val dataTable = tableEnv.fromDataStream(dataStream,'id,'timestamp.rowtime,'temperature)
// val dataTable =tableEnv.fromDataStream(dataStream,'id,'timestamp,'temperature,'rt.rowtime)
在创建表的DDL中定义
val tableEnv = StreamTableEnvironment.create(env)
tableEnv
.connect(new FileSystem().path("src/main/resources/sensor.txt"))
.withFormat(new Csv())
.withSchema(
new Schema()
.field("id",DataTypes.STRING())
.field("timestamp",DataTypes.BIGINT())
.field("temperature",DataTypes.DOUBLE())
.rowtime(
new Rowtime()
.timestampsFromField("timestamp")
.watermarksPeriodicBounded(1000)
)
)
.createTemporaryTable("dataTable")
val dataTable = tableEnv.from("dataTable")
使用 TableSource 定义
全部代码参考:
import org.apache.flink.table.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment, createTypeInformation
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.api.DataTypes
import org.apache.flink.table.api.scala.StreamTableEnvironment, UnresolvedFieldExpression
import org.apache.flink.table.descriptors.Csv, FileSystem, Rowtime, Schema
import org.apache.flink.types.Row
case class sensorReading(id:String,timestamp:Long,temperature:Double)
object eventtime
def main(args: Array[String]): Unit =
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// 1.在流转换成表的时候 + rowtime
val inputStream = env.readTextFile("src/main/resources/sensor.txt")
val dataStream = inputStream
.map(data =>
val arr = data.split(",")
sensorReading(arr(0),arr(1).toLong,arr(2).toDouble)
)
.assignAscendingTimestamps(_.timestamp * 1000)
val tableEnv = StreamTableEnvironment.create(env)
val dataTable = tableEnv.fromDataStream(dataStream,'id,'timestamp.rowtime,'temperature)
// val dataTable =tableEnv.fromDataStream(dataStream,'id,'timestamp,'temperature,'rt.rowtime)
// 2.在建表时 + rowtime
// val tableEnv = StreamTableEnvironment.create(env)
// tableEnv
// .connect(new FileSystem().path("src/main/resources/sensor.txt"))
// .withFormat(new Csv())
// .withSchema(
// new Schema()
// .field("id",DataTypes.STRING())
// .field("timestamp",DataTypes.BIGINT())
// .field("temperature",DataTypes.DOUBLE())
// .rowtime(
// new Rowtime()
// .timestampsFromField("timestamp")
// .watermarksPeriodicBounded(1000)
// )
// )
// .createTemporaryTable("dataTable")
//
// val dataTable = tableEnv.from("dataTable")
dataTable.printSchema()
结果:
以上是关于flink笔记15 flink table表的时间属性的主要内容,如果未能解决你的问题,请参考以下文章
18-flink-1.10.1-Table API & Flink SQL
flink笔记16 flink table windows(Group Windows/Over Windows)
flink笔记16 flink table windows(Group Windows/Over Windows)