使用带有窗口的多个条件按组更新列

Posted

技术标签:

【中文标题】使用带有窗口的多个条件按组更新列【英文标题】:Update column groupwise using multiple condition with window 【发布时间】:2021-02-24 13:20:53 【问题描述】:

我遇到了 pyspark 提供的那些窗口函数,它们似乎非常有用。不幸的是,试图解决问题我经常无法正常工作。现在我想知道我的问题是否可以用窗口函数来解决......

这是我的任务:

从如下所示的数据框模型开始:

values = [(0,"a",True,True),(1,"a",True,True),(2,"a",True,True),(3,"a",True,True),(4,"a",True,True),
         (0,"b",False,True),(1,"b",True,True),(2,"b",True,True),(3,"b",False,True),(4,"b",True,True),
         (0,"c",False,True),(1,"c",True,True),(2,"c",True,True),(3,"c",False,True),(4,"c",False,True)]
columns = ['index', 'name', 'Res','solution']
mockup= spark.createDataFrame(values, columns)
mockup.show()
+-----+----+-----+----------------+
|index|name|  Res|default_solution|
+-----+----+-----+----------------+
|    0|   a| true|            true|
|    1|   a| true|            true|
|    2|   a| true|            true|
|    3|   a| true|            true|
|    4|   a| true|            true|
|    0|   b|false|            true|
|    1|   b| true|            true|
|    2|   b| true|            true|
|    3|   b|false|            true|
|    4|   b| true|            true|
|    0|   c|false|            true|
|    1|   c| true|            true|
|    2|   c| true|            true|
|    3|   c|false|            true|
|    4|   c|false|            true|
+-----+----+-----+----------------+

我现在想使用多个条件更新解决方案列。

如果每个组(名称)有超过 2 个错误值,或者如果一个组中有两个错误值但没有一个在索引 = 0,则整个组的解决方案列应该为假,否则为真。

查看期望的结果:

+-----+----+-----+--------+
|index|name|  Res|solution|
+-----+----+-----+--------+
|    0|   a| true|    true|
|    1|   a| true|    true|
|    2|   a| true|    true|
|    3|   a| true|    true|
|    4|   a| true|    true|
|    0|   b|false|    true|
|    1|   b| true|    true|
|    2|   b| true|    true|
|    3|   b|false|    true|
|    4|   b| true|    true|
|    0|   c|false|   false|
|    1|   c| true|   false|
|    2|   c| true|   false|
|    3|   c|false|   false|
|    4|   c|false|   false|
+-----+----+-----+--------+

我设法通过以下解决方案解决了这个问题,但我希望有一种更优雅的方法来做到这一点 - 也许使用 Windows。对于窗口函数,我一直在为将窗口放置在何处以及如何在更复杂的“何时”条件下使用它而苦苦挣扎。

我的解决方案不是很好:0)

df = mockup.filter(mockup.trip_distance_greater_zero == False).groupby(mockup.name).count()
false_filter_1 = df.filter(F.col('count')>2) \
            .select('name').collect()
false_filter_2 = df.filter(F.col('count')==2) \
            .select('name').collect()
array_false_1 = [str(row['name']) for row in false_filter_1]
array_false_2 = [str(row['name']) for row in false_filter_2]


false_filter_3 = mockup.filter((mockup['index']==0) & (mockup['Res']== False))\
            .select('name').collect()
array_false_3 = [str(row['name']) for row in false_filter_3]

mockup = mockup.withColumn("over_2",
                            F.when((F.col('name').isin(array_false_1)), True).otherwise(False))\
               .withColumn("eq_2",
                            F.when((F.col('name').isin(array_false_2)), True).otherwise(False))\
               .withColumn("at0",
                            F.when((F.col('name').isin(array_false_3)), True).otherwise(False))\
               .withColumn("solution",
                            F.when(((F.col('eq_2')==True) & (F.col('at0')==True)) | (F.col('over_2')==False)&(F.col('eq_2')==False), True).otherwise(False))\
              .drop('over_2')\
              .drop('eq_2')\
              .drop('at0')\
mockup.show()

【问题讨论】:

【参考方案1】:

这是我对您的描述进行编码的尝试。输出与您的“预期”输出不同,因为我猜您处理了一些逻辑不正确? b 和 c 在您的数据框中具有相同的模式,但不知何故,其中一个为真,另一个为假。

from pyspark.sql import functions as F, Window

df2 = mockup.withColumn(
    'false_count',
    F.count(F.when(F.col('Res') == False, 1)).over(Window.partitionBy('name'))
).withColumn(
    'false_at_0',
    F.count(F.when((F.col('Res') == False) & (F.col('index') == 0), 1)).over(Window.partitionBy('name'))
).withColumn(
    'solution',
    ~((F.col('false_count') > 2) | ((F.col('false_count') == 2) & (F.col('false_at_0') != 1)))
)

df2.show()
+-----+----+-----+--------+-----------+----------+
|index|name|  Res|solution|false_count|false_at_0|
+-----+----+-----+--------+-----------+----------+
|    0|   c|false|    true|          2|         1|
|    1|   c| true|    true|          2|         1|
|    2|   c| true|    true|          2|         1|
|    3|   c|false|    true|          2|         1|
|    4|   c| true|    true|          2|         1|
|    0|   b|false|    true|          2|         1|
|    1|   b| true|    true|          2|         1|
|    2|   b| true|    true|          2|         1|
|    3|   b|false|    true|          2|         1|
|    4|   b| true|    true|          2|         1|
|    0|   a| true|    true|          0|         0|
|    1|   a| true|    true|          0|         0|
|    2|   a| true|    true|          0|         0|
|    3|   a| true|    true|          0|         0|
|    4|   a| true|    true|          0|         0|
+-----+----+-----+--------+-----------+----------+

另一个也许更有用的例子:

values = [(0,"a",True,True),(1,"a",True,True),(2,"a",True,True),(3,"a",True,True),(4,"a",True,True),
         (0,"b",False,True),(1,"b",True,True),(2,"b",True,True),(3,"b",False,True),(4,"b",True,True),
         (0,"c",True,True),(1,"c",False,True),(2,"c",True,True),(3,"c",False,True),(4,"c",True,True),
         (0,"d",True,True),(1,"d",False,True),(2,"d",False,True),(3,"d",False,True),(4,"d",True,True)]
columns = ['index', 'name', 'Res','solution']
mockup= spark.createDataFrame(values, columns)

在被第一个代码处理后,会给出

+-----+----+-----+--------+-----------+----------+
|index|name|  Res|solution|false_count|false_at_0|
+-----+----+-----+--------+-----------+----------+
|    0|   d| true|   false|          3|         0|
|    1|   d|false|   false|          3|         0|
|    2|   d|false|   false|          3|         0|
|    3|   d|false|   false|          3|         0|
|    4|   d| true|   false|          3|         0|
|    0|   c| true|   false|          2|         0|
|    1|   c|false|   false|          2|         0|
|    2|   c| true|   false|          2|         0|
|    3|   c|false|   false|          2|         0|
|    4|   c| true|   false|          2|         0|
|    0|   b|false|    true|          2|         1|
|    1|   b| true|    true|          2|         1|
|    2|   b| true|    true|          2|         1|
|    3|   b|false|    true|          2|         1|
|    4|   b| true|    true|          2|         1|
|    0|   a| true|    true|          0|         0|
|    1|   a| true|    true|          0|         0|
|    2|   a| true|    true|          0|         0|
|    3|   a| true|    true|          0|         0|
|    4|   a| true|    true|          0|         0|
+-----+----+-----+--------+-----------+----------+

【讨论】:

以上是关于使用带有窗口的多个条件按组更新列的主要内容,如果未能解决你的问题,请参考以下文章

按组的每个出现值构建计数列

MySQL 使用多个条件更新列的值

R:按组计算多个相关性(并将输出保存到 csv 文件)

在sql server中用一个条件更新多个列

我想学习数据库增删改查

根据条件更新多个列,哪种方法性能更好,或者是不是有其他更好的方法