DataFrame.write.parquet() uses only one executor, does not scale
Posted: 2020-12-05 01:02:18
Question: I have read:
Spark task runs on only one executor (and the questions it links to)
Spark Dataset cache is using only one executor
spark streaming only executors on one machine is working
I am trying to generate a test dataset of 10 billion rows by crossJoining a seed DataFrame with a few other DataFrames. My problem is that the last step of the process, final_df.write.parquet(), only ever uses one worker/executor, no matter how many there are.
This obviously will not scale to generating billions of rows. Where is the problem?
For example, in a 3-node cluster with 4 cores each, final_df has 64 partitions, yet only one executor writes all the records of that DataFrame into a single parquet file. I also tried 12 nodes, which produced a DataFrame with 1936 partitions, but the problem was the same.
Some observations:
I think the 64 partitions come from the 2 crossJoin()s. One node is the master, which leaves 2 executors with 4 cores each, so it works out to 2 * 2 * 4 * 4 = 64.
If I uncomment the coalesce() line, it reduces the number of partitions to 8, but still only one executor writes all the records of that DataFrame into a single parquet file.
If I repartition() (without coalesce()), then I get 8 files, all executors are used, and the write is distributed perfectly. But now the problem just moves to the repartition step, which is again performed by a single executor. So it is the same problem in the end.
import os, math
import pyspark.sql.functions as F
from datetime import datetime as dt
def log(*args):
print(dt.now().isoformat() + ' ' + ' '.join([str(s) for s in args]))
log('spark.version', str(spark.version))
log("reading seed file")
spark.conf.set("fs.azure.account.key.myaccount.dfs.core.windows.net", "my key")
seed_df = spark.read.csv("abfss://fs1@myaccount.dfs.core.windows.net/seed.csv", header=True)
# NUM_RECORDS_TO_GENERATE = 10_000_000_000
NUM_RECORDS_TO_GENERATE = 2_000_000
NUM_RECORDS_TO_GENERATE = NUM_RECORDS_TO_GENERATE + (NUM_RECORDS_TO_GENERATE % seed_df.count())
array_len = int(math.sqrt(NUM_RECORDS_TO_GENERATE / seed_df.count()))
log("array_len: %s, NUM_RECORDS_TO_GENERATE: %s, seed_df.count(): %s" % (array_len, NUM_RECORDS_TO_GENERATE, seed_df.count()))
df1 = spark.createDataFrame(data=[[ [1] * array_len ]])
df2 = df1.withColumn('exploded', F.explode(df1['_1'])).drop('_1')
df3 = df2.crossJoin(df2) # contains array_len ^ 2 = NUM_RECORDS_TO_GENERATE / seed_df.count() records
newdf = df3.crossJoin(seed_df) # contains NUM_RECORDS_TO_GENERATE
final_df = newdf.withColumn('uniq_row_id', F.monotonically_increasing_id()).drop('exploded') # add unique id column
# log("repartitioning")
# final_df = final_df.repartition(int(final_df.rdd.getNumPartitions() / 2))
# log("coalesceing")
# final_df = final_df.coalesce(int(final_df.rdd.getNumPartitions() / 2))
log("final_df.rdd.getNumPartitions(): ", final_df.rdd.getNumPartitions())
log('writing parquet')
final_df.write.parquet("abfss://fs1@myaccount.dfs.core.windows.net/%s/parquet-%s" % (dt.now().isoformat(), NUM_RECORDS_TO_GENERATE))
log('wrote parquet.')
log('final_df.rdd.count():', final_df.rdd.count())
Output:
2020-12-05T00:27:51.933995 spark.version 3.0.1
2020-12-05T00:27:51.934079 reading seed file
2020-12-05T00:27:52.713461 array_len: 377, NUM_RECORDS_TO_GENERATE: 2000002, seed_df.count(): 14
2020-12-05T00:27:52.852547 final_df.rdd.getNumPartitions(): 64
2020-12-05T00:27:52.852749 writing parquet
2020-12-05T00:28:00.823663 wrote parquet.
2020-12-05T00:28:08.757957 final_df.rdd.count(): 1989806
With coalesce():
... same as above ...
2020-12-05T00:12:22.620791 coalesceing
2020-12-05T00:12:22.860093 final_df.rdd.getNumPartitions(): 32
2020-12-05T00:12:22.860249 writing parquet
2020-12-05T00:12:31.280416 wrote parquet.
2020-12-05T00:12:39.204093 final_df.rdd.count(): 1989806
With repartition():
... same as above ...
2020-12-05T00:23:40.155481 repartitioning
2020-12-05T00:23:44.702251 final_df.rdd.getNumPartitions(): 8
2020-12-05T00:23:44.702421 writing parquet
2020-12-05T00:23:50.478841 wrote parquet.
2020-12-05T00:23:52.174997 final_df.rdd.count(): 1989806
DAG visualization of the long-running stage: (screenshot omitted)
PS: Ignore the slight mismatch between the NUM_RECORDS_TO_GENERATE value and the actual number of records generated. It is probably a math issue around sqrt, and I don't care if it is off by a few million.
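For what it is worth, a quick check (my own arithmetic, using the values from the log above) shows the int() truncation around sqrt accounts for the whole gap:
import math
# 2_000_002 / 14 = 142857.28..., and int(math.sqrt(142857.28...)) truncates to 377
array_len = int(math.sqrt(2_000_002 / 14))   # 377
print(array_len ** 2 * 14)                   # 1989806 -- the same count the logs report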
Answer: So I solved it, but I still don't know why the old code used only one executor.
I added a new step that repartitions the original DataFrame before cross-joining it with the others. After that, the resulting DataFrame uses all executors.
import os, math
import pyspark.sql.functions as F
from datetime import datetime as dt
def log(*args):
print(dt.now().isoformat() + ' ' + ' '.join([str(s) for s in args]))
log('spark.version', str(spark.version))
log("reading seed file")
spark.conf.set("fs.azure.account.key.myaccount.dfs.core.windows.net", "my key")
seed_df = spark.read.csv("abfss://fs1@myaccount.dfs.core.windows.net/seed.csv", header=True)
# NUM_RECORDS_TO_GENERATE = 10_000_000_000
NUM_RECORDS_TO_GENERATE = 2_000_000
NUM_RECORDS_TO_GENERATE = NUM_RECORDS_TO_GENERATE + (NUM_RECORDS_TO_GENERATE % seed_df.count())
array_len = int(math.sqrt(NUM_RECORDS_TO_GENERATE / seed_df.count()))
log("array_len: %s, NUM_RECORDS_TO_GENERATE: %s, seed_df.count(): %s" % (array_len, NUM_RECORDS_TO_GENERATE, seed_df.count()))
df1 = spark.createDataFrame(data=[[ [1] * array_len ]])
df2 = df1.withColumn('exploded', F.explode(df1['_1'])).drop('_1')
# --------------- NEW STEP ---------------
# with this final_df.write.parquet() uses all executors and scales up
df2 = df2.repartition(df2.rdd.getNumPartitions())
df3 = df2.crossJoin(df2) # contains array_len ^ 2 = NUM_RECORDS_TO_GENERATE / seed_df.count() records
newdf = df3.crossJoin(seed_df) # contains NUM_RECORDS_TO_GENERATE
final_df = newdf.withColumn('uniq_row_id', F.monotonically_increasing_id()).drop('exploded') # add unique id column
# log("repartitioning")
# final_df = final_df.repartition(int(final_df.rdd.getNumPartitions() / 2))
# log("coalesceing")
# final_df = final_df.coalesce(int(final_df.rdd.getNumPartitions() / 2))
log("final_df.rdd.getNumPartitions(): ", final_df.rdd.getNumPartitions())
log('writing parquet')
final_df.write.parquet("abfss://fs1@myaccount.dfs.core.windows.net/%s/parquet-%s" % (dt.now().isoformat(), NUM_RECORDS_TO_GENERATE))
log('wrote parquet.')
log('final_df.rdd.count():', final_df.rdd.count())
With the new step:
2020-12-08T17:31:25.674825 spark.version 3.0.1
2020-12-08T17:31:25.674927 reading seed file
2020-12-08T17:31:32.770631 array_len: 377, NUM_RECORDS_TO_GENERATE: 2000002, seed_df.count(): 14
2020-12-08T17:31:33.940648 uniq_df.rdd.getNumPartitions(): 16
2020-12-08T17:31:33.940848 writing parquet
2020-12-08T17:31:37.658914 wrote parquet.
2020-12-08T17:31:39.612749 uniq_df.rdd.count(): 1989806
If the new step is removed:
2020-12-08T17:37:16.896377 spark.version 3.0.1
2020-12-08T17:37:16.896478 reading seed file
2020-12-08T17:37:18.303734 array_len: 377, NUM_RECORDS_TO_GENERATE: 2000002, seed_df.count(): 14
2020-12-08T17:37:18.817331 uniq_df.rdd.getNumPartitions(): 256
2020-12-08T17:37:18.817558 writing parquet
2020-12-08T17:37:28.015959 wrote parquet.
2020-12-08T17:37:37.973600 uniq_df.rdd.count(): 1989806
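My best guess at the why (not confirmed anywhere, just my reading): df1/df2 are built from a single in-memory row, so all of their rows sit in one partition, and the cartesian crossJoins preserve that skew, meaning only one of the many output partitions is actually non-empty and a single task ends up holding every record until an explicit shuffle; repartitioning df2 up front spreads the rows before they get multiplied. A quick way to check how the rows are spread across partitions before the write (a hypothetical diagnostic, not part of the solution above):
import pyspark.sql.functions as F
# Count rows per partition: if one partition holds (nearly) everything, only one
# task -- and therefore one executor -- will do the parquet write.
(final_df
    .withColumn('pid', F.spark_partition_id())
    .groupBy('pid')
    .count()
    .orderBy('pid')
    .show(100, truncate=False))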