Concatenate row values based on group by in pyspark data frame

Posted: 2021-05-03 22:19:29

【Problem description】:

I have a data frame in pyspark as shown below:

df = spark.createDataFrame(
    [('123', '2021-01-01', 1815, 9876),
     ('123', '2021-01-01', 1820, 9877),
     ('123', '2021-01-01', 1828, 9878),
     ('123', '2021-02-01', 1815, 9876),
     ('123', '2021-02-01', 1820, 9877),
     ('123', '2021-02-01', 1828, 9878),
     ('223', '2021-01-01', 1815, 9876),
     ('223', '2021-01-01', 1820, 9877),
     ('223', '2021-01-01', 1828, 9878),
     ('223', '2021-02-01', 1815, 9876),
     ('223', '2021-02-01', 1820, 9877),
     ('223', '2021-02-01', 1828, 9878)],
    ['number', 'date', 'sorter', 'key'])


df.show()
+------+----------+------+----+
|number|      date|sorter| key|
+------+----------+------+----+
|   123|2021-01-01|  1815|9876|
|   123|2021-01-01|  1820|9877|
|   123|2021-01-01|  1828|9878|
|   123|2021-02-01|  1815|9876|
|   123|2021-02-01|  1820|9877|
|   123|2021-02-01|  1828|9878|
|   223|2021-01-01|  1815|9876|
|   223|2021-01-01|  1820|9877|
|   223|2021-01-01|  1828|9878|
|   223|2021-02-01|  1815|9876|
|   223|2021-02-01|  1820|9877|
|   223|2021-02-01|  1828|9878|
+------+----------+------+----+

This data frame is sorted by the sorter column.

Now, using the above data frame, I want to create a new data frame based on the following:

1) For each group where number and date are the same, I want to concatenate the `key` values.
2) Within each group, the first record keeps its own `key` as the `joined_key`.
3) From the second record onwards, each record's `joined_key` is its own `key` concatenated with the `joined_key` of the previous record.

Expected result:

df1.show()
+------+----------+------+----+---------------+
|number|      date|sorter| key|     Joined_key|
+------+----------+------+----+---------------+
|   123|2021-01-01|  1815|9876|           9876|
|   123|2021-01-01|  1820|9877|      9877~9876|
|   123|2021-01-01|  1828|9878| 9878~9877~9876|
|   123|2021-02-01|  1815|9876|           9876|
|   123|2021-02-01|  1820|9877|      9877~9876|
|   123|2021-02-01|  1828|9878| 9878~9877~9876|
|   223|2021-01-01|  1815|9876|           9876|
|   223|2021-01-01|  1820|9877|      9877~9876|
|   223|2021-01-01|  1828|9878| 9878~9877~9876|
|   223|2021-02-01|  1815|9876|           9876|
|   223|2021-02-01|  1820|9877|      9877~9876|
|   223|2021-02-01|  1828|9878| 9878~9877~9876|
+------+----------+------+----+---------------+

I have tried the following approach, but could not proceed any further:

from pyspark.sql.functions import collect_list

df1 = df.groupby("number", "date").agg(collect_list('key').alias('joined_key'))
df1.show()
+------+----------+------------------+
|number|      date|        joined_key|
+------+----------+------------------+
|   223|2021-02-01|[9878, 9876, 9877]|
|   123|2021-01-01|[9878, 9876, 9877]|
|   223|2021-01-01|[9878, 9876, 9877]|
|   123|2021-02-01|[9876, 9877, 9878]|
+------+----------+------------------+

How can I achieve what I want?

【Question comments】:

【Answer 1】:

You can use a Window function with some aggregation, as shown below. With the window partitioned by number and date and ordered by sorter, collect_list over the window returns the running list of key values up to the current row; reverse puts the current key first, and array_join concatenates the values with ~.

from pyspark.sql import Window
from pyspark.sql.functions import array_join, collect_list, reverse

window = Window.partitionBy("number", "date").orderBy("sorter")

df.withColumn("Joined_key", array_join(reverse(collect_list("key").over(window)), "~")) \
    .show(truncate=False)

Result:

+------+----------+------+----+--------------+
|number|date      |sorter|key |Joined_key    |
+------+----------+------+----+--------------+
|223   |2021-02-01|1815  |9876|9876          |
|223   |2021-02-01|1820  |9877|9877~9876     |
|223   |2021-02-01|1828  |9878|9878~9877~9876|
|123   |2021-01-01|1815  |9876|9876          |
|123   |2021-01-01|1820  |9877|9877~9876     |
|123   |2021-01-01|1828  |9878|9878~9877~9876|
|223   |2021-01-01|1815  |9876|9876          |
|223   |2021-01-01|1820  |9877|9877~9876     |
|223   |2021-01-01|1828  |9878|9878~9877~9876|
|123   |2021-02-01|1815  |9876|9876          |
|123   |2021-02-01|1820  |9877|9877~9876     |
|123   |2021-02-01|1828  |9878|9878~9877~9876|
+------+----------+------+----+--------------+
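Note that when a window has an orderBy and no explicit frame, Spark defaults to a range frame from unbounded preceding to the current row, which is what makes collect_list behave as a running list here. Below is a minimal sketch with an equivalent rows frame written out explicitly (equivalent in this data because sorter is unique within each group); keeping the result in a new data frame named df1 is just an assumption to match the expected output above.

from pyspark.sql import Window
from pyspark.sql.functions import array_join, collect_list, reverse

# Same window as above, but with the running frame made explicit
window = (Window.partitionBy("number", "date")
                .orderBy("sorter")
                .rowsBetween(Window.unboundedPreceding, Window.currentRow))

# Running list of keys up to the current row, reversed so the current key
# comes first, then joined with "~"
df1 = df.withColumn(
    "Joined_key",
    array_join(reverse(collect_list("key").over(window)), "~"),
)
df1.orderBy("number", "date", "sorter").show(truncate=False)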

【Comments】:
