如何展平 pySpark 数据框?
Posted
技术标签:
【中文标题】如何展平 pySpark 数据框?【英文标题】:How do I flattern a pySpark dataframe ? 【发布时间】:2017-03-17 16:21:37 【问题描述】:我有一个像这样的 spark 数据框:
id | Operation | Value
-----------------------------------------------------------
1 | [Date_Min, Date_Max, Device] | [148590, 148590, iphone]
2 | [Date_Min, Date_Max, Review] | [148590, 148590, Good]
3 | [Date_Min, Date_Max, Review, Device] | [148590, 148590, Bad,samsung]
我期望的结果:
id | Operation | Value |
--------------------------
1 | Date_Min | 148590 |
1 | Date_Max | 148590 |
1 | Device | iphone |
2 | Date_Min | 148590 |
2 | Date_Max | 148590 |
2 | Review | Good |
3 | Date_Min | 148590 |
3 | Date_Max | 148590 |
3 | Review | Bad |
3 | Review | samsung|
我正在使用带有 pyspark 的 Spark 2.1.0。我试过这个solution ,但它只适用于一列。
谢谢
【问题讨论】:
我仍然无法找出完成这项特定任务的好方法。我尝试单独分解列df1 = df.select('id', explode(col("Operation")))
、df2 = df.select('id', explode(col("Value")))
。但是,如何将两个数据帧水平堆叠在一起,并没有很好的解决方案。
【参考方案1】:
这是上面的示例数据框。我用这个solution 来解决你的问题。
df = spark.createDataFrame(
[[1, ['Date_Min', 'Date_Max', 'Device'], ['148590', '148590', 'iphone']],
[2, ['Date_Min', 'Date_Max', 'Review'], ['148590', '148590', 'Good']],
[3, ['Date_Min', 'Date_Max', 'Review', 'Device'], ['148590', '148590', 'Bad', 'samsung']]],
schema=['id', 'l1', 'l2'])
在这里,您可以定义udf
以先将每行的两个列表压缩在一起。
from pyspark.sql.types import *
from pyspark.sql.functions import col, udf, explode
zip_list = udf(
lambda x, y: list(zip(x, y)),
ArrayType(StructType([
StructField("first", StringType()),
StructField("second", StringType())
]))
)
最后,您可以将两列压缩在一起,然后分解该列。
df_out = df.withColumn("tmp", zip_list('l1', 'l2')).\
withColumn("tmp", explode("tmp")).\
select('id', col('tmp.first').alias('Operation'), col('tmp.second').alias('Value'))
df_out.show()
输出
+---+---------+-------+
| id|Operation| Value|
+---+---------+-------+
| 1| Date_Min| 148590|
| 1| Date_Max| 148590|
| 1| Device| iphone|
| 2| Date_Min| 148590|
| 2| Date_Max| 148590|
| 2| Review| Good|
| 3| Date_Min| 148590|
| 3| Date_Max| 148590|
| 3| Review| Bad|
| 3| Device|samsung|
+---+---------+-------+
【讨论】:
没问题@Omar14! 最后还是zip_list函数有问题。当我使用 Zeppelin 笔记本时,它可以工作,但是当我尝试使用 spark-submit 自动化作业和脚本时,作业失败并出现以下错误:zip argument #1 must support iteration
【参考方案2】:
如果使用 DataFrame,那么试试这个:-
import pyspark.sql.functions as F
your_df.select("id", F.explode("Operation"), F.explode("Value")).show()
【讨论】:
当我同时分解 2 列时它对我不起作用。以上是关于如何展平 pySpark 数据框?的主要内容,如果未能解决你的问题,请参考以下文章
如何使用 pyspark 在 aws 胶水中展平嵌套 json 中的数组?
Spark Python Pyspark 如何使用字典数组和嵌入式字典来展平列(sparknlp 注释器输出)