pyspark:来自rdd的包含列表列表的数据框

Posted

技术标签:

【中文标题】pyspark:来自rdd的包含列表列表的数据框【英文标题】:pyspark: dataframe from rdd containing list of lists 【发布时间】:2021-07-16 04:07:36 【问题描述】:

我是 Spark(使用 Python)的新手,即使在查看相关帖子后也无法弄清楚。

我有一个 RDD。 RDD 的每条记录都是一个列表列表,如下所示

[[1073914607, 0, -1],[1073914607, 2, 7.88],[1073914607, 0, -1],[1073914607, 4, 40.0]]
[[1074079003, 0, -1],[1074079003, 2, 2.87],[1074079003, 0, -1],[1074079003, 4, 35.2]]

我想将 RDD 转换为具有 3 列的数据框,基本上是堆叠所有元素列表。数据框应如下所示。

account_id  product_id  price
1073914607    0         -1
1073914607    2         7.88
1073914607    0         -1
1073914607    4         40
1074079003    0         -1
1074079003    2         2.87
1074079003    0         -1
1074079003    4         35.2

我试过my_rdd.toDF(),但它给了我两行四列,每个元素列表在一列中。我还尝试了其他帖子中可能相关的一些解决方案。由于我对火花很陌生,因此我遇到了各种错误,我可以弄清楚。请帮忙。谢谢。

添加于 2021 年 7 月 28 日。最后,我执行以下操作来遍历每个元素并生成一个长列表并将其转换为数据框。可能这不是最有效的方法,但它解决了我的问题。

    result_lst=[]
    
    for x in my_rdd.toLocalIterator():
        for y in x:
            result_lst.append(y)
    
    result_df=spark.createDataFrame(result_lst, ['account_id','product_id','price'])

【问题讨论】:

请不要发布图片。人们很难复制和重现您的问题。 【参考方案1】:
>>> data = ([[1,2],[1,4]],[[2,5],[2,6]])
>>> df = sc.parallelize(data).toDF(['c1','c2'])

>>> df.show()
+------+------+
|    c1|    c2|
+------+------+
|[1, 2]|[1, 4]|
|[2, 5]|[2, 6]|
+------+------+

>>> df1 = df.select(df.c1.alias('c3')).union(df.select(df.c2).alias('c3'))
>>> df1.show()
+------+
|    c3|
+------+
|[1, 2]|
|[2, 5]|
|[1, 4]|
|[2, 6]|
+------+

>>> df1.select(df1.c3,df1.c3[0],df1.c3[1]).show()
+------+-----+-----+
|    c3|c3[0]|c3[1]|
+------+-----+-----+
|[1, 2]|    1|    2|
|[2, 5]|    2|    5|
|[1, 4]|    1|    4|
|[2, 6]|    2|    6|
+------+-----+-----+

【讨论】:

谢谢巴拉。但我希望第一行为 [1, 2] 和第三行为 [2, 5] 而不是 [1, 1] 和 [2, 2] 的结果。另外,大列表中元素列表的计数是动态的,我不知道我是否只有'c1'和'c2'。例如,它可能有“c1”到“c90”。另外,在元素列表中会有三个值,不确定是否会跨栏。 @sguo:答案已更新。这应该给出一些方向 谢谢@Bala。最后,我做了以下循环遍历每个元素并生成一个长列表并将其转换为数据框。 result_lst=[] for x in my_rdd.toLocalIterator(): for y in x: result_lst.append(y) result_df=spark.createDataFrame(result_lst, ['account_id','tip_id'])【参考方案2】:

我后来使用下面的另一种方法来解决问题,而不是将 rdd 带到 Localiterator() 并循环它。我想这种新方法效率更高。

from pyspark.sql.functions import explode
from pyspark.sql import Row
df_exploded=my_rdd.map(lambda x : Row(x)).toDF().withColumn('_1', explode('_1'))
result_df=df_exploded.select([df_exploded._1[i] for i in range(3)]).toDF('account_id','product_id','price')

【讨论】:

以上是关于pyspark:来自rdd的包含列表列表的数据框的主要内容,如果未能解决你的问题,请参考以下文章

PySpark:从数据框列表创建 RDD

Pyspark:从列表的 RDD 创建一个火花数据框,其中列表的某些元素是对象

pyspark 行列表的 RDD 到 DataFrame

怎么解决 ? (将列表添加到列数据框pyspark)

pyspark将列添加到列表中已经不存在的数据框

为每组 pyspark RDD/dataframe 选择随机列