为啥不在空的 Spark 集群上强制执行preferredLocations?

Posted

技术标签:

【中文标题】为啥不在空的 Spark 集群上强制执行preferredLocations?【英文标题】:Why would preferredLocations not be enforced on an empty Spark cluster?为什么不在空的 Spark 集群上强制执行preferredLocations? 【发布时间】:2020-08-22 11:37:22 【问题描述】:

我的 Spark 作业由 3 名工作人员组成,与他们需要读取的数据位于同一地点。我提交了一个包含一些元数据的 RDD,作业任务将元数据转化为真实数据。例如,元数据可能包含一个要从本地工作文件系统读取的文件,而 spark 作业的第一阶段是将该文件读入 RDD 分区。

在我的环境中,数据可能不会出现在所有 3 个工作人员身上,而且跨工作人员读取数据的成本太高(即,如果数据在工作人员 1 上,那么工作人员 2 就无法伸出手来获取它)。出于这个原因,我必须将分区强制分配给他们正在读取的数据的适当工作人员。我有一种实现这一点的机制,我将工作人员与元数据中的预期工作人员进行检查,如果它们不匹配,则使用描述性错误消息使任务失败。使用黑名单,我可以确保将任务重新安排在不同的节点上,直到找到正确的节点。这很好用,但作为一种优化,我想使用 preferredLocations 来帮助最初将任务分配给正确的工作人员,而无需经历尝试/重新安排过程。

根据此处的答案,使用 makeRDD 创建具有正确首选位置的初始 RDD(元数据):How to control preferred locations of RDD partitions?,但是它没有表现出我期望的行为。 makeRDD的代码如下:

sc.makeRDD(taskAssigments)

taskAssignments 采用以下形式:

val taskAssignments = mutable.ArrayBuffer[(String, Seq[String])]()
metadataMappings.foreach  case(k , v) => 
        taskAssignments += (k + ":" + v.mkString(",") -> Seq(idHostnameMappings(k)))
      

idHostMappings 只是 id -> hostName 的映射,我已经验证它包含正确的信息。

鉴于我的测试 Spark 集群是完全干净的,没有其他作业在其上运行,并且输入 RDD 中没有偏差(它有 3 个分区来匹配 3 个工作人员),我本来希望将任务分配给他们的首选地点。相反,我仍然显示错误消息表明任务正在经历失败/重新安排过程。

我认为任务将被安排在干净集群上的首选位置的假设是否正确,我还能做些什么来强制这样做?

跟进:

我还能够创建一个更简单的测试用例。我的 3 个 spark 工人被命名为 worker1、worker2 和 worker3,我运行以下命令:

import scala.collection.mutable

val someData = mutable.ArrayBuffer[(String, Seq[String])]()

someData += ("1" -> Seq("worker1"))
someData += ("2" -> Seq("worker2"))
someData += ("3" -> Seq("worker3"))

val someRdd = sc.makeRDD(someData)

someRdd.map(i=>i + ":" + java.net.InetAddress.getLocalHost().getHostName()).collect().foreach(println)

我希望看到 1:worker1 等,但实际上看到

1:worker3
2:worker1
3:worker2

谁能解释这种行为?

【问题讨论】:

你能编辑你的帖子来展示你makeRDD()的样子吗? 【参考方案1】:

原来问题出在我的环境而不是 Spark。以防万一其他人遇到这种情况,问题是 Spark 工作人员默认情况下没有使用机器主机名。在每个工人上设置以下环境变量可以纠正它:SPARK_LOCAL_HOSTNAME: "worker1"

【讨论】:

以上是关于为啥不在空的 Spark 集群上强制执行preferredLocations?的主要内容,如果未能解决你的问题,请参考以下文章

为啥 Spark2 只在一个节点上运行?

为啥 Spark(在 Google Dataproc 上)不使用所有 vcore?

Spark惰性转换执行障碍

为啥运行不成功 spark

强制 java jar 不在 EMR 上使用类路径包

为啥只有一个 spark 作业只使用一个执行器运行?