pyspark如何在窗口内聚合
Posted
技术标签:
【中文标题】pyspark如何在窗口内聚合【英文标题】:pyspark how to aggregate inside of a window 【发布时间】:2021-08-11 16:22:58 【问题描述】:我有一个具有以下架构的数据框
epoch
- long(事件的纪元,四舍五入到最接近的分钟)
client_id
- 字符串(自我解释)
volume
- long(该分钟内发生的事件数)
我想添加以下列(称为prev-1h-3m-interval-median
) - 由client_id
分区,我想每分钟查看前60分钟,连续3分钟(0-3、3-6、.. . 57-60) 并得到总和的中位数。
EDIT - 为单个 client_id
添加示例
epoch | volume |
---|---|
1 | 30 |
2 | 77 |
3 | 73 |
4 | 57 |
5 | 6 |
6 | 37 |
7 | 75 |
8 | 44 |
9 | 50 |
10 | 65 |
11 | 97 |
12 | 84 |
13 | 18 |
14 | 19 |
15 | 71 |
16 | 46 |
17 | 88 |
18 | 12 |
19 | 24 |
20 | 35 |
这个例子可以通过以下代码创建:
data = [(1, 30), (2,77), (3,73), (4,57), (5,6), (6,37), (7,75), (8,44), (9,50), (10,65),
(11,97), (12,84), (13,18), (14,19), (15,71), (16,46), (17,88), (18,12), (19,24), (20,35)]
schema = ['epoch', 'volume']
df = spark.createDataFrame(data, schema=schema)
此示例包含的数据点较少,但逻辑相同。
在这里,我想创建一个名为 prev-6-2-interval-median
的列,其中我们想要前 6 行中每 2 个连续行的总和的中位数。时期 1-6 将有 0 个值,因为它们没有前 6 行....
对于 epoch 7,计算将是 median((30+77), (73+57), (6+37)) = median(107, 130, 43) = 107
对于 epoch 8,计算将是 median((77+73), (57+6), (37+75)) = median(150, 63, 112) = 112
等等。
所以我试图实现的输出就像这样
(抱歉图片 - 内联表格格式效果不佳)
我怎样才能做到这一点?窗口函数和/或 udf/pandas_udf 的组合?
【问题讨论】:
一个样本数据帧与一个可重现的预期输出会更好:) 至少我们会更快地回答 @anky - 感谢您的评论 - 我修改了问题以包含一个详细的示例。 谢谢,但是图像很难复制。您可以从这里获得一些帮助:***.com/questions/48427185/… @anky - 请查看更新后的问题 @MrT。我的回答有帮助吗,还是您在寻找不同的东西? 【参考方案1】:我已经找到了一种没有任何 UDF 的可能方法。 (代码后说明)
sdf. \
withColumn('flag_odd', (func.col('epoch')%2 != 0).cast('int')). \
withColumn('flag_even', (func.col('epoch')%2 == 0).cast('int')). \
withColumn('sum_curr_and_prev', func.sum('volume').over(wd.orderBy('epoch').rowsBetween(-1, 0))). \
withColumn('even_collected', func.collect_list(func.when(func.col('flag_odd') == 1, func.col('sum_curr_and_prev'))).over(wd.orderBy('epoch').rowsBetween(-6, -1))). \
withColumn('odd_collected', func.collect_list(func.when(func.col('flag_even') == 1, func.col('sum_curr_and_prev'))).over(wd.orderBy('epoch').rowsBetween(-6, -1))). \
withColumn('even_collected', func.when((func.col('flag_even') == 1) & (func.col('epoch') > 6), func.col('even_collected'))). \
withColumn('odd_collected', func.when((func.col('flag_odd') == 1) & (func.col('epoch') > 6), func.col('odd_collected'))). \
withColumn('all_collections', func.coalesce('even_collected', 'odd_collected')). \
withColumn('median_val', func.sort_array('all_collections')[1]). \
show()
# +-----+------+--------+---------+-----------------+--------------+---------------+---------------+----------+
# |epoch|volume|flag_odd|flag_even|sum_curr_and_prev|even_collected| odd_collected|all_collections|median_val|
# +-----+------+--------+---------+-----------------+--------------+---------------+---------------+----------+
# | 1| 30| 1| 0| 30| null| null| null| null|
# | 2| 77| 0| 1| 107| null| null| null| null|
# | 3| 73| 1| 0| 150| null| null| null| null|
# | 4| 57| 0| 1| 130| null| null| null| null|
# | 5| 6| 1| 0| 63| null| null| null| null|
# | 6| 37| 0| 1| 43| null| null| null| null|
# | 7| 75| 1| 0| 112| null| [107, 130, 43]| [107, 130, 43]| 107|
# | 8| 44| 0| 1| 119|[150, 63, 112]| null| [150, 63, 112]| 112|
# | 9| 50| 1| 0| 94| null| [130, 43, 119]| [130, 43, 119]| 119|
# | 10| 65| 0| 1| 115| [63, 112, 94]| null| [63, 112, 94]| 94|
# | 11| 97| 1| 0| 162| null| [43, 119, 115]| [43, 119, 115]| 115|
# | 12| 84| 0| 1| 181|[112, 94, 162]| null| [112, 94, 162]| 112|
# | 13| 18| 1| 0| 102| null|[119, 115, 181]|[119, 115, 181]| 119|
# | 14| 19| 0| 1| 37|[94, 162, 102]| null| [94, 162, 102]| 102|
# | 15| 71| 1| 0| 90| null| [115, 181, 37]| [115, 181, 37]| 115|
# | 16| 46| 0| 1| 117|[162, 102, 90]| null| [162, 102, 90]| 102|
# | 17| 88| 1| 0| 134| null| [181, 37, 117]| [181, 37, 117]| 117|
# | 18| 12| 0| 1| 100|[102, 90, 134]| null| [102, 90, 134]| 102|
# | 19| 24| 1| 0| 36| null| [37, 117, 100]| [37, 117, 100]| 100|
# | 20| 35| 0| 1| 59| [90, 134, 36]| null| [90, 134, 36]| 90|
# +-----+------+--------+---------+-----------------+--------------+---------------+---------------+----------+
我已经使用了你提供的数据
data = [(1, 30), (2,77), (3,73), (4,57), (5,6), (6,37), (7,75), (8,44), (9,50), (10,65),
(11,97), (12,84), (13,18), (14,19), (15,71), (16,46), (17,88), (18,12), (19,24), (20,35)]
sdf = spark.sparkContext.parallelize(data).toDF(['epoch', 'volume'])
# +-----+------+
# |epoch|volume|
# +-----+------+
# | 1| 30|
# | 2| 77|
# | 3| 73|
# | 4| 57|
# | 5| 6|
# +-----+------+
从那里,我创建奇数和偶数记录标识符。他们将帮助我们创建一个集合。请注意,我假设 epoch
是连续且有序的。
然后,我将成对的音量值相加。 epoch 2 卷和 epoch 1 卷,epoch 3 卷和 epoch 2 卷,依此类推。
创建集合 - 我使用了带有 when()
条件的 sql 函数 collect_list()
。我想,对于每个奇数时期,您收集前 3 个偶数时期的对总和的中位数(步骤 2)。例如- 对于 epoch 7,你需要 epoch 6 的 pair sum、epoch 4 的 pair sum、epoch 2 的 pair sum,即[43, 130, 107]
。偶数时期的类似方法。
一旦你有了集合,中间值就是集合的中间值。该集合是一个包含 3 个值的列表,可以排序,并且可以从列表中提取第二个值。例如[43, 130, 107]
可以排序为[43, 107, 130]
,107
可以用[43, 107, 130][1]
提取
【讨论】:
以上是关于pyspark如何在窗口内聚合的主要内容,如果未能解决你的问题,请参考以下文章