pyspark 行列表的 RDD 到 DataFrame

Posted

技术标签:

【中文标题】pyspark 行列表的 RDD 到 DataFrame【英文标题】:RDD of pyspark Row lists to DataFrame 【发布时间】:2017-08-15 18:36:43 【问题描述】:

我有一个 RDD,它的分区包含可以轻松转换为行列表的元素(碰巧是熊猫数据帧)。把它看成是看起来像这样 P>

rows_list = []
for word in 'quick brown fox'.split():
    rows = []
    for i,c in enumerate(word):
        x = ord(c) + i
        row = pyspark.sql.Row(letter=c, number=i, importance=x)
        rows.append(row)
    rows_list.append(rows)
rdd = sc.parallelize(rows_list)
rdd.take(2)

给了

[[Row(importance=113, letter='q', number=0),
  Row(importance=118, letter='u', number=1),
  Row(importance=107, letter='i', number=2),
  Row(importance=102, letter='c', number=3),
  Row(importance=111, letter='k', number=4)],
 [Row(importance=98, letter='b', number=0),
  Row(importance=115, letter='r', number=1),
  Row(importance=113, letter='o', number=2),
  Row(importance=122, letter='w', number=3),
  Row(importance=114, letter='n', number=4)]]

欲把它变成一个火花数据帧。我希望我可以只执行 P>

rdd.toDF()

但是,给出了一个无用结构 P>

DataFrame[_1: struct<importance:bigint,letter:string,number:bigint>,
          _2: struct<importance:bigint,letter:string,number:bigint>,
          _3: struct<importance:bigint,letter:string,number:bigint>, 
          _4: struct<importance:bigint,letter:string,number:bigint>, 
          _5: struct<importance:bigint,letter:string,number:bigint>]

我真正想要的是一个 3 列的 DataFrame,比如这个

desired_df = sql_context.createDataFrame(sum(rows_list, []))

,这样我可以执行像操作 P>

desired_df.agg(pyspark.sql.functions.sum('number')).take(1)

和得到答案23。 P>

什么是去了解这个正确的方式? P>

【问题讨论】:

【参考方案1】:

你有一个 RDD 的行列表,而你需要 RDD 的行;您可以将rddflatMap 展平,然后将其转换为数据框:

rdd.flatMap(lambda x: x).toDF().show()

+----------+------+------+
|importance|letter|number|
+----------+------+------+
|       113|     q|     0|
|       118|     u|     1|
|       107|     i|     2|
|       102|     c|     3|
|       111|     k|     4|
|        98|     b|     0|
|       115|     r|     1|
|       113|     o|     2|
|       122|     w|     3|
|       114|     n|     4|
|       102|     f|     0|
|       112|     o|     1|
|       122|     x|     2|
+----------+------+------+

import pyspark.sql.functions as F

rdd.flatMap(lambda x: x).toDF().agg(F.sum('number')).show()
+-----------+
|sum(number)|
+-----------+
|         23|
+-----------+

【讨论】:

以上是关于pyspark 行列表的 RDD 到 DataFrame的主要内容,如果未能解决你的问题,请参考以下文章

Pyspark RDD 收集前 163 行

将 RDD 行拆分到 Pyspark 中的不同列

如何从 PySpark DataFrame 中获取随机行?

pyspark:来自rdd的包含列表列表的数据框

PySpark:从数据框列表创建 RDD

如何在pyspark中将rdd行转换为带有json结构的数据框?