PySpark:如何在列中使用 Or 进行分组

Posted

技术标签:

【中文标题】PySpark:如何在列中使用 Or 进行分组【英文标题】:PySpark: How to groupby with Or in columns 【发布时间】:2019-09-20 04:10:50 【问题描述】:

我想在 PySpark 中进行分组,但该值可以出现在多个列中,因此如果它出现在任何选定的列中,它将被分组。

例如,如果我在 Pyspark 中有这张表:

我想将每个 ID 的访问和投资相加,结果是:

请注意,ID1 是前三列之一中具有 ID1 的行 0、1、3 的总和 [ID1 访问次数 = 500 + 100 + 200 = 800]。 ID2 是行 1,2 等的总和

OBS 1:为简单起见,我的示例是一个简单的数据框,但实际上是一个更大的 df,其中包含很多行和很多变量以及其他操作,而不仅仅是“求和”。 这不能在 pandas 上工作,因为它太大了。应该在 PySpark 中

OBS2:为了说明,我在 pandas 中打印了表格,但实际上它在 PySpark 中

感谢所有帮助,并提前非常感谢您

【问题讨论】:

只有 3 列,你可以做一个联合,然后按组求和 但是我不会有想要的结果。如果我这样做,这将与多列的 groupby 相同。在这种情况下,结果将与原始数据帧相同,因为没有重复组合,例如:ID123 |总和(访问) |总和(投资) 114 | 500 | 1000 213 | 100 | 200 532 | 200 | 400 134 | 200 | 200 【参考方案1】:

首先让我们创建我们的测试数据框。

>>> import pandas as pd

>>> data = 
       "ID1": [1, 2, 5, 1],
       "ID2": [1, 1, 3, 3],
       "ID3": [4, 3, 2, 4],
       "Visits": [500, 100, 200, 200],
       "Investment": [1000, 200, 400, 200]
    
>>> df = spark.createDataFrame(pd.DataFrame(data))
>>> df.show()

+---+---+---+------+----------+
|ID1|ID2|ID3|Visits|Investment|
+---+---+---+------+----------+
|  1|  1|  4|   500|      1000|
|  2|  1|  3|   100|       200|
|  5|  3|  2|   200|       400|
|  1|  3|  4|   200|       200|
+---+---+---+------+----------+

一旦我们有了可以操作的 DataFrame,我们必须定义一个函数,该函数将返回来自列 ID1ID2ID3 的唯一 ID 列表。

>>> import pyspark.sql.functions as F
>>> from pyspark.sql.types import ArrayType, IntegerType

>>> @F.udf(returnType=ArrayType(IntegerType()))
... def ids_list(*cols):
...    return list(set(cols))

现在是时候在 DataFrame 上应用我们的 udf了。

>>> df = df.withColumn('ids', ids_list('ID1', 'ID2', 'ID3'))
>>> df.show()

+---+---+---+------+----------+---------+
|ID1|ID2|ID3|Visits|Investment|      ids|
+---+---+---+------+----------+---------+
|  1|  1|  4|   500|      1000|   [1, 4]|
|  2|  1|  3|   100|       200|[1, 2, 3]|
|  5|  3|  2|   200|       400|[2, 3, 5]|
|  1|  3|  4|   200|       200|[1, 3, 4]|
+---+---+---+------+----------+---------+

要使用ids 列,我们必须将其分解为单独的行并删除ids 列。

>>> df = df.withColumn("ID", F.explode('ids')).drop('ids')
>>> df.show()

+---+---+---+------+----------+---+
|ID1|ID2|ID3|Visits|Investment| ID|
+---+---+---+------+----------+---+
|  1|  1|  4|   500|      1000|  1|
|  1|  1|  4|   500|      1000|  4|
|  2|  1|  3|   100|       200|  1|
|  2|  1|  3|   100|       200|  2|
|  2|  1|  3|   100|       200|  3|
|  5|  3|  2|   200|       400|  2|
|  5|  3|  2|   200|       400|  3|
|  5|  3|  2|   200|       400|  5|
|  1|  3|  4|   200|       200|  1|
|  1|  3|  4|   200|       200|  3|
|  1|  3|  4|   200|       200|  4|
+---+---+---+------+----------+---+

最后,我们必须按 ID 列对 DataFrame 进行分组并计算总和。最终结果按ID排序。

>>> final_df = (
...    df.groupBy('ID')
...       .agg( F.sum('Visits'), F.sum('Investment') )
...       .orderBy('ID')
... )
>>> final_df.show()

+---+-----------+---------------+
| ID|sum(Visits)|sum(Investment)|
+---+-----------+---------------+
|  1|        800|           1400|
|  2|        300|            600|
|  3|        500|            800|
|  4|        700|           1200|
|  5|        200|            400|
+---+-----------+---------------+

希望对你有用。

【讨论】:

【参考方案2】:

您可以执行以下操作:

    在所有id 列中创建array-> ids 下面的列 explodeids专栏 现在你会得到重复,以避免重复聚合使用distinct 最后groupBy ids 列并执行所有聚合

注意::如果您的数据集可以有完全重复的行,则在创建数组之前添加一列 df.withColumn('uid', f.monotonically_increasing_id()),否则 distinct 将删除它。

数据集示例:

import pyspark.sql.functions as f

df.withColumn('ids', f.explode(f.array('id1','id2','id3'))).distinct().groupBy('ids').agg(f.sum('visits'), f.sum('investments')).orderBy('ids').show()
+---+-----------+----------------+
|ids|sum(visits)|sum(investments)|
+---+-----------+----------------+
|  1|        800|            1400|
|  2|        300|             600|
|  3|        500|             800|
|  4|        700|            1200|
|  5|        200|             400|
+---+-----------+----------------+

【讨论】:

以上是关于PySpark:如何在列中使用 Or 进行分组的主要内容,如果未能解决你的问题,请参考以下文章

计算 PySpark SQL Join 中每个不同值在列中出现的次数

如何使用模式列表在列中查找字符串并将匹配的模式添加到下一列的同一行

如何在 PySpark 中进行分组并查找列的唯一项目 [重复]

如何使用flexbox进行两列布局,并在列中的项目之间使用相同的间距? [重复]

pyspark 在列上应用函数

当值在sql中重复时如何对列进行分组