Spark结构化流 - 使用模式从文件中读取时间戳

Posted

技术标签:

【中文标题】Spark结构化流 - 使用模式从文件中读取时间戳【英文标题】:Spark Structured streaming - reading timestamp from file using schema 【发布时间】:2021-04-19 12:11:42 【问题描述】:

我正在从事结构化流式传输工作。

我从文件中读取的数据包含时间戳(以毫秒为单位)、deviceId 和该设备报告的值。 多个设备报告数据。

我正在尝试编写一个作业,将所有设备发送的值聚合(汇总)到 1 分钟的滚动窗口中。

我遇到的问题是时间戳。

当我尝试将“时间戳”解析为 Long 时,窗口函数抱怨它需要“时间戳类型”。 当我尝试像下面的 sn-p 那样解析 TimestampType 时,我得到了.MatchError 异常(完整的异常可以在下面看到),我正在努力弄清楚为什么以及正确的处理方法是什么

// Create schema
StructType readSchema = new StructType().add("value" , "integer")
                                        .add("deviceId", "long")
                                        .add("timestamp", new TimestampType());

// Read data from file
Dataset<Row> inputDataFrame = sparkSession.readStream()
                                          .schema(readSchema)
                                          .parquet(path);

Dataset<Row> aggregations = inputDataFrame.groupBy(window(inputDataFrame.col("timestamp"), "1 minutes"),
                                                  inputDataFrame.col("deviceId"))
                                          .agg(sum("value"));

例外:

 org.apache.spark.sql.types.TimestampType@3eeac696 (of class org.apache.spark.sql.types.TimestampType)
scala.MatchError: org.apache.spark.sql.types.TimestampType@3eeac696 (of class org.apache.spark.sql.types.TimestampType)
    at org.apache.spark.sql.catalyst.encoders.RowEncoder$.externalDataTypeFor(RowEncoder.scala:215)
    at org.apache.spark.sql.catalyst.encoders.RowEncoder$.externalDataTypeForInput(RowEncoder.scala:212)
    at org.apache.spark.sql.catalyst.expressions.objects.ValidateExternalType.<init>(objects.scala:1692)
    at org.apache.spark.sql.catalyst.encoders.RowEncoder$.$anonfun$serializerFor$3(RowEncoder.scala:175)
    at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:245)
    at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
    at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
    at scala.collection.TraversableLike.flatMap(TraversableLike.scala:245)
    at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:242)
    at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:198)
    at org.apache.spark.sql.catalyst.encoders.RowEncoder$.serializerFor(RowEncoder.scala:171)
    at org.apache.spark.sql.catalyst.encoders.RowEncoder$.apply(RowEncoder.scala:66)
    at org.apache.spark.sql.Dataset$.$anonfun$ofRows$1(Dataset.scala:92)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:89)
    at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:232)
    at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:242)
    at org.apache.spark.sql.streaming.DataStreamReader.parquet(DataStreamReader.scala:450)

【问题讨论】:

【参考方案1】:

通常,当您的时间戳作为 long 存储在 milis 中时,您会将其转换为 timestamp 类型,如下所示:

// Create schema and keep column 'timestamp' as long
StructType readSchema = new StructType()
    .add("value", "integer")
    .add("deviceId", "long")
    .add("timestamp", "long");

// Read data from file
Dataset<Row> inputDataFrame = sparkSession.readStream()
                                          .schema(readSchema)
                                          .parquet(path);

// convert timestamp column into a proper timestamp type
Dataset<Row> df1 = inputDataFrame.withColumn("new_timestamp", expr("timestamp/1000").cast(DataTypes.TimestampType));

df1.show(false)

+-----+--------+-------------+-----------------------+
|value|deviceId|timestamp    |new_timestamp          |
+-----+--------+-------------+-----------------------+
|1    |1337    |1618836775397|2021-04-19 14:52:55.397|
+-----+--------+-------------+-----------------------+

df1.printSchema();

root
 |-- value: integer (nullable = true)
 |-- deviceId: long (nullable = true)
 |-- timestamp: long (nullable = true)
 |-- new_timestamp: timestamp (nullable = true)

【讨论】:

非常感谢!我试图了解解决方案背后的“原因”。根据 TimestampType 的 Spark API 文档:“时间戳类型表示以微秒精度为单位的时间瞬间”。但是我们除以 1000,这意味着转换为秒。你能帮我理解吗?再次感谢 @Victoriia 很高兴它有帮助。不幸的是,我无法在微发送精度上对任何在线资源进行罚款。如果您碰巧拥有 O'Reilly 出版的“Spark - The Definitive Guide”一书,您可以阅读第 98 页“另一个常见的问题是 Spark 的 TimestampType 类仅支持二级精度”。 感谢您的准确参考,我可以在线访问这本书,我将阅读相关部分(希望在不久的将来阅读整本书)。希望它是 TimestampType 类文档的一部分/

以上是关于Spark结构化流 - 使用模式从文件中读取时间戳的主要内容,如果未能解决你的问题,请参考以下文章

使用 spark 结构化流从 s3 读取 avro 文件

如何从 Spark 结构化流中的 Cassandra 等外部存储读取 Kafka 和查询?

维护传入流数据的时间戳序列

在 Spark Structured Streaming 中从中间读取现有多级分区文件数据的问题

Spark 结构化流:Scala 中的模式推理

从多个 Kafka 主题读取的 Spark 结构化流式应用程序