Pyspark - 如何将转换后的列与原始 DataFrame 合并?
Posted
技术标签:
【中文标题】Pyspark - 如何将转换后的列与原始 DataFrame 合并?【英文标题】:Pyspark - how to merge transformed columns with an original DataFrame? 【发布时间】:2020-07-07 12:57:38 【问题描述】:我创建了一个函数来测试 DataFrame 上的转换。这仅返回转换后的列。
def test_concat(df: sd.DataFrame, col_names: list) -> sd.DataFrame:
return df.select(*[F.concat(df[column].cast(StringType()), F.lit(" new!")).alias(column) for column in col_names])
如何将现有列替换为原始 DF 中的转换一次并返回整个 DF?
示例 DF:
test_df = self.spark.createDataFrame([(1, 'metric1', 10), (2, 'metric2', 20), (3, 'metric3', 30)], ['id', 'metric', 'score'])
cols = ["metric"]
new_df = perform_concat(test_df, cols)
new_df.show()
预期结果:
|metric | score |
+-------------+--------+
|metric1 new! | 10 |
|metric2 new! | 20 |
|metric3 new! | 30 |
看起来我可以从 DF 中删除原始列,然后以某种方式附加转换后的列。但不确定这是实现这一目标的正确方法。
【问题讨论】:
能否分享一些 Impute 和预期输出的示例数据 @dsk 我已经更新了我的问题 请检查一下 为更清晰添加了一些屏幕截图,请检查它们。 【参考方案1】:我可以看到你只在度量列中添加了一个关键字,同样可以使用下面的内置 spark 函数来实现
withColumn 有两个功能
-
如果该列不存在,它将创建一个新的 clumn
如果该列存在,它将对同一列执行操作
逻辑到 Concat
from pyspark.sql import functions as F
df = df.withColumn('metric', F.concat(F.col('metric'), F.lit(' '), F.lit('new!')))
df = df.select('metric', 'score')
df.show()
输出------
|metric | score |
+-------------+--------+
|metric1 new! | 10 |
|metric2 new! | 20 |
|metric3 new! | 30 |
【讨论】:
这个想法是 DF 中可能有很多列,我想将一个函数应用于多个列。然后将转换后的列合并到原始 DF【参考方案2】:如果您想为许多列执行此操作,您可以调用 foldLeft。
@dsk 有正确的方法。
在这种情况下,您可能希望避免连接,因为不需要将您描述的操作与原始数据帧分离(这是基于您提供的示例,如果您在实际情况下有不同的需求,那么可能需要不同的示例)。
columnsToTransform.foldLeft(df)(
(acc, next) => acc.withColumn(next, concat(col(next), lit("new !")))
)
编辑:刚刚意识到我提出的建议仅适用于 scala,并且您的 sn-p 在 python 中。
对于 python 类似的东西仍然可以工作,而不是 fold 你会做一个 for:
df = yourOriginalDf
for(next in columnsToTransform):
df = df.withColumn(next, concat(col(next), lit("new !")))
【讨论】:
【参考方案3】:使用更新的列值和 monotonically increasing id
创建一个新数据框
new_df = test_concat(test_df, cols).withColumn("index", F.monotonically_increasing_id())
从第一个数据框中删除列列表和monotonically increasing id
test_df_upt = test_df.drop(*cols).withColumn("index", F.monotonically_increasing_id())
加入上述 2 个数据帧并删除索引列
test_df_upt.join(new_df, "index").drop("index").show()
【讨论】:
以上是关于Pyspark - 如何将转换后的列与原始 DataFrame 合并?的主要内容,如果未能解决你的问题,请参考以下文章
在 PySpark 中将 Python Dict 转换为稀疏 RDD 或 DF
如何使用pyspark将具有多个可能值的Json数组列表转换为数据框中的列