DataFrame.write.parquet() uses only one executor, does not scale

【Posted】: 2020-12-05 01:02:18
【Question】:

I have read:

Spark task runs on only one executor (and the questions it links to)
Spark Dataset cache is using only one executor
spark streaming only executors on one machine is working

I'm trying to generate a 10-billion-row test dataset by crossJoining a seed dataframe with a couple of other dataframes. My problem is that the last step of the process, final_df.write.parquet(), uses only one worker/executor, no matter how many are available.

This obviously will not scale to generating billions of records. What is going wrong?

For example, on a 3-node cluster with 4 cores per node, final_df has 64 partitions, but only one executor writes all the records of that dataframe into a single parquet file. I also tried 12 nodes, which produced a dataframe with 1936 partitions, but the problem was the same.

Some observations:

I think the 64 partitions come from the 2 crossJoin()s. One node is the master, which leaves 2 executors with 4 cores each, so it comes out to 2*2*4*4 = 64 (the same logic would give 11 workers * 4 cores = 44 partitions per side, and 44^2 = 1936, for the 12-node run).
If I uncomment the coalesce() line, the number of partitions drops to 8, but still only one executor writes all the records of the dataframe into a single parquet file.
If I repartition() (without coalesce()), I get 8 files, all executors are used, and the write is distributed perfectly. But then the problem just moves to the repartition step, which is again performed by only one executor. So in the end it is the same problem.
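One way to check where the rows actually live (a hedged diagnostic, not part of the original post; it assumes the final_df built in the script below): if a single partition holds essentially all of the records, then a single write task, and hence a single executor, is exactly what Spark will use.

# Hedged diagnostic sketch: count rows per Spark partition of final_df.
from pyspark.sql.functions import spark_partition_id

final_df.groupBy(spark_partition_id().alias("partition_id")) \
        .count() \
        .orderBy("partition_id") \
        .show(100, truncate=False)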
import os, math
import pyspark.sql.functions as F
from datetime import datetime as dt

def log(*args):
  print(dt.now().isoformat() + ' ' + ' '.join([str(s) for s in args]))

log('spark.version', str(spark.version))

log("reading seed file")
spark.conf.set("fs.azure.account.key.myaccount.dfs.core.windows.net", "my key")
seed_df = spark.read.csv("abfss://fs1@myaccount.dfs.core.windows.net/seed.csv", header=True)

# NUM_RECORDS_TO_GENERATE = 10_000_000_000
NUM_RECORDS_TO_GENERATE = 2_000_000
NUM_RECORDS_TO_GENERATE = NUM_RECORDS_TO_GENERATE + (NUM_RECORDS_TO_GENERATE % seed_df.count())
array_len = int(math.sqrt(NUM_RECORDS_TO_GENERATE / seed_df.count()))

log("array_len: %s, NUM_RECORDS_TO_GENERATE: %s, seed_df.count(): %s" % (array_len, NUM_RECORDS_TO_GENERATE, seed_df.count()))

df1 = spark.createDataFrame(data=[[ [1] * array_len ]])
df2 = df1.withColumn('exploded', F.explode(df1['_1'])).drop('_1')
df3 = df2.crossJoin(df2) # contains array_len ^ 2 = NUM_RECORDS_TO_GENERATE / seed_df.count() records
newdf = df3.crossJoin(seed_df) # contains NUM_RECORDS_TO_GENERATE
final_df = newdf.withColumn('uniq_row_id', F.monotonically_increasing_id()).drop('exploded') # add unique id column

# log("repartitioning")
# final_df = final_df.repartition(int(final_df.rdd.getNumPartitions() / 2))
# log("coalesceing")
# final_df = final_df.coalesce(int(final_df.rdd.getNumPartitions() / 2))

log("final_df.rdd.getNumPartitions(): ", final_df.rdd.getNumPartitions())
log('writing parquet')
final_df.write.parquet("abfss://fs1@myaccount.dfs.core.windows.net/%s/parquet-%s" % (dt.now().isoformat(), NUM_RECORDS_TO_GENERATE))
log('wrote parquet.')
log('final_df.rdd.count():', final_df.rdd.count())

Output

2020-12-05T00:27:51.933995 spark.version 3.0.1
2020-12-05T00:27:51.934079 reading seed file
2020-12-05T00:27:52.713461 array_len: 377, NUM_RECORDS_TO_GENERATE: 2000002, seed_df.count(): 14
2020-12-05T00:27:52.852547 final_df.rdd.getNumPartitions():  64
2020-12-05T00:27:52.852749 writing parquet
2020-12-05T00:28:00.823663 wrote parquet.
2020-12-05T00:28:08.757957 final_df.rdd.count(): 1989806

Coalesce

... same as above ...
2020-12-05T00:12:22.620791 coalesceing
2020-12-05T00:12:22.860093 final_df.rdd.getNumPartitions():  32
2020-12-05T00:12:22.860249 writing parquet
2020-12-05T00:12:31.280416 wrote parquet.
2020-12-05T00:12:39.204093 final_df.rdd.count(): 1989806

Repartition

... same as above ...
2020-12-05T00:23:40.155481 repartitioning
2020-12-05T00:23:44.702251 final_df.rdd.getNumPartitions():  8
2020-12-05T00:23:44.702421 writing parquet
2020-12-05T00:23:50.478841 wrote parquet.
2020-12-05T00:23:52.174997 final_df.rdd.count(): 1989806


DAG visualization of the long-running stage (screenshot not included):

PS: Ignore the slight mismatch between the NUM_RECORDS_TO_GENERATE value and the actual number of records generated. It is probably a math issue in the sqrt, and I don't care if it is off by a few million.
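(Side note, not from the original post: the gap is exactly what the integer sqrt predicts. With the 14 seed rows shown in the logs, the small check below reproduces the 1989806 reported by rdd.count().)

import math
seed_count = 14                                  # seed_df.count() from the log
target = 2_000_002                               # NUM_RECORDS_TO_GENERATE after the % adjustment
array_len = int(math.sqrt(target / seed_count))  # truncates to 377
print(array_len, array_len ** 2 * seed_count)    # 377 1989806, matching the logged count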

【Comments】:

【Answer 1】:

So I fixed it, but I still don't know why the old code used only one executor.

I added a new step that repartitions the original dataframe before cross-joining it with the others. After that, the resulting dataframe uses all executors.

import os, math
import pyspark.sql.functions as F
from datetime import datetime as dt

def log(*args):
  print(dt.now().isoformat() + ' ' + ' '.join([str(s) for s in args]))

log('spark.version', str(spark.version))

log("reading seed file")
spark.conf.set("fs.azure.account.key.myaccount.dfs.core.windows.net", "my key")
seed_df = spark.read.csv("abfss://fs1@myaccount.dfs.core.windows.net/seed.csv", header=True)

# NUM_RECORDS_TO_GENERATE = 10_000_000_000
NUM_RECORDS_TO_GENERATE = 2_000_000
NUM_RECORDS_TO_GENERATE = NUM_RECORDS_TO_GENERATE + (NUM_RECORDS_TO_GENERATE % seed_df.count())
array_len = int(math.sqrt(NUM_RECORDS_TO_GENERATE / seed_df.count()))

log("array_len: %s, NUM_RECORDS_TO_GENERATE: %s, seed_df.count(): %s" % (array_len, NUM_RECORDS_TO_GENERATE, seed_df.count()))

df1 = spark.createDataFrame(data=[[ [1] * array_len ]])
df2 = df1.withColumn('exploded', F.explode(df1['_1'])).drop('_1')

# --------------- NEW STEP ---------------
# with this final_df.write.parquet() uses all executors and scales up
df2 = df2.repartition(df2.rdd.getNumPartitions())

df3 = df2.crossJoin(df2) # contains array_len ^ 2 = NUM_RECORDS_TO_GENERATE / seed_df.count() records
newdf = df3.crossJoin(seed_df) # contains NUM_RECORDS_TO_GENERATE
final_df = newdf.withColumn('uniq_row_id', F.monotonically_increasing_id()).drop('exploded') # add unique id column

# log("repartitioning")
# final_df = final_df.repartition(int(final_df.rdd.getNumPartitions() / 2))
# log("coalesceing")
# final_df = final_df.coalesce(int(final_df.rdd.getNumPartitions() / 2))

log("final_df.rdd.getNumPartitions(): ", final_df.rdd.getNumPartitions())
log('writing parquet')
final_df.write.parquet("abfss://fs1@myaccount.dfs.core.windows.net/%s/parquet-%s" % (dt.now().isoformat(), NUM_RECORDS_TO_GENERATE))
log('wrote parquet.')
log('final_df.rdd.count():', final_df.rdd.count())

With the new step:

2020-12-08T17:31:25.674825 spark.version 3.0.1
2020-12-08T17:31:25.674927 reading seed file
2020-12-08T17:31:32.770631 array_len: 377, NUM_RECORDS_TO_GENERATE: 2000002, seed_df.count(): 14
2020-12-08T17:31:33.940648 uniq_df.rdd.getNumPartitions():  16
2020-12-08T17:31:33.940848 writing parquet
2020-12-08T17:31:37.658914 wrote parquet.
2020-12-08T17:31:39.612749 uniq_df.rdd.count(): 1989806


If the new step is removed:

2020-12-08T17:37:16.896377 spark.version 3.0.1
2020-12-08T17:37:16.896478 reading seed file
2020-12-08T17:37:18.303734 array_len: 377, NUM_RECORDS_TO_GENERATE: 2000002, seed_df.count(): 14
2020-12-08T17:37:18.817331 uniq_df.rdd.getNumPartitions():  256
2020-12-08T17:37:18.817558 writing parquet
2020-12-08T17:37:28.015959 wrote parquet.
2020-12-08T17:37:37.973600 uniq_df.rdd.count(): 1989806
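Why the new step helps is not explained above; a plausible reading (my own, hedged) is that df1 is created from a single local row, so after the explode() all array_len rows still sit in one partition. A cross join multiplies partition counts, but only the output partitions fed by that one non-empty input partition receive any rows, so every downstream stage, including the parquet write, runs as a single busy task. coalesce() cannot fix this, because it only merges existing partitions without a shuffle, whereas the explicit repartition() forces a shuffle that spreads the rows before they get multiplied. A sketch to inspect this (assuming df2 denotes the exploded dataframe before the new repartition step):

# Hedged sketch: rows per partition of df2, without and with the extra repartition().
# glom() collects each partition into a list, so map(len) yields the row count per partition.
before = df2.rdd.glom().map(len).collect()
after = df2.repartition(df2.rdd.getNumPartitions()).rdd.glom().map(len).collect()
print("rows per partition without the new step:", before)
print("rows per partition with the new step:   ", after)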

【Discussion】:
