Spark SQL 的 Scala API - TimestampType - No Encoder found for org.apache.spark.sql.types.TimestampTyp

Posted

技术标签:

【中文标题】Spark SQL 的 Scala API - TimestampType - No Encoder found for org.apache.spark.sql.types.TimestampType【英文标题】:Spark SQL's Scala API - TimestampType - No Encoder found for org.apache.spark.sql.types.TimestampType 【发布时间】:2017-06-01 20:57:49 【问题描述】:

我在 Databricks 笔记本上使用 Spark 2.1 和 Scala 2.11

TimestampType 到底是什么?

​我们从​​​SparkSQL's documentation​得知,官方的时间戳类型是TimestampType,显然是java.sql.Timestamp的别名:

TimestampType 可以在 SparkSQL 的 Scala API 中找到

我们在使用架构和数据集 API 时有所不同

解析"time":1469501297,"action":"Open"时from the Databricks' Scala Structured Streaming example

使用 Json 模式 --> 好的(我更喜欢使用优雅的 Dataset API):

val jsonSchema = new StructType().add("time", TimestampType).add("action", StringType)

val staticInputDF = 
  spark
    .read
    .schema(jsonSchema)
    .json(inputPath)

使用 Dataset API --> KO:未找到 TimestampType 的编码器

创建事件类

import org.apache.spark.sql.types._
case class Event(action: String, time: TimestampType)
--> defined class Event

从数据块上的 DBFS 读取事件时出错。

注意:当使用java.sql.Timestamp 作为“时间”的类型时,我们不会收到错误

val path = "/databricks-datasets/structured-streaming/events/"
val events = spark.read.json(path).as[Event]

错误信息

java.lang.UnsupportedOperationException: No Encoder found for org.apache.spark.sql.types.TimestampType
- field (class: "org.apache.spark.sql.types.TimestampType", name: "time")
- root class: 

【问题讨论】:

【参考方案1】:

结合模式读取方法.schema(jsonSchema) 和包含java.sql.Timestamp 类型的as[Type] 方法将解决这个问题。这个想法是在阅读 Structured Streaming 文档 Creating streaming DataFrames and streaming Datasets

后产生的

这些示例生成无类型的流数据帧,这意味着 在编译时不检查 DataFrame 的架构,仅 提交查询时在运行时检查。一些操作如 map、flatMap 等需要在编译时知道类型。去做 那些,你可以将这些无类型的流数据帧转换为有类型的 使用与静态 DataFrame 相同的方法流式传输数据集。

val path = "/databricks-datasets/structured-streaming/events/"

val jsonSchema = new StructType().add("time", TimestampType).add("action", StringType)

case class Event(action: String, time: java.sql.Timestamp)

val staticInputDS = 
  spark
    .read
    .schema(jsonSchema)
    .json(path)
    .as[Event]

staticInputDF.printSchema

将输出:

root
 |-- time: timestamp (nullable = true)
 |-- action: string (nullable = true)

【讨论】:

【参考方案2】:

TimestampType 不是java.sql.Timestamp 的别名,而是 Spark 内部使用的时间戳类型的表示。一般来说,您不想在代码中使用TimestampType。这个想法是 Spark SQL 原生支持 java.sql.Timestamp,因此您可以按如下方式定义事件类:

case class Event(action: String, time: java.sql.Timestamp)

然后,在编译和优化查询时,Spark 将在内部使用 TimestampType 在运行时对值的类型进行建模,但这不是您大部分时间感兴趣的事情。

【讨论】:

使用java.sql.Timestamp 如果我们使用dataset.printSchema,我们有time: long,对于架构我们有time:timestamp 。所以我们仍然必须在读取它之后将我们的时间字段转换为时间戳:/

以上是关于Spark SQL 的 Scala API - TimestampType - No Encoder found for org.apache.spark.sql.types.TimestampTyp的主要内容,如果未能解决你的问题,请参考以下文章

十spark SQL的scala示例

在scala中使用spark sql解决特定需求

Scala - 如何在 Spark SQL 查询中将日期字符串转换为时间戳?

Spark SQL内置函数

spark scala mysql 语法

udf spark Scala 返回案例类