Spark - 使用 groupBy 减少组合数量
Posted
技术标签:
【中文标题】Spark - 使用 groupBy 减少组合数量【英文标题】:Spark - Reducing number of combinations using a groupBy 【发布时间】:2016-02-13 15:50:06 【问题描述】:假设我有一个如下形式的数据集:
data = sc.parallelize([('customer_1', 'contract_1', 15000, 100),
('customer_1', 'contract_1', 20000, 200),
('customer_2', 'contract_2', 30000, 100),
('customer_1', 'contract_1', 7500, 500)], 2)
其中:
第一列代表客户 ID。 第二列代表合同 ID。 第三列代表时间戳。 第四列是合同价值。我需要做的是添加一个额外的列,对于每一行,包含具有相同客户 ID、相同合同 ID 且时间戳等于或大于当前时间戳的所有行的合同价值总和行。
所以,对于之前的数据集,结果应该是:
customer_1 contract_1 15000 300 # 300 = 100+200
customer_1 contract_1 20000 200 # 200
customer_2 contract_2 30000 100 # 100
customer_1 contract_1 7500 800 # 800 = 100+200+500
如果不存在时间戳检查,则可以设置由客户 ID 和合同 ID 组成的密钥,通过密钥减少,然后加入,但鉴于存在时间戳比较,我找不到简单的方法这样做。
我完成这项工作的第一种方法是以这种方式使用笛卡尔运算:
combinations = data.cartesian(data)
.filter(lambda a: a[0][0] == a[1][0] and
a[0][1] == a[1][1] and
a[1][2] >= a[0][2])
agg = combinations.map(lambda a: (a[0], a[1][3])).reduceByKey(lambda x,y: x+y)
结果还可以,但恐怕将笛卡尔应用于我正在管理的数据量(超过 100 万行)效率很低。事实上,在这里应用笛卡尔运算会产生许多根本没有意义的组合(根据定义,组合不同客户或合同的行是没有意义的),这些组合随后会被过滤器删除。
对我来说,理想的情况是使用客户 ID 和合同 ID 作为键执行 groupBy
,然后遍历生成的 groupBy
,并对每一行应用笛卡尔积。这将大大减少生成的组合数量。但是,我没有找到任何方法来做到这一点。更何况,这可能吗?如果是这样,如何?对于如何实现我的要求,您还有其他建议/想法吗?
感谢您的帮助!
【问题讨论】:
【参考方案1】:这是一个要求窗口函数的问题:
import sys
from pyspark.sql.window import Window
from pyspark.sql.functions import sum
df = data.toDF(["customer_id", "contract_id", "timestamp", "value"])
w = (Window()
.partitionBy("customer_id", "contract_id")
.orderBy("timestamp")
# Current row and future values
.rangeBetween(0, sys.maxsize)) # or .rowsBetween(0, sys.maxsize)
result = df.withColumn("future_value", sum("value").over(w))
result.show()
## +-----------+-----------+---------+-----+------------+
## |customer_id|contract_id|timestamp|value|future_value|
## +-----------+-----------+---------+-----+------------+
## | customer_1| contract_1| 7500| 500| 800|
## | customer_1| contract_1| 15000| 100| 300|
## | customer_1| contract_1| 20000| 200| 200|
## | customer_2| contract_2| 30000| 100| 100|
## +-----------+-----------+---------+-----+------------+
【讨论】:
非常感谢 zero323。我刚开始使用 Spark 并且不了解 Window 函数。谢谢你提供的详情。出于好奇,再问一个问题:如果我有一个 timestamp_start 和 timestamp_end 字段,并且我总结的条件是 current_row_timestamp_start >= timestamp_start 和 current_row_timestamp_end 没有。您可以提供覆盖行或值的静态范围,但它不能依赖于当前行。从理论上讲,您可以在滞后/领先的情况下向后和向前看,并尝试从中构建一些东西,但它不太可能是漂亮或高效的。以上是关于Spark - 使用 groupBy 减少组合数量的主要内容,如果未能解决你的问题,请参考以下文章
Spark groupBy vs repartition plus mapPartitions