如何使 pyspark 作业在多个节点上正确并行化并避免内存问题?

Posted

技术标签:

【中文标题】如何使 pyspark 作业在多个节点上正确并行化并避免内存问题?【英文标题】:How to make a pyspark job properly parallelizable on multiple nodes and avoid memory issues? 【发布时间】:2017-08-25 13:45:11 【问题描述】:

我目前正在从事一项 PySpark 作业 (Spark 2.2.0),该作业旨在基于一组文档训练潜在 Dirichlet 分配模型。输入文档以 CSV 文件的形式提供,位于 Google Cloud Storage 上。

以下代码在单个节点 Google Cloud Dataproc 集群(4vCPU/15GB 内存)上成功运行,其中包含一小部分文档(约 6500 个)、少量要生成的主题(10 个)和少量迭代(100)。 但是,其他尝试使用更大的文档集或更高的主题数或迭代次数值很快就会导致内存问题和作业失败。

此外,当将此作业提交到 4 节点集群时,我可以看到实际上只有一个工作节点在工作(30% 的 CPU 使用率),这让我认为代码没有针对并行处理进行适当优化。

代码

conf = pyspark.SparkConf().setAppName("lda-training")
spark = SparkSession.builder.config(conf=conf).getOrCreate()

# CSV schema declaration
csv_schema = StructType([StructField("doc_id", StringType(), True),  # id of the document
                         StructField("cleaned_content", StringType(), True)])  # cleaned text content (used for LDA)

# Step 1: Load CSV
doc_df = spark.read.csv(path="gs://...", encoding="UTF-8", schema=csv_schema)

print(" document(s) loaded".format(doc_df.count()))
# This prints "25000 document(s) loaded"

print(" partitions".format(doc_df.rdd.getNumPartitions()))
# This prints "1"

# Step 2: Extracting words
extract_words = functions.udf(lambda row: split_row(row), ArrayType(StringType()))
doc_df = doc_df.withColumn("words", extract_words(doc_df["cleaned_content"]))

# Step 3: Generate count vectors (BOW) for each document
count_vectorizer = CountVectorizer(inputCol="words", outputCol="features")
vectorizer_model = count_vectorizer.fit(doc_df)
dataset = vectorizer_model.transform(doc_df)

# Instantiate LDA model
lda = LDA(k=100,  # number of topics
          optimizer="online", # 'online' or 'em'
          maxIter=100,
          featuresCol="features",
          topicConcentration=0.01,  # beta
          optimizeDocConcentration=True,  # alpha
          learningOffset=2.0,
          learningDecay=0.8,
          checkpointInterval=10,
          keepLastCheckpoint=True)

# Step 4: Train LDA model on corpus (this is the long part of the job)
lda_model = lda.fit(dataset)

# Save LDA model to Cloud Storage
lda_model.write().overwrite().save("gs://...")

以下是遇到的警告和错误消息示例:

WARN org.apache.spark.scheduler.TaskSetManager: Stage 7 contains a task of very large size (3791 KB). The maximum recommended task size is 100 KB.
WARN org.apache.spark.scheduler.TaskSetManager: Stage 612 contains a task of very large size (142292 KB). The maximum recommended task size is 100 KB.
WARN org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: Container killed by YARN for exceeding memory limits. 6.1 GB of 6 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
WARN org.apache.spark.scheduler.TaskSetManager: Lost task 0.0 in stage 303.0 (TID 302, cluster-lda-w-1.c.cognitive-search-engine-dev.internal, executor 3): ExecutorLostFailure (executor 3 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 6.1 GB of 6 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
ERROR org.apache.spark.scheduler.cluster.YarnScheduler: Lost executor 3 on cluster-lda-w-1.c.cognitive-search-engine-dev.internal: Container killed by YARN for exceeding memory limits. 6.1 GB of 6 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.

问题

是否可以对代码本身进行任何优化以确保其可扩展性? 我们如何让 Spark 将作业分配给所有工作节点,并希望避免内存问题?

【问题讨论】:

【参考方案1】:

如果您的输入数据大小很小,即使您的管道最终对小数据进行密集计算,那么基于大小的分区将导致分区太少而无法扩展。由于您的getNumPartitions() 打印了1,这表明Spark 最多将使用1 个执行器核心来处理该数据,这就是为什么您只能看到一个工作节点在工作。

您可以尝试更改您的初始 spark.read.csv 行以在末尾包含 repartition

doc_df = spark.read.csv(path="gs://...", ...).repartition(32)

然后您可以通过在后面的一行看到getNumPartitions() print 32 来验证它是否符合您的预期。

【讨论】:

我确实可以验证增加分区数量允许 Spark 将作业拆分给多个工作人员。考虑到这一点,我仍然遇到很多内存问题(Container killed by YARN for exceeding memory limitsjava.lang.OutOfMemoryError: Java heap space)。作为输入权重的 CSV 文件约为 600MB。对于如何估计正确的分区数量与工作节点的大小(CPU 内核数 + 内存),您有什么建议吗?

以上是关于如何使 pyspark 作业在多个节点上正确并行化并避免内存问题?的主要内容,如果未能解决你的问题,请参考以下文章

MPI +线程并行化与仅MPI的优势(如果有的话)是什么?

Dataproc Pyspark 作业仅在一个节点上运行

Pyspark:如何在 HDFS 中并行化多 gz 文件处理

如何检查 Dataproc 上 pyspark 作业的每个执行程序/节点内存使用指标?

通过 JDBC 进行并行化 - Pyspark - 并行化如何使用 JDBC 工作?

在工作节点上并行排序 PySpark 数据帧