通过比较 pyspark 数据框中的多列来更新一列

Posted

技术标签:

【中文标题】通过比较 pyspark 数据框中的多列来更新一列【英文标题】:updating a column by comparing multiple columns in pyspark data frame 【发布时间】:2018-05-18 23:21:37 【问题描述】:

我在pyspark 中有一个data frame,如下所示。

+-----+---+---+----+
|tests|val|asd|cnty|
+-----+---+---+----+
|test1|  Y|  1|null|
|test2|  N|  2|  UK|
| null|  Y|  1|  UK|
|test1|  N|  2|null|
|test1|  N|  3|null|
|test3|  N|  4| AUS|
|test4|  Y|  5|null|
+-----+---+---+----+

当任何给定的 testscnty 具有 val Y 时,我想更新该值,然后该特定 testscnty 的所有 val 应更新为 Y。如果不是,那么他们的价值观是什么。

我已经完成了如下操作

from pyspark.sql import Window
import pyspark.sql.functions as f


df1 = df.select('tests', f.max('val').over(Window.partitionBy('tests')).alias('val'), 'asd', 'cnty')

+-----+---+---+----+
|tests|val|asd|cnty|
+-----+---+---+----+
|test1|  Y|  1|null|
|test1|  Y|  2|null|
|test1|  Y|  3|null|
|test2|  N|  2|  UK|
|test3|  N|  4| AUS|
|test4|  Y|  5|null|
| null|  Y|  1|  UK|
+-----+---+---+----+

以上没有给我想要的结果。正如您所看到的test2 我有cnty 作为UKvalN 并且我有另一条记录,其中cntyUK 并且该记录的valY然后根据我的要求,这两个记录的val 应该是Y。但result中的情况并非如此。

【问题讨论】:

只运行相同的命令,但被cnty分区: df2 = df1.select('tests', f.max('val').over(Window.partitionBy('cnty') ).alias('val'), 'asd', 'cnty') @jxc 看起来可行,但我无法理解为什么它不能与“测试”一起使用但与“cnty”一起使用的行为 它在您使用 cnty 时有效,因为 uk 被分组为一组,并且该组的最大值为 Y。但是当您使用测试时,它们变成了两组,其中一个为 Y,另一个为 N。请参阅我的回答下面的详细解释和解决方案如何达到您想要的结果 【参考方案1】:

您只检查了测试列,但忘记检查 cnty 列。为此,您*需要另一个用于 cnty 列的 windowSpec 并使用 when 内置函数组合两个 windowSpecs 以获得您想要的结果

from pyspark.sql import window as w
windowSpec1 = w.Window.partitionBy('tests').orderBy('asd')
windowSpec2 = w.Window.partitionBy('cnty').orderBy('asd')

from pyspark.sql import functions as f
df = df.select(f.col('tests'), f.when(f.max('val').over(windowSpec1)== 'Y', 'Y').otherwise(f.when(f.max('val').over(windowSpec2)== 'Y', 'Y').otherwise(f.col('val'))).alias('val'), f.col('asd'), f.col('cnty'))
df.show(truncate=False)

这应该给你

+-----+---+---+----+
|tests|val|asd|cnty|
+-----+---+---+----+
|test4|Y  |5  |null|
|test3|N  |4  |AUS |
|test1|Y  |1  |null|
|test1|Y  |2  |null|
|test1|Y  |3  |null|
|test2|Y  |2  |UK  |
|null |Y  |1  |UK  |
+-----+---+---+----+

我希望这能解释为什么你没有得到想要的结果。

更新

上述解决方案需要两个 window 函数同时运行,这可能会导致您出现一些内存问题。您可以分别运行一个window 函数来分别检查testscnty

from pyspark.sql import window as w
windowSpec1 = w.Window.partitionBy('tests').orderBy('asd')
windowSpec2 = w.Window.partitionBy('cnty').orderBy('asd')

from pyspark.sql import functions as f
df = df.withColumn('val', f.when(f.max('val').over(windowSpec1)== 'Y', 'Y').otherwise(f.col('val')))\
    .withColumn('val', f.when(f.max('val').over(windowSpec2)== 'Y', 'Y').otherwise(f.col('val')))

这将产生相同的结果。

【讨论】:

Maharajan 您的解决方案似乎适用于小型数据,但在大型数据上却会导致像spill to disk 这样的问题。有没有办法可以避免使用window partitions 并达到我想要的结果 @user9367133,使用窗口功能是我所知道的解决您遇到的问题的最佳解决方案。现在spill to disk 可能是因为您同时使用了两个窗口函数。您可以修改解决方案以在每个检查条件中使用一个窗口函数。如果你不明白我在说什么,请告诉我,我会帮你的 Maharajan 实际上我对使用 windows 函数很陌生。请告诉我如何修改解决方案以在每个检查条件下使用一个窗口函数 @user9367133,我已经更新了我的答案。 :) 希望它能帮助您解决问题。 @RameshMaharjan 我们能否使用相同的windows 功能在单个语句中更新许多列。我的意思是说,您正在更新 val 列,如果我想在 sngle 语句中更新另一列,请说 registered 基于问题中的 testscnty 列。【参考方案2】:

您可以尝试以下方法。将数据框左连接到右侧使用“Y”值过滤的同一数据框:如果找到应用Y,否则选择现有值。

df.alias('a').join(
    df.filter(col('val')='Y').alias('b'),
    on=(col('a.tests') == col('b.tests')) | (col('a.cnty') == col('b.cnty')),
    how='left'
  )
  .withColumn('final_val',when(col('b.val').isNull(), col('a.val')).otherwise(col('b.val')))
  .select('a.tests','a.asd','a.cnty','final_val')

唯一的问题,可能存在重复,但最好用数据进行测试,并在需要时进行重复数据删除。

【讨论】:

在你的回答.withColumn('final_val',when(col('b.val').isNull(), 我相信这不应该存在,因为你正在过滤df df.filter(col('val')='Y').alias('b') 所以b 中的val 不会有null 值全部。我认为这将是.withColumn('final_val',when(col("b.val == 'Y'"), col('b.val')).otherwise(col('a.val'))) 正确的陈述 left join 为整个 b.* 部分生成空值。解决方法是正确的。 @Rumoku 我们可以使用相同的join 在单个语句中更新许多列吗?我的意思是说,您正在更新val 列,如果我想在单个语句中更新另一列,请说registered 基于问题中的testscnty @Question_bank 当然,您可以在不同条件下一个接一个地链接任意多个withColumn 语句:...withColumn('final_val'..).withColumn('final_reg',..).with...

以上是关于通过比较 pyspark 数据框中的多列来更新一列的主要内容,如果未能解决你的问题,请参考以下文章

如何通过 Pyspark 中同一数据框中另一列的正则表达式值过滤数据框中的一列

如何使用 groupby 和聚合将 pyspark 数据框中的行与多列连接起来

如何在 pyspark 中对 spark 数据框中的多列求和?

如何在 pyspark 中对 spark 数据框中的多列求和?

PySpark 将“map”类型的列转换为数据框中的多列

如何将每一列映射到pyspark数据框中的其他列?