如何展平 pySpark 数据框?

Posted

技术标签:

【中文标题】如何展平 pySpark 数据框?【英文标题】:How do I flattern a pySpark dataframe ? 【发布时间】:2017-03-17 16:21:37 【问题描述】:

我有一个像这样的 spark 数据框:

id |            Operation                 |        Value 
----------------------------------------------------------- 
1  | [Date_Min, Date_Max, Device]         | [148590, 148590, iphone]     
2  | [Date_Min, Date_Max, Review]         | [148590, 148590, Good]     
3  | [Date_Min, Date_Max, Review, Device] | [148590, 148590, Bad,samsung]     

我期望的结果:

id | Operation |  Value |
-------------------------- 
1  | Date_Min  | 148590 |
1  | Date_Max  | 148590 |
1  | Device    | iphone |
2  | Date_Min  | 148590 |
2  | Date_Max  | 148590 |
2  | Review    | Good   |
3  | Date_Min  | 148590 |
3  | Date_Max  | 148590 |
3  | Review    | Bad    |
3  | Review    | samsung|

我正在使用带有 pyspark 的 Spark 2.1.0。我试过这个solution ,但它只适用于一列。

谢谢

【问题讨论】:

我仍然无法找出完成这项特定任务的好方法。我尝试单独分解列df1 = df.select('id', explode(col("Operation")))df2 = df.select('id', explode(col("Value")))。但是,如何将两个数据帧水平堆叠在一起,并没有很好的解决方案。 【参考方案1】:

这是上面的示例数据框。我用这个solution 来解决你的问题。

df = spark.createDataFrame(
     [[1, ['Date_Min', 'Date_Max', 'Device'], ['148590', '148590', 'iphone']], 
      [2, ['Date_Min', 'Date_Max', 'Review'], ['148590', '148590', 'Good']],     
      [3, ['Date_Min', 'Date_Max', 'Review', 'Device'], ['148590', '148590', 'Bad', 'samsung']]], 
     schema=['id', 'l1', 'l2'])

在这里,您可以定义udf 以先将每行的两个列表压缩在一起。

from pyspark.sql.types import *
from pyspark.sql.functions import col, udf, explode

zip_list = udf(
  lambda x, y: list(zip(x, y)),
  ArrayType(StructType([
      StructField("first", StringType()),
      StructField("second", StringType())
  ]))
)

最后,您可以将两列压缩在一起,然后分解该列。

df_out = df.withColumn("tmp", zip_list('l1', 'l2')).\
    withColumn("tmp", explode("tmp")).\
    select('id', col('tmp.first').alias('Operation'), col('tmp.second').alias('Value'))
df_out.show()

输出

+---+---------+-------+
| id|Operation|  Value|
+---+---------+-------+
|  1| Date_Min| 148590|
|  1| Date_Max| 148590|
|  1|   Device| iphone|
|  2| Date_Min| 148590|
|  2| Date_Max| 148590|
|  2|   Review|   Good|
|  3| Date_Min| 148590|
|  3| Date_Max| 148590|
|  3|   Review|    Bad|
|  3|   Device|samsung|
+---+---------+-------+

【讨论】:

没问题@Omar14! 最后还是zip_list函数有问题。当我使用 Zeppelin 笔记本时,它可以工作,但是当我尝试使用 spark-submit 自动化作业和脚本时,作业失败并出现以下错误:zip argument #1 must support iteration【参考方案2】:

如果使用 DataFrame,那么试试这个:-

import pyspark.sql.functions as F

your_df.select("id", F.explode("Operation"), F.explode("Value")).show()

【讨论】:

当我同时分解 2 列时它对我不起作用。

以上是关于如何展平 pySpark 数据框?的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 pyspark 在 aws 胶水中展平嵌套 json 中的数组?

如何将 JSON 格式的数据展平为 spark 数据框

Spark Python Pyspark 如何使用字典数组和嵌入式字典来展平列(sparknlp 注释器输出)

在 Pyspark 中展平 Json

使用 Azure Synapse pyspark 过滤器根据嵌套对象的数据类型展平嵌套的 json 对象

在pyspark中展平嵌套的json scala代码