Pyspark RDD 到具有强制模式的 DataFrame:值错误

Posted

技术标签:

【中文标题】Pyspark RDD 到具有强制模式的 DataFrame:值错误【英文标题】:Pyspark RDD to DataFrame with Enforced Schema: Value Error 【发布时间】:2017-05-07 17:33:29 【问题描述】:

我正在使用 pyspark,其架构与本文末尾显示的架构相称(注意嵌套列表、无序字段),最初是从 Parquet 作为 DataFrame 导入的。从根本上说,我遇到的问题是无法将这些数据作为 RDD 处理,然后再转换回 DataFrame。 (我已经查看了几篇相关的帖子,但我仍然无法确定我哪里出错了。)

简单地说,以下代码可以正常工作(正如人们所期望的那样):

schema = deepcopy(tripDF.schema)
tripRDD = tripDF.rdd
tripDFNew = sqlContext.createDataFrame(tripRDD, schema)
tripDFNew.take(1)

当我需要映射 RDD 时(例如添加字段的情况),事情就不起作用了。

schema = deepcopy(tripDF.schema)
tripRDD = tripDF.rdd
def trivial_map(row):
    rowDict = row.asDict()
    return pyspark.Row(**rowDict)
tripRDDNew = tripRDD.map(lambda row: trivial_map(row))
tripDFNew = sqlContext.createDataFrame(tripRDDNew, schema)
tripDFNew.take(1)

上面的代码给出了以下异常,其中 XXX 是整数的替代,它会随着运行而变化(例如,我见过 1、16、23 等):

File "/opt/cloudera/parcels/CDH-5.8.3-
1.cdh5.8.3.p1967.2057/lib/spark/python/pyspark/sql/types.py", line 546, in 
toInternal
raise ValueError("Unexpected tuple %r with StructType" % obj)
ValueError: Unexpected tuple XXX with StructType`

鉴于此信息,第二个代码块中是否存在明显错误? (我注意到tripRDD 属于rdd.RDD 类,而tripRDDNew 属于rdd.PipelinedRDD 类,但我认为这应该不是问题。)(我还注意到tripRDD 的架构不是按字段名排序的,而tripRDDNew 的架构按字段名称排序。同样,我不明白为什么会出现问题。)

架构:

root
 |-- foo: struct (nullable = true)
 |    |-- bar_1: integer (nullable = true)
 |    |-- bar_2: integer (nullable = true)
 |    |-- bar_3: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- baz_1: integer (nullable = true)
 |    |    |    |-- baz_2: string (nullable = true)
 |    |    |    |-- baz_3: double (nullable = true)
 |    |-- bar_4: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- baz_1: integer (nullable = true)
 |    |    |    |-- baz_2: string (nullable = true)
 |    |    |    |-- baz_3: double (nullable = true)
 |-- qux: integer (nullable = true)
 |-- corge: integer (nullable = true)
 |-- uier: integer (nullable = true)`

【问题讨论】:

【参考方案1】:

如帖子中所述,原始架构中的字段未按字母顺序排列。问题就在于此。在映射函数中使用 .asDict() 对生成的 RDD 的字段进行排序。在调用 createDataFrame 时,tripRDDNew 的字段顺序与 schema 冲突。 ValueError 是由于尝试将整数字段之一(即示例中的 qux、corge 或 uier)解析为 StructType 而导致的。

(顺便说一句:createDataFrame 要求模式字段与 RDD 字段具有相同的顺序有点令人惊讶。您应该需要一致的字段名称或一致的字段顺序,但同时要求两者似乎有点过头了。)

(顺便说一句:DataFrame中存在非字母字段有些不正常。例如,sc.parallelize()在分发数据结构时会自动按字母顺序排列字段。看起来应该对字段进行排序从 parquet 文件导入 DataFrame 时。调查为什么不是这种情况可能会很有趣。)

【讨论】:

以上是关于Pyspark RDD 到具有强制模式的 DataFrame:值错误的主要内容,如果未能解决你的问题,请参考以下文章

如何在过滤器pyspark RDD中过滤掉某种模式[重复]

当数据包含具有两种不同数据类型的嵌套数组时,在 PySpark 中定义模式

Pyspark 将 rdd 转换为具有空值的数据帧

在pySpark中将RDD拆分为n个部分

研究 RDD-pyspark 的不同元素

将 Pyspark Python k-means 模型预测插入具有原始 RDD 项和特征的 DF