Pyspark RDD 到具有强制模式的 DataFrame:值错误
Posted
技术标签:
【中文标题】Pyspark RDD 到具有强制模式的 DataFrame:值错误【英文标题】:Pyspark RDD to DataFrame with Enforced Schema: Value Error 【发布时间】:2017-05-07 17:33:29 【问题描述】:我正在使用 pyspark,其架构与本文末尾显示的架构相称(注意嵌套列表、无序字段),最初是从 Parquet 作为 DataFrame 导入的。从根本上说,我遇到的问题是无法将这些数据作为 RDD 处理,然后再转换回 DataFrame。 (我已经查看了几篇相关的帖子,但我仍然无法确定我哪里出错了。)
简单地说,以下代码可以正常工作(正如人们所期望的那样):
schema = deepcopy(tripDF.schema)
tripRDD = tripDF.rdd
tripDFNew = sqlContext.createDataFrame(tripRDD, schema)
tripDFNew.take(1)
当我需要映射 RDD 时(例如添加字段的情况),事情就不起作用了。
schema = deepcopy(tripDF.schema)
tripRDD = tripDF.rdd
def trivial_map(row):
rowDict = row.asDict()
return pyspark.Row(**rowDict)
tripRDDNew = tripRDD.map(lambda row: trivial_map(row))
tripDFNew = sqlContext.createDataFrame(tripRDDNew, schema)
tripDFNew.take(1)
上面的代码给出了以下异常,其中 XXX 是整数的替代,它会随着运行而变化(例如,我见过 1、16、23 等):
File "/opt/cloudera/parcels/CDH-5.8.3-
1.cdh5.8.3.p1967.2057/lib/spark/python/pyspark/sql/types.py", line 546, in
toInternal
raise ValueError("Unexpected tuple %r with StructType" % obj)
ValueError: Unexpected tuple XXX with StructType`
鉴于此信息,第二个代码块中是否存在明显错误? (我注意到tripRDD 属于rdd.RDD 类,而tripRDDNew 属于rdd.PipelinedRDD 类,但我认为这应该不是问题。)(我还注意到tripRDD 的架构不是按字段名排序的,而tripRDDNew 的架构按字段名称排序。同样,我不明白为什么会出现问题。)
架构:
root
|-- foo: struct (nullable = true)
| |-- bar_1: integer (nullable = true)
| |-- bar_2: integer (nullable = true)
| |-- bar_3: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- baz_1: integer (nullable = true)
| | | |-- baz_2: string (nullable = true)
| | | |-- baz_3: double (nullable = true)
| |-- bar_4: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- baz_1: integer (nullable = true)
| | | |-- baz_2: string (nullable = true)
| | | |-- baz_3: double (nullable = true)
|-- qux: integer (nullable = true)
|-- corge: integer (nullable = true)
|-- uier: integer (nullable = true)`
【问题讨论】:
【参考方案1】:如帖子中所述,原始架构中的字段未按字母顺序排列。问题就在于此。在映射函数中使用 .asDict() 对生成的 RDD 的字段进行排序。在调用 createDataFrame 时,tripRDDNew 的字段顺序与 schema 冲突。 ValueError 是由于尝试将整数字段之一(即示例中的 qux、corge 或 uier)解析为 StructType 而导致的。
(顺便说一句:createDataFrame 要求模式字段与 RDD 字段具有相同的顺序有点令人惊讶。您应该需要一致的字段名称或一致的字段顺序,但同时要求两者似乎有点过头了。)
(顺便说一句:DataFrame中存在非字母字段有些不正常。例如,sc.parallelize()在分发数据结构时会自动按字母顺序排列字段。看起来应该对字段进行排序从 parquet 文件导入 DataFrame 时。调查为什么不是这种情况可能会很有趣。)
【讨论】:
以上是关于Pyspark RDD 到具有强制模式的 DataFrame:值错误的主要内容,如果未能解决你的问题,请参考以下文章