pyspark如何在窗口内聚合

Posted

技术标签:

【中文标题】pyspark如何在窗口内聚合【英文标题】:pyspark how to aggregate inside of a window 【发布时间】:2021-08-11 16:22:58 【问题描述】:

我有一个具有以下架构的数据框

    epoch - long(事件的纪元,四舍五入到最接近的分钟) client_id - 字符串(自我解释) volume - long(该分钟内发生的事件数)

我想添加以下列(称为prev-1h-3m-interval-median) - 由client_id 分区,我想每分钟查看前60分钟,连续3分钟(0-3、3-6、.. . 57-60) 并得到总和的中位数。

EDIT - 为单个 client_id 添加示例

epoch volume
1 30
2 77
3 73
4 57
5 6
6 37
7 75
8 44
9 50
10 65
11 97
12 84
13 18
14 19
15 71
16 46
17 88
18 12
19 24
20 35

这个例子可以通过以下代码创建:

data = [(1, 30), (2,77), (3,73), (4,57), (5,6), (6,37), (7,75), (8,44), (9,50), (10,65),
        (11,97), (12,84), (13,18), (14,19), (15,71), (16,46), (17,88), (18,12), (19,24), (20,35)]
schema = ['epoch', 'volume']
df = spark.createDataFrame(data, schema=schema)

此示例包含的数据点较少,但逻辑相同。 在这里,我想创建一个名为 prev-6-2-interval-median 的列,其中我们想要前 6 行中每 2 个连续行的总和的中位数。时期 1-6 将有 0 个值,因为它们没有前 6 行.... 对于 epoch 7,计算将是 median((30+77), (73+57), (6+37)) = median(107, 130, 43) = 107 对于 epoch 8,计算将是 median((77+73), (57+6), (37+75)) = median(150, 63, 112) = 112 等等。 所以我试图实现的输出就像这样 (抱歉图片 - 内联表格格式效果不佳) 我怎样才能做到这一点?窗口函数和/或 udf/pandas_udf 的组合?

【问题讨论】:

一个样本数据帧与一个可重现的预期输出会更好:) 至少我们会更快地回答 @anky - 感谢您的评论 - 我修改了问题以包含一个详细的示例。 谢谢,但是图像很难复制。您可以从这里获得一些帮助:***.com/questions/48427185/… @anky - 请查看更新后的问题 @MrT。我的回答有帮助吗,还是您在寻找不同的东西? 【参考方案1】:

我已经找到了一种没有任何 UDF 的可能方法。 (代码后说明)

sdf. \
    withColumn('flag_odd', (func.col('epoch')%2 != 0).cast('int')). \
    withColumn('flag_even', (func.col('epoch')%2 == 0).cast('int')). \
    withColumn('sum_curr_and_prev', func.sum('volume').over(wd.orderBy('epoch').rowsBetween(-1, 0))). \
    withColumn('even_collected', func.collect_list(func.when(func.col('flag_odd') == 1, func.col('sum_curr_and_prev'))).over(wd.orderBy('epoch').rowsBetween(-6, -1))). \
    withColumn('odd_collected', func.collect_list(func.when(func.col('flag_even') == 1, func.col('sum_curr_and_prev'))).over(wd.orderBy('epoch').rowsBetween(-6, -1))). \
    withColumn('even_collected', func.when((func.col('flag_even') == 1) & (func.col('epoch') > 6), func.col('even_collected'))). \
    withColumn('odd_collected', func.when((func.col('flag_odd') == 1) & (func.col('epoch') > 6), func.col('odd_collected'))). \
    withColumn('all_collections', func.coalesce('even_collected', 'odd_collected')). \
    withColumn('median_val', func.sort_array('all_collections')[1]). \
    show()

# +-----+------+--------+---------+-----------------+--------------+---------------+---------------+----------+
# |epoch|volume|flag_odd|flag_even|sum_curr_and_prev|even_collected|  odd_collected|all_collections|median_val|
# +-----+------+--------+---------+-----------------+--------------+---------------+---------------+----------+
# |    1|    30|       1|        0|               30|          null|           null|           null|      null|
# |    2|    77|       0|        1|              107|          null|           null|           null|      null|
# |    3|    73|       1|        0|              150|          null|           null|           null|      null|
# |    4|    57|       0|        1|              130|          null|           null|           null|      null|
# |    5|     6|       1|        0|               63|          null|           null|           null|      null|
# |    6|    37|       0|        1|               43|          null|           null|           null|      null|
# |    7|    75|       1|        0|              112|          null| [107, 130, 43]| [107, 130, 43]|       107|
# |    8|    44|       0|        1|              119|[150, 63, 112]|           null| [150, 63, 112]|       112|
# |    9|    50|       1|        0|               94|          null| [130, 43, 119]| [130, 43, 119]|       119|
# |   10|    65|       0|        1|              115| [63, 112, 94]|           null|  [63, 112, 94]|        94|
# |   11|    97|       1|        0|              162|          null| [43, 119, 115]| [43, 119, 115]|       115|
# |   12|    84|       0|        1|              181|[112, 94, 162]|           null| [112, 94, 162]|       112|
# |   13|    18|       1|        0|              102|          null|[119, 115, 181]|[119, 115, 181]|       119|
# |   14|    19|       0|        1|               37|[94, 162, 102]|           null| [94, 162, 102]|       102|
# |   15|    71|       1|        0|               90|          null| [115, 181, 37]| [115, 181, 37]|       115|
# |   16|    46|       0|        1|              117|[162, 102, 90]|           null| [162, 102, 90]|       102|
# |   17|    88|       1|        0|              134|          null| [181, 37, 117]| [181, 37, 117]|       117|
# |   18|    12|       0|        1|              100|[102, 90, 134]|           null| [102, 90, 134]|       102|
# |   19|    24|       1|        0|               36|          null| [37, 117, 100]| [37, 117, 100]|       100|
# |   20|    35|       0|        1|               59| [90, 134, 36]|           null|  [90, 134, 36]|        90|
# +-----+------+--------+---------+-----------------+--------------+---------------+---------------+----------+

我已经使用了你提供的数据

data = [(1, 30), (2,77), (3,73), (4,57), (5,6), (6,37), (7,75), (8,44), (9,50), (10,65),
        (11,97), (12,84), (13,18), (14,19), (15,71), (16,46), (17,88), (18,12), (19,24), (20,35)]

sdf = spark.sparkContext.parallelize(data).toDF(['epoch', 'volume'])

# +-----+------+
# |epoch|volume|
# +-----+------+
# |    1|    30|
# |    2|    77|
# |    3|    73|
# |    4|    57|
# |    5|     6|
# +-----+------+
从那里,我创建奇数和偶数记录标识符。他们将帮助我们创建一个集合。请注意,我假设 epoch 是连续且有序的。 然后,我将成对的音量值相加。 epoch 2 卷和 epoch 1 卷,epoch 3 卷和 epoch 2 卷,依此类推。 创建集合 - 我使用了带有 when() 条件的 sql 函数 collect_list()。我想,对于每个奇数时期,您收集前 3 个偶数时期的对总和的中位数(步骤 2)。例如- 对于 epoch 7,你需要 epoch 6 的 pair sum、epoch 4 的 pair sum、epoch 2 的 pair sum,即[43, 130, 107]。偶数时期的类似方法。 一旦你有了集合,中间值就是集合的中间值。该集合是一个包含 3 个值的列表,可以排序,并且可以从列表中提取第二个值。例如[43, 130, 107] 可以排序为[43, 107, 130]107 可以用[43, 107, 130][1] 提取

【讨论】:

以上是关于pyspark如何在窗口内聚合的主要内容,如果未能解决你的问题,请参考以下文章

(Pyspark - 在一段时间内按用户分组

如何根据 PySpark 中窗口聚合的条件计算不同值?

如何将 groupBy 和聚合函数应用于 PySpark DataFrame 中的特定窗口?

pyspark 时间序列数据的高性能滚动/窗口聚合

在 PySpark 中随时间窗口聚合

Pyspark - 如何拆分具有 Datetime 类型的结构值的列?