PySpark 聚合和分组依据

Posted

技术标签:

【中文标题】PySpark 聚合和分组依据【英文标题】:PySpark Aggregation and Group By 【发布时间】:2018-06-26 09:40:51 【问题描述】:

我看过多个帖子,但聚合是在多个列上完成的,但我想要基于 col OPTION_CD 的聚合,基于以下 条件: 如果有附加到数据框查询的条件,这会给我错误 'DataFrame' object has no attribute '_get_object_id'

IF NULL(STRING AGG(OPTION_CD,'' order by OPTION_CD),'')。 我能理解的是,如果 OPTION_CD col 为空,则放置一个空白,否则将 OPTION_CD 附加在由空白分隔的一行中。以下是示例表:

首先过滤从 COl 1 中只得到 1 和 2,那么结果应该是这样的:

以下是我在数据框上编写的查询

df_result = df.filter((df.COL1 == 1)|(df.COL1 == 2)).select(df.COL1,df.COL2,(when(df.OPTION_CD == "NULL", " ").otherwise(df.groupBy(df.OPTION_CD))).agg( collect_list(df.OPTION_CD)))

但没有得到想要的结果。有人可以帮忙吗?我正在使用 pyspark。

【问题讨论】:

请以适当的格式编写您的代码,而不仅仅是部分。这没有什么意义,因为您似乎尝试在同一列上进行分组和聚合(option_cd) 也许你想用 Col1 Col2 分组并在 option_cd 列上用 c​​ollect_list 聚合? 我已更新代码供您参考。是的@Michail,如果 COL1 和 COL2 相同,那么 option_cd 值必须附加一个空格,否则没有变化 您希望最终数据框中的选项列是什么数据类型?因为您似乎想要混合来自聚合和字符串的列表。同样在第一个数据框中,您有空格或 Null 值? SPARK SQL replacement for mysql GROUP_CONCAT aggregate function的可能重复 【参考方案1】:

你的问题表达得不够清楚,但我会尽力回答。

您需要了解数据框列的所有行只能具有一种数据类型。如果您的初始数据是整数,那么您不能使用空字符串检查字符串是否相等,而是使用 Null 值。

另外,collect list 返回一个整数数组,因此您不能在一行中包含 [7 , 5] 而在另一行中包含“'”。无论如何,这对你有用吗?

from pyspark.sql.functions import col, collect_list

listOfTuples = [(1, 3, 1),(2, 3, 2),(1, 4, 5),(1, 4, 7),(5, 5, 8),(4, 1, 3),(2,4,None)]
df = spark.createDataFrame(listOfTuples , ["A", "B", "option"])
df.show()
>>>
+---+---+------+
|  A|  B|option|
+---+---+------+
|  1|  3|     1|
|  2|  3|     2|
|  1|  4|     5|
|  1|  4|     7|
|  5|  5|     8|
|  4|  1|     3|
|  2|  4|  null|
+---+---+------+


dfFinal = df.filter((df.A == 1)|(df.A == 2)).groupby(['A','B']).agg(collect_list(df['option']))
dfFinal.show()
>>>   
+---+---+--------------------+
|  A|  B|collect_list(option)|
+---+---+--------------------+
|  1|  3|                 [1]|
|  1|  4|              [5, 7]|
|  2|  3|                 [2]|
|  2|  4|                  []|
+---+---+--------------------+

【讨论】:

以上是关于PySpark 聚合和分组依据的主要内容,如果未能解决你的问题,请参考以下文章

如何在 pyspark 中对需要在聚合中聚合的分组数据应用窗口函数?

在 PySpark Dataframe 中结合旋转和分组聚合

如何通过不同级别的枢轴聚合然后在pyspark中进行内部连接?

Pyspark - 一次聚合数据框的所有列[重复]

(Pyspark - 在一段时间内按用户分组

pyspark 中的每月聚合