flink笔记15 flink table表的时间属性

Posted Aurora1217

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了flink笔记15 flink table表的时间属性相关的知识,希望对你有一定的参考价值。

表的时间属性

 

1.时间属性介绍

2.处理时间(ProcessingTime)

在创建表的DDL中定义

在 DataStream 到 Table 转换时定义

使用 TableSource 定义

3.事件时间(ProcessingTime)

在DataStream转换成Table时定义

在创建表的DDL中定义

使用 TableSource 定义


1.时间属性介绍

像窗口(在 Table API 和 SQL )这种基于时间的操作,需要有时间信息。因此,Table API 中的表就需要提供逻辑时间属性来表示时间,以及支持时间相关的操作。

每种类型的表都可以有时间属性,可以在用CREATE TABLE DDL创建表的时候指定、也可以在DataStream中指定、也可以在定义TableSource时指定。一旦时间属性定义好,它就可以像普通列一样使用,也可以在时间相关的操作中使用。

时间属性可以像普通的时间戳的列一样被使用和计算。一旦时间属性被用在了计算中,它就会被物化,进而变成一个普通的时间戳。普通的时间戳是无法跟 Flink 的时间以及watermark等一起使用的,所以普通的时间戳就无法用在时间相关的操作中。

Table API 程序需要在 streaming environment 中指定时间属性

val env = StreamExecutionEnvironment.getExecutionEnvironment

env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) // 默认ProcessingTime

// 或者:
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)

2.处理时间(ProcessingTime)

处理时间是基于机器的本地时间来处理数据,它是最简单的一种时间概念,但是它不能提供确定性。它既不需要从数据里获取时间,也不需要生成 watermark。

三种方法定义处理时间

在创建表的DDL中定义

在创建表的 DDL 中用计算列的方式定义,用PROCTIME()就可以定义处理时间

//以接收文件数据建表为例
val tableEnv = StreamTableEnvironment.create(env)
tableEnv
  .connect(new FileSystem().path("src/main/resources/sensor.txt"))
  .withFormat(new Csv())
  .withSchema(
    new Schema()
      .field("id",DataTypes.STRING())
      .field("timestamp",DataTypes.BIGINT())
      .field("temperature",DataTypes.DOUBLE())
      .field("pt",DataTypes.TIMESTAMP(3)).proctime()
  )
  .createTemporaryTable("inputtable")

val dataTable = tableEnv.from("inputtable")

在 DataStream 到 Table 转换时定义

处理时间属性可以在 schema 定义的时候用.proctime后缀来定义。时间属性一定不能定义在一个已有字段上,所以它只能定义在 schema 定义的最后。

val tableEnv = StreamTableEnvironment.create(env)
val dataTable = tableEnv.fromDataStream(dataStream,'id,'timestamp,'temperature,'pt.proctime)

使用 TableSource 定义

处理时间属性可以在实现了 DefinedProctimeAttribute 的 TableSource 中定义。逻辑的时间属性会放在 TableSource 已有物理字段的最后

全部代码参考:

import org.apache.flink.table.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.table.descriptors.{Csv, FileSystem, Schema}
import org.apache.flink.table.api.DataTypes
import org.apache.flink.table.api.scala.{StreamTableEnvironment, UnresolvedFieldExpression}
import org.apache.flink.types.Row

case class sensorReading(id:String,timestamp:Long,temperature:Double)

object processing {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

//    1.流转换成表时 + processingtime
//    val inputStream = env.readTextFile("src/main/resources/sensor.txt")
//    val dataStream = inputStream
//      .map(data => {
//        val arr = data.split(",")
//        sensorReading(arr(0),arr(1).toLong,arr(2).toDouble)
//      })
//      .assignAscendingTimestamps(_.timestamp * 1000)
//    val tableEnv = StreamTableEnvironment.create(env)
//
//    val dataTable = tableEnv.fromDataStream(dataStream,'id,'timestamp,'temperature,'pt.proctime)

//    2.建表时 + processingtime
    val tableEnv = StreamTableEnvironment.create(env)
    tableEnv
      .connect(new FileSystem().path("src/main/resources/sensor.txt"))
      .withFormat(new Csv())
      .withSchema(
        new Schema()
          .field("id",DataTypes.STRING())
          .field("timestamp",DataTypes.BIGINT())
          .field("temperature",DataTypes.DOUBLE())
          .field("pt",DataTypes.TIMESTAMP(3)).proctime()
      )
      .createTemporaryTable("inputtable")

    val dataTable = tableEnv.from("inputtable")

//    打印
    dataTable.printSchema()
  }
}

结果:

3.事件时间(ProcessingTime)

事件时间允许程序按照数据中包含的时间来处理,这样可以在有乱序或者晚到的数据的情况下产生一致的处理结果。它可以保证从外部存储读取数据后产生可以复现(replayable)的结果。

为了能够处理乱序的事件,并且区分正常到达和晚到的事件,Flink 需要从事件中获取事件时间并且产生 watermark

事件时间属性也有类似于处理时间的三种定义方式:在DDL中定义、在 DataStream 到 Table 转换时定义、用 TableSource 定义。

在DataStream转换成Table时定义

val tableEnv = StreamTableEnvironment.create(env)

val dataTable = tableEnv.fromDataStream(dataStream,'id,'timestamp.rowtime,'temperature)
//    val dataTable =tableEnv.fromDataStream(dataStream,'id,'timestamp,'temperature,'rt.rowtime)

在创建表的DDL中定义

val tableEnv = StreamTableEnvironment.create(env)

tableEnv
  .connect(new FileSystem().path("src/main/resources/sensor.txt"))
  .withFormat(new Csv())
  .withSchema(
    new Schema()
      .field("id",DataTypes.STRING())
      .field("timestamp",DataTypes.BIGINT())
      .field("temperature",DataTypes.DOUBLE())
      .rowtime(
        new Rowtime()
          .timestampsFromField("timestamp")
          .watermarksPeriodicBounded(1000)
      )
  )
  .createTemporaryTable("dataTable")

val dataTable = tableEnv.from("dataTable")

使用 TableSource 定义

全部代码参考:

import org.apache.flink.table.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.api.DataTypes
import org.apache.flink.table.api.scala.{StreamTableEnvironment, UnresolvedFieldExpression}
import org.apache.flink.table.descriptors.{Csv, FileSystem, Rowtime, Schema}
import org.apache.flink.types.Row

case class sensorReading(id:String,timestamp:Long,temperature:Double)

object eventtime {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

//    1.在流转换成表的时候 + rowtime
    val inputStream = env.readTextFile("src/main/resources/sensor.txt")

    val dataStream = inputStream
      .map(data =>{
        val arr = data.split(",")
        sensorReading(arr(0),arr(1).toLong,arr(2).toDouble)
      })
      .assignAscendingTimestamps(_.timestamp * 1000)

    val tableEnv = StreamTableEnvironment.create(env)

    val dataTable = tableEnv.fromDataStream(dataStream,'id,'timestamp.rowtime,'temperature)
//    val dataTable =tableEnv.fromDataStream(dataStream,'id,'timestamp,'temperature,'rt.rowtime)

//    2.在建表时 + rowtime
//    val tableEnv = StreamTableEnvironment.create(env)

//    tableEnv
//      .connect(new FileSystem().path("src/main/resources/sensor.txt"))
//      .withFormat(new Csv())
//      .withSchema(
//        new Schema()
//          .field("id",DataTypes.STRING())
//          .field("timestamp",DataTypes.BIGINT())
//          .field("temperature",DataTypes.DOUBLE())
//          .rowtime(
//            new Rowtime()
//              .timestampsFromField("timestamp")
//              .watermarksPeriodicBounded(1000)
//          )
//      )
//      .createTemporaryTable("dataTable")
//
//    val dataTable = tableEnv.from("dataTable")
    
    dataTable.printSchema()

  }
}

结果:

 

以上是关于flink笔记15 flink table表的时间属性的主要内容,如果未能解决你的问题,请参考以下文章

flink笔记15 flink table表的时间属性

18-flink-1.10.1-Table API & Flink SQL

flink笔记16 flink table windows(Group Windows/Over Windows)

flink笔记16 flink table windows(Group Windows/Over Windows)

flink笔记16 flink table windows(Group Windows/Over Windows)

18-flink-1.10.1-Table API & Flink SQL