如何将 groupBy 和聚合函数应用于 PySpark DataFrame 中的特定窗口?
Posted
技术标签:
【中文标题】如何将 groupBy 和聚合函数应用于 PySpark DataFrame 中的特定窗口?【英文标题】:How to apply groupBy and aggregate functions to a specific window in a PySpark DataFrame? 【发布时间】:2021-02-10 07:54:34 【问题描述】:我想将groupBy
和随后的agg
函数应用于 PySpark DataFrame,但仅限于特定窗口。这最好用一个例子来说明。假设我有一个名为 df
的数据集:
df.show()
+-----+----------+----------+-------+
| ID| Timestamp| Condition| Value|
+-----+----------+----------+-------+
| z1| 1| 0| 50|
|-------------------------------------------|
| | z1| 2| 0| 51| |
| | z1| 3| 0| 52| |
| | z1| 4| 0| 51| |
| | z1| 5| 1| 51| |
| | z1| 6| 0| 49| |
| | z1| 7| 0| 44| |
| | z1| 8| 0| 46| |
|-------------------------------------------|
| z1| 9| 0| 48|
| z1| 10| 0| 42|
+-----+----------+----------+-------+
特别是,我想做的是将一种 +- 3 行的窗口应用于列 Condition == 1
所在的行(即在本例中为第 5 行)。在那个窗口中,如上面的DataFrame所示,我想找到Value
列的最小值和Timestamp
列的对应值,从而得到:
+----------+----------+
| Min_value| Timestamp|
+----------+----------+
| 44| 7|
+----------+----------+
有谁知道如何解决这个问题?
在此先感谢
马里奥安萨斯
【问题讨论】:
【参考方案1】:您可以使用跨越前 3 行和后 3 行的窗口,获取最小值并过滤条件:
from pyspark.sql import functions as F, Window
df2 = df.withColumn(
'min',
F.min(
F.struct('Value', 'Timestamp')
).over(Window.partitionBy('ID').orderBy('Timestamp').rowsBetween(-3,3))
).filter('Condition = 1').select('min.*')
df2.show()
+-----+---------+
|Value|Timestamp|
+-----+---------+
| 44| 7|
+-----+---------+
【讨论】:
嗨@mck!非常感谢您的建议。但是,这肯定适用于只有一行包含Condition == 1
的数据集。但是,我认为这不适用于我有两行或多行 Condition == 1
的数据集。你同意吗?
@Marioanzas 是的,我意识到这是一个糟糕的解决方案。我重写了它,希望它现在能做得更好以上是关于如何将 groupBy 和聚合函数应用于 PySpark DataFrame 中的特定窗口?的主要内容,如果未能解决你的问题,请参考以下文章