为啥'withColumn'在pyspark中需要这么长时间?

Posted

技术标签:

【中文标题】为啥\'withColumn\'在pyspark中需要这么长时间?【英文标题】:Why is 'withColumn' taking so long in pyspark?为什么'withColumn'在pyspark中需要这么长时间? 【发布时间】:2020-06-22 17:45:50 【问题描述】:

我有一个 pyspark 数据框,其中包含 1000 列和 10,000 条记录(行)。 我需要通过对现有列执行一些计算来再创建 2000 列。

df #pyspark dataframe contaning 1000 columns and 10,000 records
df = df.withColumn('C1001', ((df['C269'] * df['C285'])/df['C41'])) #existing column names range from C1 to C1000
df = df.withColumn('C1002', ((df['C4'] * df['C267'])/df['C146']))
df = df.withColumn('C1003', ((df['C87'] * df['C134'])/df['C238']))
.
.
.
df = df.withColumn('C3000', ((df['C365'] * df['C235'])/df['C321']))

问题是,这需要的时间太长,大约 45 分钟左右。 由于我是新手,我想知道我做错了什么? P.S.:我在 databricks 上运行 spark,有 1 个驱动程序和 1 个工作节点,都具有 16GB 内存和 8 个内核。

谢谢!

【问题讨论】:

【参考方案1】:

您所做的很多事情只是创建一个执行计划。 Spark 会延迟执行,直到有一个动作触发它。因此,您看到的 45 分钟可能来自执行您设置的所有转换。

如果您想查看单个 withColumn 需要多长时间,请触发类似 df.count() 之类的操作,然后执行单个 withColumn,然后执行另一个 df.count()(再次触发操作)。

详细了解 pyspark 执行计划、转换和操作。

【讨论】:

对如何改进这些转换的时间有什么建议吗? 您可以在这里和那里尝试分散操作以降低执行计划的复杂性,有时这会有所帮助。如果您可以选择扩展集群(增加更多节点或 cpu 核心)并增加执行程序的数量,然后重新分区数据以便获得更多并行处理。【参考方案2】:

不要太具体

并查看第一个答案的观察结果 并且知道许多 DF 列(即“非常宽的数据”)的执行计划计算成本很高 转向 RDD 处理很可能是要走的路。

【讨论】:

【参考方案3】:

在一行中完成,而不是一个接一个地完成

df = df.withColumn('C1001', COl1).df.withColumn('C1002', COl2).df.withColumn('C1003', COl3) ......

【讨论】:

这样更好吗?会不会产生相同的执行计划

以上是关于为啥'withColumn'在pyspark中需要这么长时间?的主要内容,如果未能解决你的问题,请参考以下文章

使用 PySpark 中的列表中的 withColumn 函数动态创建新列

如何在pyspark withcolumn中使用udf和class

python 在PySpark中为withColumn编写UDF

python 在PySpark中为withColumn编写UDF

pyspark Column 不可使用 withColumn 进行迭代

PySpark:withColumn() 有两个条件和三个结果