Spark SQL 的 Scala API - TimestampType - No Encoder found for org.apache.spark.sql.types.TimestampTyp
Posted
技术标签:
【中文标题】Spark SQL 的 Scala API - TimestampType - No Encoder found for org.apache.spark.sql.types.TimestampType【英文标题】:Spark SQL's Scala API - TimestampType - No Encoder found for org.apache.spark.sql.types.TimestampType 【发布时间】:2017-06-01 20:57:49 【问题描述】:我在 Databricks 笔记本上使用 Spark 2.1 和 Scala 2.11
TimestampType 到底是什么?
我们从SparkSQL's documentation得知,官方的时间戳类型是TimestampType,显然是java.sql.Timestamp的别名:
TimestampType 可以在 SparkSQL 的 Scala API 中找到
我们在使用架构和数据集 API 时有所不同
解析"time":1469501297,"action":"Open"
时from the Databricks' Scala Structured Streaming example
使用 Json 模式 --> 好的(我更喜欢使用优雅的 Dataset API):
val jsonSchema = new StructType().add("time", TimestampType).add("action", StringType)
val staticInputDF =
spark
.read
.schema(jsonSchema)
.json(inputPath)
使用 Dataset API --> KO:未找到 TimestampType 的编码器
创建事件类
import org.apache.spark.sql.types._
case class Event(action: String, time: TimestampType)
--> defined class Event
从数据块上的 DBFS 读取事件时出错。
注意:当使用java.sql.Timestamp
作为“时间”的类型时,我们不会收到错误
val path = "/databricks-datasets/structured-streaming/events/"
val events = spark.read.json(path).as[Event]
错误信息
java.lang.UnsupportedOperationException: No Encoder found for org.apache.spark.sql.types.TimestampType
- field (class: "org.apache.spark.sql.types.TimestampType", name: "time")
- root class:
【问题讨论】:
【参考方案1】:结合模式读取方法.schema(jsonSchema)
和包含java.sql.Timestamp
类型的as[Type]
方法将解决这个问题。这个想法是在阅读 Structured Streaming 文档 Creating streaming DataFrames and streaming Datasets
这些示例生成无类型的流数据帧,这意味着 在编译时不检查 DataFrame 的架构,仅 提交查询时在运行时检查。一些操作如 map、flatMap 等需要在编译时知道类型。去做 那些,你可以将这些无类型的流数据帧转换为有类型的 使用与静态 DataFrame 相同的方法流式传输数据集。
val path = "/databricks-datasets/structured-streaming/events/"
val jsonSchema = new StructType().add("time", TimestampType).add("action", StringType)
case class Event(action: String, time: java.sql.Timestamp)
val staticInputDS =
spark
.read
.schema(jsonSchema)
.json(path)
.as[Event]
staticInputDF.printSchema
将输出:
root
|-- time: timestamp (nullable = true)
|-- action: string (nullable = true)
【讨论】:
【参考方案2】:TimestampType
不是java.sql.Timestamp
的别名,而是 Spark 内部使用的时间戳类型的表示。一般来说,您不想在代码中使用TimestampType
。这个想法是 Spark SQL 原生支持 java.sql.Timestamp
,因此您可以按如下方式定义事件类:
case class Event(action: String, time: java.sql.Timestamp)
然后,在编译和优化查询时,Spark 将在内部使用 TimestampType
在运行时对值的类型进行建模,但这不是您大部分时间感兴趣的事情。
【讨论】:
使用java.sql.Timestamp
如果我们使用dataset.printSchema,我们有time: long
,对于架构我们有time:timestamp
。所以我们仍然必须在读取它之后将我们的时间字段转换为时间戳:/以上是关于Spark SQL 的 Scala API - TimestampType - No Encoder found for org.apache.spark.sql.types.TimestampTyp的主要内容,如果未能解决你的问题,请参考以下文章