Groupby 和 collect_list 基于 PySpark 中的另一列维护顺序
Posted
技术标签:
【中文标题】Groupby 和 collect_list 基于 PySpark 中的另一列维护顺序【英文标题】:Groupby and collect_list maintaining order based on another column in PySpark 【发布时间】:2020-01-17 07:04:15 【问题描述】:我有一个像这样的 PySpark 数据框,
+----------+------------+------------+------------+
| Name | dateCol1 | dateCol2 | dateCol3 |
+----------+------------+------------+------------+
| user1 | 2018-01-01 | 2018-01-10 | 2018-01-01 |
| user1 | 2018-01-11 | 2018-01-20 | 2018-01-01 |
| user2 | 2018-01-11 | 2018-01-20 | 2018-01-11 |
| user1 | 2019-01-21 | 2018-01-30 | 2018-01-01 |
+----------+------------+------------+------------+
我想根据键、dateCol1 和 dateCol2 以及列名称上的 collect_list 对这个数据集进行分组。为此,我正在使用代码,
spark_df.groupBy('dateCol1', 'dateCol2').agg(F.collect_list('Name'))
在收集要列出的列时,我还想根据列 dateCol3 维护值的顺序。
例如,我想确保对于dateCol1 == '2018-01-11'
和dateCol2 == '2018-01-20'
,收集到列表我将始终得到[user1, user2]
(基于dateCol3 的顺序)。
数据帧所需的输出是,
+------------+------------+----------------+
| dateCol1 | dateCol2 | List |
+------------+------------+----------------+
| 2018-01-01 | 2018-01-10 | [user1] |
| 2018-01-11 | 2018-01-20 | [user1, user2] |
| 2019-01-21 | 2018-01-30 | [user1] |
+------------+------------+----------------+
collect_list 默认情况下不会保持顺序,如何确保收集的列表是基于数据框中的另一个外部列排序的?
【问题讨论】:
@YOLO,我想在collect_list基于dateCol3创建列表时保持顺序。根据 dateCol3 然后 collect_list 对数据框进行排序不一致。 【参考方案1】:你可以试试:
spark_df.orderBy('dateCol3', ascending=True).groupBy('dateCol1', 'dateCol2').agg(F.collect_list('Name'))
或者,虽然有点矫枉过正,但您可以使用窗口化:
from pyspark.sql import Window as w
spark_df.select('dateCol1', 'dateCol2', F.collect_list('Name').over(w.partitionBy(['dateCol1','dateCol2']).orderBy(F.col('dateCol3'))).alias('Name')).distinct()
【讨论】:
我正在寻找基于窗口的方法。我猜基于 dateCol3 的分区和排序将确保按顺序收集列表。不是吗? 是的,正是这个想法。 解决方案几乎是正确的,但获得了额外的行。 distinct 不会删除某些行。我想知道我是否可以在 groupBy 中对 collect_list 进行分区和排序。否则,我必须再次使用窗口并 dropDuplicates 保持顺序的键 是的,你可以把它放在.groupBy(...)
之后.select(...)
- 这也可以。但是我认为 distinct()
应该为您做,除非您有更多的列要添加到那里和/或更多的转换以上是关于Groupby 和 collect_list 基于 PySpark 中的另一列维护顺序的主要内容,如果未能解决你的问题,请参考以下文章
Hive---collect_list和collect_set
在 Spark SQL 中使用 collect_list 和 collect_set