在 Python/PySpark 中 Spark 复制数据框列的最佳实践?

Posted

技术标签:

【中文标题】在 Python/PySpark 中 Spark 复制数据框列的最佳实践?【英文标题】:Spark copying dataframe columns best practice in Python/PySpark? 【发布时间】:2018-12-19 01:32:56 【问题描述】:

这适用于使用 Spark 2.3.2 的 Python/PySpark。 我正在寻找最佳实践方法,用于使用 Python/PySpark 将一个数据帧的列复制到另一个数据帧,以获得超过 10 亿行的非常大的数据集(按年/月/日均匀划分)。每行有 120 列要转换/复制。输出数据帧将被写入,日期分区,到另一个 parquet 文件集。

示例架构是: 输入 DFinput (colA, colB, colC) 和 输出DF输出(X,Y,Z)

我想将 DFInput 复制到 DFOutput 如下 (colA => Z, colB => X, colC => Y)。

在 Python Spark 2.3+ 中执行此操作的最佳做​​法是什么? 我应该为每列使用 DF.withColumn() 方法将源复制到目标列吗? 考虑到数十亿行,每行有 110 多列要复制,这会表现良好吗?

谢谢

【问题讨论】:

withColumns 性能不佳 可能是一个接受答案的想法 【参考方案1】:

在 PySpark 中处理列映射的另一种方法是通过 dictionary。字典帮助您使用key/value 结构将初始数据帧的列映射到最终数据帧的列,如下所示:

from pyspark.sql.functions import col

df = spark.createDataFrame([
  [1, "John", "2019-12-01 10:00:00"],
  [2, "Michael", "2019-12-01 11:00:00"],
  [2, "Michael", "2019-12-01 11:01:00"],
  [3, "Tom", "2019-11-13 20:00:00"],
  [3, "Tom", "2019-11-14 00:00:00"],
  [4, "Sofy", "2019-10-01 01:00:00"]
], ["A", "B", "C"])


col_map = "A":"Z", "B":"X", "C":"Y"

df.select(*[col(k).alias(col_map[k]) for k in col_map]).show()

# +---+-------+-------------------+
# |  Z|      X|                  Y|
# +---+-------+-------------------+
# |  1|   John|2019-12-01 10:00:00|
# |  2|Michael|2019-12-01 11:00:00|
# |  2|Michael|2019-12-01 11:01:00|
# |  3|    Tom|2019-11-13 20:00:00|
# |  3|    Tom|2019-11-14 00:00:00|
# |  4|   Sofy|2019-10-01 01:00:00|
# +---+-------+-------------------+

这里我们将 A、B、C 分别映射到 Z、X、Y 中。

如果你想要一个模块化的解决方案,你也可以把所有东西都放在一个函数中:

def transform_cols(mappings, df):
  return df.select(*[col(k).alias(mappings[k]) for k in mappings])

甚至通过使用monkey patching 来扩展DataFrame 类的现有功能,从而实现更加模块化。将下一个代码放在您的 PySpark 代码之上(您也可以创建一个迷你库并在需要时将其包含在您的代码中):

from pyspark.sql import DataFrame

def transform_cols(self, mappings):
  return self.select(*[col(k).alias(mappings[k]) for k in mappings])

DataFrame.transform = transform_cols

然后调用它:

df.transform(col_map).show()

PS:这可能是一种方便的方式来扩展 DataFrame 功能,方法是创建您自己的库并通过 DataFrame 和猴子补丁(熟悉 C# 的人的扩展方法)公开它们。

【讨论】:

这是一个很好的解决方案,但我如何在原始数据框中进行更改。这里 df.select 正在返回新的 df。我希望将列添加到我原来的 df 本身中。【参考方案2】:

我遇到的这个有趣的例子展示了两种方法和更好的方法,并同意另一个答案。这是 Scala,而不是 pyspark,但同样的原则适用,即使不同的例子。

import org.apache.spark.sql.functions._
import spark.implicits._

val df = Seq(
             ("1","2", "3"),
             ("4", "5", "6"),
             ("100","101", "102")
            ).toDF("c1", "c2", "c3")

这很昂贵,就是withColumn,每次迭代都会创建一个新的DF:

val df2 = df.columns.foldLeft(df)  case (df, col) =>
          df.withColumn(col, df(col).cast("int"))
          
//df2.show(false)

这样更快。

val df3 = df.select(df.columns.map  col =>
          df(col).cast("int")
          : _*)
//df3.show(false)

【讨论】:

【参考方案3】:

使用 dataframe.withColumn() 通过添加列或替换具有相同名称的现有列来返回新的 DataFrame。

【讨论】:

【参考方案4】:

使用 Apache Spark 的方法 - 据我了解您的问题 - 是将您的输入 DataFrame 转换为所需的输出 DataFrame。您可以简单地在该任务的输入 DataFrame 上使用 selectExpr

outputDF = inputDF.selectExpr("colB as X", "colC as Y", "colA as Z")

此转换不会将输入 DataFrame 中的数据“复制”到输出 DataFrame。

【讨论】:

【参考方案5】:

对此(python)有点菜鸟,但是在 SQL(或您拥有的任何来源)中执行此操作是否更容易,然后将其读入新的/单独的数据帧?

【讨论】:

以上是关于在 Python/PySpark 中 Spark 复制数据框列的最佳实践?的主要内容,如果未能解决你的问题,请参考以下文章

在 python (pyspark) 中使用 combinebykey spark rdd 计算组上的聚合

使 Spark 结构化流中的 JSON 可以在 python (pyspark) 中作为没有 RDD 的数据帧访问

可从 PySpark/Python 调用的 Spark(2.3+)Java 函数 [重复]

从 Scala Spark 代码调用 Pyspark 脚本

python pyspark入门篇

python pyspark-sql-create-spark-context.py