从包含 Option[T] 的行创建 DataFrame 的问题
Posted
技术标签:
【中文标题】从包含 Option[T] 的行创建 DataFrame 的问题【英文标题】:Problems to create DataFrame from Rows containing Option[T] 【发布时间】:2017-06-02 08:25:38 【问题描述】:我正在将一些代码从 Spark 1.6 迁移到 Spark 2.1,并遇到以下问题:
这在 Spark 1.6 中完美运行
import org.apache.spark.sql.types.LongType, StructField, StructType
val schema = StructType(Seq(StructField("i", LongType,nullable=true)))
val rows = sparkContext.parallelize(Seq(Row(Some(1L))))
sqlContext.createDataFrame(rows,schema).show
Spark 2.1.1 中的相同代码:
import org.apache.spark.sql.types.FloatType, LongType, StructField, StructType
val schema = StructType(Seq(StructField("i", LongType,nullable=true)))
val rows = ss.sparkContext.parallelize(Seq(Row(Some(1L))))
ss.createDataFrame(rows,schema).show
给出以下运行时异常:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 8.0 failed 4 times, most recent failure: Lost task 0.3 in stage 8.0 (TID 72, i89203.sbb.ch, executor 9): java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: scala.Some is not a valid external type for schema of bigint
那么如果我想拥有可空的Long
而不是使用Option[Long]
,我应该如何将这样的代码转换为Spark 2.x?
【问题讨论】:
【参考方案1】:实际上有一个关于这个问题的 JIRA SPARK-19056 实际上并不是一个问题。
所以这种行为是故意的。
在
Row
中允许Option
从未被记录在案,当我们将编码器框架应用于所有类型化操作时会带来很多麻烦。从 Spark 2.0 开始,请使用Dataset
进行类型化操作/自定义对象。例如
val ds = Seq(1 -> None, 2 -> Some("str")).toDS
ds.toDF // schema: <_1: int, _2: string>
【讨论】:
感谢您提供此信息。我知道输入数据集是最干净的解决方案,但也需要一些时间来重构代码 @RaphaelRoth 唯一的问题是没有消息了。这是标准。 我不确定强类型数据集是否真的会成为新标准......我有一些疑问【参考方案2】:错误消息很清楚,当需要bigint
时使用Some
scala.Some is not a valid external type for schema of bigint
所以你需要使用Option
与getOrElse
结合使用,这样我们就可以在Option
返回nullpointer
时定义null
。以下代码应该适合您
val sc = ss.sparkContext
val sqlContext = ss.sqlContext
val schema = StructType(Seq(StructField("i", LongType,nullable=true)))
val rows = sc.parallelize(Seq(Row(Option(1L) getOrElse(null))))
sqlContext.createDataFrame(rows,schema).show
希望这个回答对你有帮助
【讨论】:
谢谢,就我而言,这确实是最简单的“解决方法” 很高兴听到这个消息:)以上是关于从包含 Option[T] 的行创建 DataFrame 的问题的主要内容,如果未能解决你的问题,请参考以下文章
使用 Pyspark 从单词列表的行条目创建元组并使用 RDD 计数