PySpark 中的窗口函数和条件过滤器
Posted
技术标签:
【中文标题】PySpark 中的窗口函数和条件过滤器【英文标题】:Window function and conditional filters in PySpark 【发布时间】:2019-11-20 22:16:15 【问题描述】:有没有办法有条件地将过滤器应用于 pyspark 中的窗口函数?对于col1
中的每个组,我只想保留col2
中具有X
的行。如果一个组在col2
中没有X
,我想保留该组中的所有行。
+------+------+
| col1 | col2 |
+------+------+
| A | |
+------+------+
| A | X |
+------+------+
| A | |
+------+------+
| B | |
+------+------+
| B | |
+------+------+
| B | |
+------+------+
【问题讨论】:
【参考方案1】:您可以使用max
窗口函数来表示该组(由 col1 分区)在 col2 中具有标识符(在本例中为 1)具有“X”。没有“X”的组将被分配null
。此后只需过滤中间数据帧即可获得所需的结果。
from pyspark.sql import Window
from pyspark.sql.functions import max,when
w = Window.partitionBy(df.col1)
df_1 = df.withColumn('x_exists',max(when(df.col2 == 'X',1)).over(w))
df_2 = df_1.filter(((df_1.x_exists == 1) & (df_1.col2 == 'X')) | df_1.x_exists.isNull())
df_2.show()
【讨论】:
【参考方案2】:使用collect_list
和更多SQL 语法的替代方法:collect_list 跳过NULL
值,我们使用if(col2='X',1,NULL)
作为列表项,这样当col2 中没有显示“X”时,此collect_list 的大小为零:
from pyspark.sql.functions import expr
df_new = df.withColumn('has_X', expr("size(collect_list(if(col2='X',1,NULL)) OVER (partition by col1))>0")) \
.filter("col2 = 'X' OR !has_X")
【讨论】:
以上是关于PySpark 中的窗口函数和条件过滤器的主要内容,如果未能解决你的问题,请参考以下文章