如何根据条件在pyspark中跨连续行保留值

Posted

技术标签:

【中文标题】如何根据条件在pyspark中跨连续行保留值【英文标题】:How to preserve in pyspark a value across consecutive rows based on condition 【发布时间】:2020-12-19 19:11:03 【问题描述】:

我使用的是 Spark 3.0.1,这是我的 pyspark DataFrame 的一个示例:

| label| amount| bool   |
 ----------------------------- 
| a    | 10    | false  |
| a    | 2     | false  |
| b    | 20    | true   |
| c    | 3     | true   |
| d    | 2     | false  |
| f    | 5     | false  |
| w    | 50    | true   |
...
...

这是我用来生成上述示例的代码:

df = spark.createDataFrame(pd.DataFrame(
     'label': ["a", "a", "b", "c", "d", "f", "w"],
     'amount': [10, 2, 20, 3, 2, 5, 50],
     'bool': [False, False, True, True, False, False, True]
     ))

我想执行一项在我看来非常简单但我无法完成的任务。

我特别想:

    label 排序(已在示例中假设) 分配给新的true_label 列,值如下: label 中的值,如果boolfalse 最新的(在label 订购之后)label 已经在bool 中遇到了false

对前面示例的更新应该有助于更好地理解预期结果:

| label| amount| bool   | real_label |
 ----------------------------------- 
| a    | 10    | false  | a          |  <- because `bool` is false, `real_label` = `label`
| a    | 2     | false  | a          |  <- because `bool` is false, `real_label` = `label`
| b    | 20    | true   | a          |  <- because `a` the latest `label` with a `false` in `bool` 
| c    | 3     | true   | a          |  <- because `a` the latest `label` with a `false` in `bool` 
| d    | 2     | false  | d          |  <- because `bool` is false, `real_label` = `label`
| f    | 5     | false  | f          |  <- because `bool` is false, `real_label` = `label`
| w    | 50    | true   | f          |  <- because `f` the latest `label` with a `false` in `bool` 
...
...

是否有可能在不知道我可以遇到的连续false 的数量的情况下实现我想要的,并且还考虑到真正的数据框非常大并且性能很重要(所以不幸的是,基于 toPandas 的答案是禁止的,也会更好避免udf函数)?

【问题讨论】:

【参考方案1】:

使用 SQL 的解决方案

yourDF.createOrReplaceTempView("tmp_view")
    
yourTransformedDF = spark.sql("""SELECT 
        label, 
        amount, 
        bool, 
        label2,
        CASE WHEN bool THEN LAG(COALESCE(label2, label)) OVER (ORDER BY label) 
             ELSE label
        END AS real_label
    FROM (
      SELECT
        label,
        amount,
        bool,
        case when bool then LAG(label) OVER (ORDER BY label) else label end as label2
      FROM tmp_view) q""")

【讨论】:

【参考方案2】:

如果为真,使用窗口函数last获取之前的“假标签”,否则保留标签。

from pyspark.sql import functions as F, Window

df2 = df.withColumn(
    'real_label',
    F.when(
        F.col('bool'),    # get previous false label if true
        F.last(
            F.when(~F.col('bool'), F.col('label')),    # keep false labels and mask true labels with null
            ignorenulls=True
        ).over(Window.orderBy('label'))
    ).otherwise(F.col('label'))    # otherwise keep label if false
)

df2.show()
+-----+------+-----+----------+
|label|amount| bool|real_label|
+-----+------+-----+----------+
|    a|    10|false|         a|
|    a|     2|false|         a|
|    b|    20| true|         a|
|    c|     3| true|         a|
|    d|     2|false|         d|
|    f|     5|false|         f|
|    w|    50| true|         f|
+-----+------+-----+----------+

【讨论】:

非常清晰,先生。这确实是我想要实现的。简单而高效。最后一个功能没有引起我的注意,我也没有想过以如此优雅的方式使用它。谢谢。

以上是关于如何根据条件在pyspark中跨连续行保留值的主要内容,如果未能解决你的问题,请参考以下文章

如何根据条件选择R数据框中的连续行?

如何在 PySpark 中跨多个时间间隔使用 .filter() 操作?

如何根据Pyspark中数据框中的条件设置新的列表值?

如何根据pyspark中的行和列条件过滤多行

根据 pyspark 中的条件聚合值

PYSPARK:根据条件用另一个行值更新一行中的值?