Dataproc Pyspark 作业仅在一个节点上运行

Posted

技术标签:

【中文标题】Dataproc Pyspark 作业仅在一个节点上运行【英文标题】:Dataproc Pyspark job only running on one node 【发布时间】:2016-06-02 10:38:34 【问题描述】:

我的问题是我的 pyspark 作业没有并行运行。

代码和数据格式: 我的 PySpark 看起来像这样(很明显,很简单):

class TheThing:
    def __init__(self, dInputData, lDataInstance):
        # ...
    def does_the_thing(self):
        """About 0.01 seconds calculation time per row"""
        # ...
        return lProcessedData

#contains input data pre-processed from other RDDs
#done like this because one RDD cannot work with others inside its transformation
#is about 20-40MB in size
#everything in here loads and processes from BigQuery in about 7 minutes
dInputData = 'dPreloadedData': dPreloadedData

#rddData contains about 3M rows
#is about 200MB large in csv format
#rddCalculated is about the same size as rddData
rddCalculated = (
    rddData
        .map(
            lambda l, dInputData=dInputData: TheThing(dInputData, l).does_the_thing()
        )
)

llCalculated = rddCalculated.collect()
#save as csv, export to storage

在 Dataproc 集群上运行: 集群是通过Dataproc UI创建的。 作业是这样执行的:gcloud --project <project> dataproc jobs submit pyspark --cluster <cluster_name> <script.py>

我通过 UI 观察了作业状态,started like this。浏览它,我注意到我的工作节点中只有一个(看似随机的)在做任何事情。所有其他人都完全空闲。

PySpark 的重点是并行运行这个东西,显然不是这样。我已经在各种集群配置中运行了这些数据,最后一个是海量的,当时我注意到它是单节点使用的。因此,为什么我的工作需要很长时间才能完成,而且时间似乎与集群大小无关。

在我的本地机器和集群上,所有具有较小数据集的测试都顺利通过。我真的只需要高档。

编辑 我改了llCalculated = rddCalculated.collect()#... save to csv and exportrddCalculated.saveAsTextFile("gs://storage-bucket/results")

只有一个节点仍在工作。

【问题讨论】:

【参考方案1】:

根据您是从 GCS 还是 HDFS 加载 rddData,默认拆分大小可能是 64MB 或 128MB,这意味着您的 200MB 数据集只有 2-4 个分区。 Spark 这样做是因为典型的基本数据并行任务足够快地处理数据,以至于 64MB-128MB 意味着可能需要数十秒的处理时间,因此拆分成更小的并行块没有任何好处,因为启动开销将占主导地位。

在您的情况下,听起来每 MB 的处理时间要高得多,因为您加入了另一个数据集,并且可能对每条记录执行了相当重量级的计算。所以你需要更多的分区,否则无论你有多少节点,Spark 都不会知道分成超过 2-4 个工作单元(如果每台机器也可能被打包到一台机器上)有多个核心)。

所以你只需拨打repartition:

rddCalculated = (
    rddData
        .repartition(200)
        .map(
            lambda l, dInputData=dInputData: TheThing(dInputData, l).does_the_thing()
        )
)

或者将重新分区添加到前一行:

rddData = rddData.repartition(200)

或者,如果您在读取时重新分区,您可能会获得更好的效率:

rddData = sc.textFile("gs://storage-bucket/your-input-data", minPartitions=200)

【讨论】:

所以重新分区号repartition(x),应该与节点或CPU的数量相等吗? 似乎x 是每个节点拥有内存的最大核心数。谢谢@丹尼斯。本周英雄! 因此,只要每个分区的处理时间超过几秒钟,您实际上就可以更积极地继续重新分区;如果您有比number_of_nodes * cores_per_node 更多的分区,这实际上是一个好的模式,它只是意味着您的工作人员将在超过 1 个“波”中穿过这些分区。因此,标准做法是分区等于可用内核总数(在 1 波中完成所有操作),或者使其成为内核总数的倍数。如果需要,更多分区还可以让您在工作中扩大工作人员。 典型的限制是当每个分区变得太小以至于调度程序开销使其效率低下时(因此请尝试将每个分区的时间保持在 5 秒左右),并且一旦你得到扩展效率就会降低到 10,000 - 100,000 个分区范围内。 感谢一百万!处理每个新分区时的数据复制开销有多严重?最终我想知道的是,与多波相比,每成本的速度是否存在很大差异?

以上是关于Dataproc Pyspark 作业仅在一个节点上运行的主要内容,如果未能解决你的问题,请参考以下文章

Google Cloud Dataproc 上的 Pyspark 作业失败

在 Google Cloud DataProc 上安排 cron 作业

GCP Dataproc 节点中没有资源来启动新的 SparkSession

向 dataproc 集群提交 pyspark 作业时出错(找不到作业)

如何提交依赖于 google dataproc 集群的 pyspark 作业

使用PySpark的ETL雪花作业在本地而不在Dataproc上工作