PySpark 中的窗口函数和条件过滤器

Posted

技术标签:

【中文标题】PySpark 中的窗口函数和条件过滤器【英文标题】:Window function and conditional filters in PySpark 【发布时间】:2019-11-20 22:16:15 【问题描述】:

有没有办法有条件地将过滤器应用于 pyspark 中的窗口函数?对于col1 中的每个组,我只想保留col2 中具有X 的行。如果一个组在col2 中没有X,我想保留该组中的所有行。

+------+------+
| col1 | col2 |
+------+------+
| A    |      |
+------+------+
| A    | X    |
+------+------+
| A    |      |
+------+------+
| B    |      |
+------+------+
| B    |      |
+------+------+
| B    |      |
+------+------+

【问题讨论】:

【参考方案1】:

您可以使用max 窗口函数来表示该组(由 col1 分区)在 col2 中具有标识符(在本例中为 1)具有“X”。没有“X”的组将被分配null。此后只需过滤中间数据帧即可获得所需的结果。

from pyspark.sql import Window
from pyspark.sql.functions import max,when
w = Window.partitionBy(df.col1)
df_1 = df.withColumn('x_exists',max(when(df.col2 == 'X',1)).over(w))
df_2 = df_1.filter(((df_1.x_exists == 1) & (df_1.col2 == 'X')) | df_1.x_exists.isNull())
df_2.show()

【讨论】:

【参考方案2】:

使用collect_list 和更多SQL 语法的替代方法:collect_list 跳过NULL 值,我们使用if(col2='X',1,NULL) 作为列表项,这样当col2 中没有显示“X”时,此collect_list 的大小为零:

from pyspark.sql.functions import expr                                                                              

df_new = df.withColumn('has_X', expr("size(collect_list(if(col2='X',1,NULL)) OVER (partition by col1))>0")) \
           .filter("col2 = 'X' OR !has_X")

【讨论】:

以上是关于PySpark 中的窗口函数和条件过滤器的主要内容,如果未能解决你的问题,请参考以下文章

pyspark 使用过滤器应用 DataFrame 窗口函数

带有过滤器的pyspark窗口函数

Apache Spark 中窗口函数中的过滤器和条件

Pyspark 窗口函数,具有对旅行者数量进行取整的条件

PySpark 窗口函数标记满足特定条件的每个分区的第一行

使用窗口函数计算 PySpark 中的累积和