如何根据条件在pyspark中跨连续行保留值
Posted
技术标签:
【中文标题】如何根据条件在pyspark中跨连续行保留值【英文标题】:How to preserve in pyspark a value across consecutive rows based on condition 【发布时间】:2020-12-19 19:11:03 【问题描述】:我使用的是 Spark 3.0.1,这是我的 pyspark DataFrame 的一个示例:
| label| amount| bool |
-----------------------------
| a | 10 | false |
| a | 2 | false |
| b | 20 | true |
| c | 3 | true |
| d | 2 | false |
| f | 5 | false |
| w | 50 | true |
...
...
这是我用来生成上述示例的代码:
df = spark.createDataFrame(pd.DataFrame(
'label': ["a", "a", "b", "c", "d", "f", "w"],
'amount': [10, 2, 20, 3, 2, 5, 50],
'bool': [False, False, True, True, False, False, True]
))
我想执行一项在我看来非常简单但我无法完成的任务。
我特别想:
-
按
label
排序(已在示例中假设)
分配给新的true_label
列,值如下:
label
中的值,如果bool
是false
最新的(在label
订购之后)label
已经在bool
中遇到了false
对前面示例的更新应该有助于更好地理解预期结果:
| label| amount| bool | real_label |
-----------------------------------
| a | 10 | false | a | <- because `bool` is false, `real_label` = `label`
| a | 2 | false | a | <- because `bool` is false, `real_label` = `label`
| b | 20 | true | a | <- because `a` the latest `label` with a `false` in `bool`
| c | 3 | true | a | <- because `a` the latest `label` with a `false` in `bool`
| d | 2 | false | d | <- because `bool` is false, `real_label` = `label`
| f | 5 | false | f | <- because `bool` is false, `real_label` = `label`
| w | 50 | true | f | <- because `f` the latest `label` with a `false` in `bool`
...
...
是否有可能在不知道我可以遇到的连续false
的数量的情况下实现我想要的,并且还考虑到真正的数据框非常大并且性能很重要(所以不幸的是,基于 toPandas 的答案是禁止的,也会更好避免udf
函数)?
【问题讨论】:
【参考方案1】:使用 SQL 的解决方案
yourDF.createOrReplaceTempView("tmp_view")
yourTransformedDF = spark.sql("""SELECT
label,
amount,
bool,
label2,
CASE WHEN bool THEN LAG(COALESCE(label2, label)) OVER (ORDER BY label)
ELSE label
END AS real_label
FROM (
SELECT
label,
amount,
bool,
case when bool then LAG(label) OVER (ORDER BY label) else label end as label2
FROM tmp_view) q""")
【讨论】:
【参考方案2】:如果为真,使用窗口函数last
获取之前的“假标签”,否则保留标签。
from pyspark.sql import functions as F, Window
df2 = df.withColumn(
'real_label',
F.when(
F.col('bool'), # get previous false label if true
F.last(
F.when(~F.col('bool'), F.col('label')), # keep false labels and mask true labels with null
ignorenulls=True
).over(Window.orderBy('label'))
).otherwise(F.col('label')) # otherwise keep label if false
)
df2.show()
+-----+------+-----+----------+
|label|amount| bool|real_label|
+-----+------+-----+----------+
| a| 10|false| a|
| a| 2|false| a|
| b| 20| true| a|
| c| 3| true| a|
| d| 2|false| d|
| f| 5|false| f|
| w| 50| true| f|
+-----+------+-----+----------+
【讨论】:
非常清晰,先生。这确实是我想要实现的。简单而高效。最后一个功能没有引起我的注意,我也没有想过以如此优雅的方式使用它。谢谢。以上是关于如何根据条件在pyspark中跨连续行保留值的主要内容,如果未能解决你的问题,请参考以下文章