PySpark:如何在列中使用 Or 进行分组
Posted
技术标签:
【中文标题】PySpark:如何在列中使用 Or 进行分组【英文标题】:PySpark: How to groupby with Or in columns 【发布时间】:2019-09-20 04:10:50 【问题描述】:我想在 PySpark 中进行分组,但该值可以出现在多个列中,因此如果它出现在任何选定的列中,它将被分组。
例如,如果我在 Pyspark 中有这张表:
我想将每个 ID 的访问和投资相加,结果是:
请注意,ID1 是前三列之一中具有 ID1 的行 0、1、3 的总和 [ID1 访问次数 = 500 + 100 + 200 = 800]。 ID2 是行 1,2 等的总和
OBS 1:为简单起见,我的示例是一个简单的数据框,但实际上是一个更大的 df,其中包含很多行和很多变量以及其他操作,而不仅仅是“求和”。 这不能在 pandas 上工作,因为它太大了。应该在 PySpark 中
OBS2:为了说明,我在 pandas 中打印了表格,但实际上它在 PySpark 中
感谢所有帮助,并提前非常感谢您
【问题讨论】:
只有 3 列,你可以做一个联合,然后按组求和 但是我不会有想要的结果。如果我这样做,这将与多列的 groupby 相同。在这种情况下,结果将与原始数据帧相同,因为没有重复组合,例如:ID123 |总和(访问) |总和(投资) 114 | 500 | 1000 213 | 100 | 200 532 | 200 | 400 134 | 200 | 200 【参考方案1】:首先让我们创建我们的测试数据框。
>>> import pandas as pd
>>> data =
"ID1": [1, 2, 5, 1],
"ID2": [1, 1, 3, 3],
"ID3": [4, 3, 2, 4],
"Visits": [500, 100, 200, 200],
"Investment": [1000, 200, 400, 200]
>>> df = spark.createDataFrame(pd.DataFrame(data))
>>> df.show()
+---+---+---+------+----------+
|ID1|ID2|ID3|Visits|Investment|
+---+---+---+------+----------+
| 1| 1| 4| 500| 1000|
| 2| 1| 3| 100| 200|
| 5| 3| 2| 200| 400|
| 1| 3| 4| 200| 200|
+---+---+---+------+----------+
一旦我们有了可以操作的 DataFrame,我们必须定义一个函数,该函数将返回来自列 ID1
、ID2
和 ID3
的唯一 ID 列表。
>>> import pyspark.sql.functions as F
>>> from pyspark.sql.types import ArrayType, IntegerType
>>> @F.udf(returnType=ArrayType(IntegerType()))
... def ids_list(*cols):
... return list(set(cols))
现在是时候在 DataFrame 上应用我们的 udf
了。
>>> df = df.withColumn('ids', ids_list('ID1', 'ID2', 'ID3'))
>>> df.show()
+---+---+---+------+----------+---------+
|ID1|ID2|ID3|Visits|Investment| ids|
+---+---+---+------+----------+---------+
| 1| 1| 4| 500| 1000| [1, 4]|
| 2| 1| 3| 100| 200|[1, 2, 3]|
| 5| 3| 2| 200| 400|[2, 3, 5]|
| 1| 3| 4| 200| 200|[1, 3, 4]|
+---+---+---+------+----------+---------+
要使用ids
列,我们必须将其分解为单独的行并删除ids
列。
>>> df = df.withColumn("ID", F.explode('ids')).drop('ids')
>>> df.show()
+---+---+---+------+----------+---+
|ID1|ID2|ID3|Visits|Investment| ID|
+---+---+---+------+----------+---+
| 1| 1| 4| 500| 1000| 1|
| 1| 1| 4| 500| 1000| 4|
| 2| 1| 3| 100| 200| 1|
| 2| 1| 3| 100| 200| 2|
| 2| 1| 3| 100| 200| 3|
| 5| 3| 2| 200| 400| 2|
| 5| 3| 2| 200| 400| 3|
| 5| 3| 2| 200| 400| 5|
| 1| 3| 4| 200| 200| 1|
| 1| 3| 4| 200| 200| 3|
| 1| 3| 4| 200| 200| 4|
+---+---+---+------+----------+---+
最后,我们必须按 ID
列对 DataFrame 进行分组并计算总和。最终结果按ID
排序。
>>> final_df = (
... df.groupBy('ID')
... .agg( F.sum('Visits'), F.sum('Investment') )
... .orderBy('ID')
... )
>>> final_df.show()
+---+-----------+---------------+
| ID|sum(Visits)|sum(Investment)|
+---+-----------+---------------+
| 1| 800| 1400|
| 2| 300| 600|
| 3| 500| 800|
| 4| 700| 1200|
| 5| 200| 400|
+---+-----------+---------------+
希望对你有用。
【讨论】:
【参考方案2】:您可以执行以下操作:
-
在所有
id
列中创建array
-> ids
下面的列
explode
ids
专栏
现在你会得到重复,以避免重复聚合使用distinct
最后groupBy
ids
列并执行所有聚合
注意::如果您的数据集可以有完全重复的行,则在创建数组之前添加一列 df.withColumn('uid', f.monotonically_increasing_id())
,否则 distinct
将删除它。
数据集示例:
import pyspark.sql.functions as f
df.withColumn('ids', f.explode(f.array('id1','id2','id3'))).distinct().groupBy('ids').agg(f.sum('visits'), f.sum('investments')).orderBy('ids').show()
+---+-----------+----------------+
|ids|sum(visits)|sum(investments)|
+---+-----------+----------------+
| 1| 800| 1400|
| 2| 300| 600|
| 3| 500| 800|
| 4| 700| 1200|
| 5| 200| 400|
+---+-----------+----------------+
【讨论】:
以上是关于PySpark:如何在列中使用 Or 进行分组的主要内容,如果未能解决你的问题,请参考以下文章
计算 PySpark SQL Join 中每个不同值在列中出现的次数
如何使用模式列表在列中查找字符串并将匹配的模式添加到下一列的同一行
如何在 PySpark 中进行分组并查找列的唯一项目 [重复]