Pyspark - 如何将转换后的列与原始 DataFrame 合并?

Posted

技术标签:

【中文标题】Pyspark - 如何将转换后的列与原始 DataFrame 合并?【英文标题】:Pyspark - how to merge transformed columns with an original DataFrame? 【发布时间】:2020-07-07 12:57:38 【问题描述】:

我创建了一个函数来测试 DataFrame 上的转换。这仅返回转换后的列。

def test_concat(df: sd.DataFrame, col_names: list) -> sd.DataFrame:
    return df.select(*[F.concat(df[column].cast(StringType()), F.lit(" new!")).alias(column) for column in col_names])

如何将现有列替换为原始 DF 中的转换一次并返回整个 DF?

示例 DF:

test_df = self.spark.createDataFrame([(1, 'metric1', 10), (2, 'metric2', 20), (3, 'metric3', 30)], ['id', 'metric', 'score'])

cols = ["metric"]
new_df = perform_concat(test_df, cols)
new_df.show()

预期结果:

|metric       | score  |
+-------------+--------+
|metric1 new! | 10     |
|metric2 new! | 20     |
|metric3 new! | 30     |

看起来我可以从 DF 中删除原始列,然后以某种方式附加转换后的列。但不确定这是实现这一目标的正确方法。

【问题讨论】:

能否分享一些 Impute 和预期输出的示例数据 @dsk 我已经更新了我的问题 请检查一下 为更清晰添加了一些屏幕截图,请检查它们。 【参考方案1】:

我可以看到你只在度量列中添加了一个关键字,同样可以使用下面的内置 spark 函数来实现

withColumn 有两个功能

    如果该列不存在,它将创建一个新的 clumn 如果该列存在,它将对同一列执行操作

逻辑到 Concat

from pyspark.sql import functions as F    
df = df.withColumn('metric', F.concat(F.col('metric'), F.lit(' '), F.lit('new!')))
df = df.select('metric', 'score')
df.show()

输出------

|metric       | score  |
+-------------+--------+
|metric1 new! | 10     |
|metric2 new! | 20     |
|metric3 new! | 30     |

【讨论】:

这个想法是 DF 中可能有很多列,我想将一个函数应用于多个列。然后将转换后的列合并到原始 DF【参考方案2】:

如果您想为许多列执行此操作,您可以调用 foldLeft。

@dsk 有正确的方法。

在这种情况下,您可能希望避免连接,因为不需要将您描述的操作与原始数据帧分离(这是基于您提供的示例,如果您在实际情况下有不同的需求,那么可能需要不同的示例)。

columnsToTransform.foldLeft(df)(
  (acc, next) => acc.withColumn(next, concat(col(next), lit("new !")))
)

编辑:刚刚意识到我提出的建议仅适用于 scala,并且您的 sn-p 在 python 中。

对于 python 类似的东西仍然可以工作,而不是 fold 你会做一个 for:

df = yourOriginalDf    
for(next in columnsToTransform):
       df = df.withColumn(next, concat(col(next), lit("new !")))

【讨论】:

【参考方案3】:

    使用更新的列值和 monotonically increasing id 创建一个新数据框

    new_df = test_concat(test_df, cols).withColumn("index", F.monotonically_increasing_id())

    从第一个数据框中删除列列表和monotonically increasing id

    test_df_upt = test_df.drop(*cols).withColumn("index", F.monotonically_increasing_id())

    加入上述 2 个数据帧并删除索引列

    test_df_upt.join(new_df, "index").drop("index").show()

【讨论】:

以上是关于Pyspark - 如何将转换后的列与原始 DataFrame 合并?的主要内容,如果未能解决你的问题,请参考以下文章

在 PySpark 中将 Python Dict 转换为稀疏 RDD 或 DF

Pyspark如何将一列与数据框中另一列的结果相乘?

如何使用pyspark将具有多个可能值的Json数组列表转换为数据框中的列

使用 pyspark 将 Spark 数据框中的列转换为数组 [重复]

如何更改pyspark中的列元数据?

将pyspark数据框的列转换为小写