加入后替换pyspark数据框中的列

Posted

技术标签:

【中文标题】加入后替换pyspark数据框中的列【英文标题】:Replace columns in pyspark dataframe after join 【发布时间】:2020-04-14 19:25:12 【问题描述】:

我有以下 2 个数据框。 col_1 col_2 col_3 属于数据帧 1,col_4 col_5 和 col_6 属于数据帧 2。 必须对来自 df1 的 col_1 和来自 df2 的 col_4 执行连接,并且它是一个“左”连接

数据框 1

col_1   col_2   col_3   col_3a
a       NA      NA      A
b       NA      NA      B
c       NA      NA      C
d       NA      NA      D

数据框 2

col_4   col_5   col_6
a       1       1
b       1       1
c       1       1

输出数据框应采用以下格式:

col_1   col_2   col_3    col_3a
a       1       1        A
b       1       1        B
c       1       1        C
d       NA      NA       D

本质上,当匹配可用时,应在数据帧 1 - col_2 和 col_3 中替换 col_5 和 col_6 值。 我尝试使用 withcolumnrenamed 并没有太大成功。

【问题讨论】:

【参考方案1】:

使用coalesce函数从col_5,col_2col_6,col_3

中获取first not null

Example:

df1.join(df2,df1.col_1 == df2.col_4,'left').\
selectExpr("col_1","coalesce(col_5,col_2) as col_2","coalesce(col_6,col_3) as col_3","col_3a").\
orderBy("col_1").\
show()
#+-----+-----+-----+------+
#|col_1|col_2|col_3|col_3a|
#+-----+-----+-----+------+
#|    a|    1|    1|     A|
#|    b|    1|    1|     B|
#|    c|    1|    1|     C|
#|    d|   NA|   NA|     D|
#+-----+-----+-----+------+

【讨论】:

col_2 和 col_3 有可能具有 NA 以外的值。这就是为什么我没有使用合并。无论 col_2 和 col_3 中的值是什么,我都必须在找到匹配项时将其替换为 col_5 和 col_6 中的值 @shankar,如果我们有 null 值存在 col_5(找不到匹配项),那么我们将用 col_2 替换,否则我们总是得到 col_5 找到匹配的值。 感谢您的回复。同意..但即使 col_5 有 NA,我仍然必须用 col_5 中的 NA 替换 col_2。无论哪种情况,它都会替换整个列 @shankar,试试这个df1.join(df2,df1.col_1 == df2.col_4,'left').withColumn("col_2", when(col("col_5").isNull(),col("col_2")).otherwise(col("col_5"))).withColumn("col_3",when(col("col_6").isNull(),col("col_3")).otherwise(col("col_6"))).select("col_1","col_2","col_3","col_3a").orderBy("col_1").show()让我知道它是否满足场景!【参考方案2】:

我想到的解决方案是将两个具有不同后缀的数据集合并,然后应用case_when。该解决方案未经测试。

首先,执行完全连接:(在您的示例中,左连接就足够了)

import pyspark.sql.functions as psf

df_join = (df1
            .join(df2, psf.col('col_1') == psf.col('col_4'), how = "full_outer")
            .drop("col_4")
           )

我以为你想要full join。如果需要,您可以更改。

然后,您使用psf.when 进行条件替换

df_join = df_join
    .withColumn("col_2",
            psf.when(psf.col('col_2').isNull(),
                     psf.col('col_5'))
                     )
                 )
    .withColumn("col_3",
            psf.when(psf.col('col_2').isNull(),
                     psf.col('col_6'))
                     )
                 )

【讨论】:

以上是关于加入后替换pyspark数据框中的列的主要内容,如果未能解决你的问题,请参考以下文章

应用 StringIndexer 更改 PySpark 数据框中的列

在 PySpark 数据框中的组中的列上应用函数

PySpark:将 RDD 转换为数据框中的列

如何拆分对象列表以分隔pyspark数据框中的列

如何以正确的格式以科学记数法显示 PySpark 数据框中的列

遍历 pyspark 数据框中的列,而不为单个列创建不同的数据框