使用带有窗口的多个条件按组更新列
Posted
技术标签:
【中文标题】使用带有窗口的多个条件按组更新列【英文标题】:Update column groupwise using multiple condition with window 【发布时间】:2021-02-24 13:20:53 【问题描述】:我遇到了 pyspark 提供的那些窗口函数,它们似乎非常有用。不幸的是,试图解决问题我经常无法正常工作。现在我想知道我的问题是否可以用窗口函数来解决......
这是我的任务:
从如下所示的数据框模型开始:
values = [(0,"a",True,True),(1,"a",True,True),(2,"a",True,True),(3,"a",True,True),(4,"a",True,True),
(0,"b",False,True),(1,"b",True,True),(2,"b",True,True),(3,"b",False,True),(4,"b",True,True),
(0,"c",False,True),(1,"c",True,True),(2,"c",True,True),(3,"c",False,True),(4,"c",False,True)]
columns = ['index', 'name', 'Res','solution']
mockup= spark.createDataFrame(values, columns)
mockup.show()
+-----+----+-----+----------------+
|index|name| Res|default_solution|
+-----+----+-----+----------------+
| 0| a| true| true|
| 1| a| true| true|
| 2| a| true| true|
| 3| a| true| true|
| 4| a| true| true|
| 0| b|false| true|
| 1| b| true| true|
| 2| b| true| true|
| 3| b|false| true|
| 4| b| true| true|
| 0| c|false| true|
| 1| c| true| true|
| 2| c| true| true|
| 3| c|false| true|
| 4| c|false| true|
+-----+----+-----+----------------+
我现在想使用多个条件更新解决方案列。
如果每个组(名称)有超过 2 个错误值,或者如果一个组中有两个错误值但没有一个在索引 = 0,则整个组的解决方案列应该为假,否则为真。
查看期望的结果:
+-----+----+-----+--------+
|index|name| Res|solution|
+-----+----+-----+--------+
| 0| a| true| true|
| 1| a| true| true|
| 2| a| true| true|
| 3| a| true| true|
| 4| a| true| true|
| 0| b|false| true|
| 1| b| true| true|
| 2| b| true| true|
| 3| b|false| true|
| 4| b| true| true|
| 0| c|false| false|
| 1| c| true| false|
| 2| c| true| false|
| 3| c|false| false|
| 4| c|false| false|
+-----+----+-----+--------+
我设法通过以下解决方案解决了这个问题,但我希望有一种更优雅的方法来做到这一点 - 也许使用 Windows。对于窗口函数,我一直在为将窗口放置在何处以及如何在更复杂的“何时”条件下使用它而苦苦挣扎。
我的解决方案不是很好:0)
df = mockup.filter(mockup.trip_distance_greater_zero == False).groupby(mockup.name).count()
false_filter_1 = df.filter(F.col('count')>2) \
.select('name').collect()
false_filter_2 = df.filter(F.col('count')==2) \
.select('name').collect()
array_false_1 = [str(row['name']) for row in false_filter_1]
array_false_2 = [str(row['name']) for row in false_filter_2]
false_filter_3 = mockup.filter((mockup['index']==0) & (mockup['Res']== False))\
.select('name').collect()
array_false_3 = [str(row['name']) for row in false_filter_3]
mockup = mockup.withColumn("over_2",
F.when((F.col('name').isin(array_false_1)), True).otherwise(False))\
.withColumn("eq_2",
F.when((F.col('name').isin(array_false_2)), True).otherwise(False))\
.withColumn("at0",
F.when((F.col('name').isin(array_false_3)), True).otherwise(False))\
.withColumn("solution",
F.when(((F.col('eq_2')==True) & (F.col('at0')==True)) | (F.col('over_2')==False)&(F.col('eq_2')==False), True).otherwise(False))\
.drop('over_2')\
.drop('eq_2')\
.drop('at0')\
mockup.show()
【问题讨论】:
【参考方案1】:这是我对您的描述进行编码的尝试。输出与您的“预期”输出不同,因为我猜您处理了一些逻辑不正确? b 和 c 在您的数据框中具有相同的模式,但不知何故,其中一个为真,另一个为假。
from pyspark.sql import functions as F, Window
df2 = mockup.withColumn(
'false_count',
F.count(F.when(F.col('Res') == False, 1)).over(Window.partitionBy('name'))
).withColumn(
'false_at_0',
F.count(F.when((F.col('Res') == False) & (F.col('index') == 0), 1)).over(Window.partitionBy('name'))
).withColumn(
'solution',
~((F.col('false_count') > 2) | ((F.col('false_count') == 2) & (F.col('false_at_0') != 1)))
)
df2.show()
+-----+----+-----+--------+-----------+----------+
|index|name| Res|solution|false_count|false_at_0|
+-----+----+-----+--------+-----------+----------+
| 0| c|false| true| 2| 1|
| 1| c| true| true| 2| 1|
| 2| c| true| true| 2| 1|
| 3| c|false| true| 2| 1|
| 4| c| true| true| 2| 1|
| 0| b|false| true| 2| 1|
| 1| b| true| true| 2| 1|
| 2| b| true| true| 2| 1|
| 3| b|false| true| 2| 1|
| 4| b| true| true| 2| 1|
| 0| a| true| true| 0| 0|
| 1| a| true| true| 0| 0|
| 2| a| true| true| 0| 0|
| 3| a| true| true| 0| 0|
| 4| a| true| true| 0| 0|
+-----+----+-----+--------+-----------+----------+
另一个也许更有用的例子:
values = [(0,"a",True,True),(1,"a",True,True),(2,"a",True,True),(3,"a",True,True),(4,"a",True,True),
(0,"b",False,True),(1,"b",True,True),(2,"b",True,True),(3,"b",False,True),(4,"b",True,True),
(0,"c",True,True),(1,"c",False,True),(2,"c",True,True),(3,"c",False,True),(4,"c",True,True),
(0,"d",True,True),(1,"d",False,True),(2,"d",False,True),(3,"d",False,True),(4,"d",True,True)]
columns = ['index', 'name', 'Res','solution']
mockup= spark.createDataFrame(values, columns)
在被第一个代码处理后,会给出
+-----+----+-----+--------+-----------+----------+
|index|name| Res|solution|false_count|false_at_0|
+-----+----+-----+--------+-----------+----------+
| 0| d| true| false| 3| 0|
| 1| d|false| false| 3| 0|
| 2| d|false| false| 3| 0|
| 3| d|false| false| 3| 0|
| 4| d| true| false| 3| 0|
| 0| c| true| false| 2| 0|
| 1| c|false| false| 2| 0|
| 2| c| true| false| 2| 0|
| 3| c|false| false| 2| 0|
| 4| c| true| false| 2| 0|
| 0| b|false| true| 2| 1|
| 1| b| true| true| 2| 1|
| 2| b| true| true| 2| 1|
| 3| b|false| true| 2| 1|
| 4| b| true| true| 2| 1|
| 0| a| true| true| 0| 0|
| 1| a| true| true| 0| 0|
| 2| a| true| true| 0| 0|
| 3| a| true| true| 0| 0|
| 4| a| true| true| 0| 0|
+-----+----+-----+--------+-----------+----------+
【讨论】:
以上是关于使用带有窗口的多个条件按组更新列的主要内容,如果未能解决你的问题,请参考以下文章