在 Python/PySpark 中 Spark 复制数据框列的最佳实践?
Posted
技术标签:
【中文标题】在 Python/PySpark 中 Spark 复制数据框列的最佳实践?【英文标题】:Spark copying dataframe columns best practice in Python/PySpark? 【发布时间】:2018-12-19 01:32:56 【问题描述】:这适用于使用 Spark 2.3.2 的 Python/PySpark。 我正在寻找最佳实践方法,用于使用 Python/PySpark 将一个数据帧的列复制到另一个数据帧,以获得超过 10 亿行的非常大的数据集(按年/月/日均匀划分)。每行有 120 列要转换/复制。输出数据帧将被写入,日期分区,到另一个 parquet 文件集。
示例架构是: 输入 DFinput (colA, colB, colC) 和 输出DF输出(X,Y,Z)
我想将 DFInput 复制到 DFOutput 如下 (colA => Z, colB => X, colC => Y)。
在 Python Spark 2.3+ 中执行此操作的最佳做法是什么? 我应该为每列使用 DF.withColumn() 方法将源复制到目标列吗? 考虑到数十亿行,每行有 110 多列要复制,这会表现良好吗?
谢谢
【问题讨论】:
withColumns 性能不佳 可能是一个接受答案的想法 【参考方案1】:在 PySpark 中处理列映射的另一种方法是通过 dictionary
。字典帮助您使用key/value
结构将初始数据帧的列映射到最终数据帧的列,如下所示:
from pyspark.sql.functions import col
df = spark.createDataFrame([
[1, "John", "2019-12-01 10:00:00"],
[2, "Michael", "2019-12-01 11:00:00"],
[2, "Michael", "2019-12-01 11:01:00"],
[3, "Tom", "2019-11-13 20:00:00"],
[3, "Tom", "2019-11-14 00:00:00"],
[4, "Sofy", "2019-10-01 01:00:00"]
], ["A", "B", "C"])
col_map = "A":"Z", "B":"X", "C":"Y"
df.select(*[col(k).alias(col_map[k]) for k in col_map]).show()
# +---+-------+-------------------+
# | Z| X| Y|
# +---+-------+-------------------+
# | 1| John|2019-12-01 10:00:00|
# | 2|Michael|2019-12-01 11:00:00|
# | 2|Michael|2019-12-01 11:01:00|
# | 3| Tom|2019-11-13 20:00:00|
# | 3| Tom|2019-11-14 00:00:00|
# | 4| Sofy|2019-10-01 01:00:00|
# +---+-------+-------------------+
这里我们将 A、B、C 分别映射到 Z、X、Y 中。
如果你想要一个模块化的解决方案,你也可以把所有东西都放在一个函数中:
def transform_cols(mappings, df):
return df.select(*[col(k).alias(mappings[k]) for k in mappings])
甚至通过使用monkey patching 来扩展DataFrame
类的现有功能,从而实现更加模块化。将下一个代码放在您的 PySpark 代码之上(您也可以创建一个迷你库并在需要时将其包含在您的代码中):
from pyspark.sql import DataFrame
def transform_cols(self, mappings):
return self.select(*[col(k).alias(mappings[k]) for k in mappings])
DataFrame.transform = transform_cols
然后调用它:
df.transform(col_map).show()
PS:这可能是一种方便的方式来扩展 DataFrame 功能,方法是创建您自己的库并通过 DataFrame 和猴子补丁(熟悉 C# 的人的扩展方法)公开它们。
【讨论】:
这是一个很好的解决方案,但我如何在原始数据框中进行更改。这里 df.select 正在返回新的 df。我希望将列添加到我原来的 df 本身中。【参考方案2】:我遇到的这个有趣的例子展示了两种方法和更好的方法,并同意另一个答案。这是 Scala,而不是 pyspark,但同样的原则适用,即使不同的例子。
import org.apache.spark.sql.functions._
import spark.implicits._
val df = Seq(
("1","2", "3"),
("4", "5", "6"),
("100","101", "102")
).toDF("c1", "c2", "c3")
这很昂贵,就是withColumn,每次迭代都会创建一个新的DF:
val df2 = df.columns.foldLeft(df) case (df, col) =>
df.withColumn(col, df(col).cast("int"))
//df2.show(false)
这样更快。
val df3 = df.select(df.columns.map col =>
df(col).cast("int")
: _*)
//df3.show(false)
【讨论】:
【参考方案3】:使用 dataframe.withColumn() 通过添加列或替换具有相同名称的现有列来返回新的 DataFrame。
【讨论】:
【参考方案4】:使用 Apache Spark 的方法 - 据我了解您的问题 - 是将您的输入 DataFrame 转换为所需的输出 DataFrame。您可以简单地在该任务的输入 DataFrame 上使用 selectExpr
:
outputDF = inputDF.selectExpr("colB as X", "colC as Y", "colA as Z")
此转换不会将输入 DataFrame 中的数据“复制”到输出 DataFrame。
【讨论】:
【参考方案5】:对此(python)有点菜鸟,但是在 SQL(或您拥有的任何来源)中执行此操作是否更容易,然后将其读入新的/单独的数据帧?
【讨论】:
以上是关于在 Python/PySpark 中 Spark 复制数据框列的最佳实践?的主要内容,如果未能解决你的问题,请参考以下文章
在 python (pyspark) 中使用 combinebykey spark rdd 计算组上的聚合
使 Spark 结构化流中的 JSON 可以在 python (pyspark) 中作为没有 RDD 的数据帧访问