如何将 groupBy 和聚合函数应用于 PySpark DataFrame 中的特定窗口?

Posted

技术标签:

【中文标题】如何将 groupBy 和聚合函数应用于 PySpark DataFrame 中的特定窗口?【英文标题】:How to apply groupBy and aggregate functions to a specific window in a PySpark DataFrame? 【发布时间】:2021-02-10 07:54:34 【问题描述】:

我想将groupBy 和随后的agg 函数应用于 PySpark DataFrame,但仅限于特定窗口。这最好用一个例子来说明。假设我有一个名为 df 的数据集:

df.show()

    +-----+----------+----------+-------+
    |   ID| Timestamp| Condition|  Value|
    +-----+----------+----------+-------+
    |   z1|         1|         0|     50|
|-------------------------------------------|
|   |   z1|         2|         0|     51|   |
|   |   z1|         3|         0|     52|   |
|   |   z1|         4|         0|     51|   |
|   |   z1|         5|         1|     51|   |
|   |   z1|         6|         0|     49|   |
|   |   z1|         7|         0|     44|   |
|   |   z1|         8|         0|     46|   |
|-------------------------------------------|
    |   z1|         9|         0|     48|
    |   z1|        10|         0|     42|
 +-----+----------+----------+-------+

特别是,我想做的是将一种 +- 3 行的窗口应用于列 Condition == 1 所在的行(即在本例中为第 5 行)。在那个窗口中,如上面的DataFrame所示,我想找到Value列的最小值和Timestamp列的对应值,从而得到:

+----------+----------+
| Min_value| Timestamp|
+----------+----------+
|        44|         7|
+----------+----------+

有谁知道如何解决这个问题?

在此先感谢

马里奥安萨斯

【问题讨论】:

【参考方案1】:

您可以使用跨越前 3 行和后 3 行的窗口,获取最小值并过滤条件:

from pyspark.sql import functions as F, Window

df2 = df.withColumn(
    'min',
    F.min(
        F.struct('Value', 'Timestamp')
    ).over(Window.partitionBy('ID').orderBy('Timestamp').rowsBetween(-3,3))
).filter('Condition = 1').select('min.*')

df2.show()
+-----+---------+
|Value|Timestamp|
+-----+---------+
|   44|        7|
+-----+---------+

【讨论】:

嗨@mck!非常感谢您的建议。但是,这肯定适用于只有一行包含Condition == 1 的数据集。但是,我认为这不适用于我有两行或多行 Condition == 1 的数据集。你同意吗? @Marioanzas 是的,我意识到这是一个糟糕的解决方案。我重写了它,希望它现在能做得更好

以上是关于如何将 groupBy 和聚合函数应用于 PySpark DataFrame 中的特定窗口?的主要内容,如果未能解决你的问题,请参考以下文章

Parquet 文件上 groupby 的最佳实践

JAVA spark数据集中的GroupBy和聚合函数

Pandas groupby 将特定函数聚合/应用到特定列(np.sum,sum)

熊猫如何将函数应用于 groupby().first()

如何将*多个*功能应用于熊猫 groupby 应用?

数据分析—Pandas 中的分组聚合Groupby 高阶操作