Spark:以编程方式获取集群核心数

Posted

技术标签:

【中文标题】Spark:以编程方式获取集群核心数【英文标题】:Spark: get number of cluster cores programmatically 【发布时间】:2018-05-04 02:09:12 【问题描述】:

我在纱线集群中运行我的 spark 应用程序。在我的代码中,我使用队列的可用核心数在我的数据集上创建分区:

Dataset ds = ...
ds.coalesce(config.getNumberOfCores());

我的问题:如何通过编程方式而不是通过配置获取队列的可用核心数?

【问题讨论】:

您使用的是哪个资源管理器?纱线或金币 我正在使用纱线。 从yarn cluster API中提取需要的队列参数,然后在coalesce中使用 【参考方案1】:

有多种方法可以从 Spark 获取集群中的执行器数量和核心数量。这是我过去使用的一些 Scala 实用程序代码。您应该能够轻松地将其适应 Java。有两个关键思想:

    worker 的数量是 executor 的数量减一或sc.getExecutorStorageStatus.length - 1

    每个worker的核数可以通过在worker上执行java.lang.Runtime.getRuntime.availableProcessors得到。

其余代码是使用 Scala 隐式向SparkContext 添加便捷方法的样板。我在 1.x 年前编写了代码,这就是为什么它没有使用 SparkSession

最后一点:合并多个内核通常是个好主意,因为这样可以在数据倾斜的情况下提高性能。在实践中,我使用 1.5 倍到 4 倍之间的任意值,具体取决于数据的大小以及作业是否在共享集群上运行。

import org.apache.spark.SparkContext

import scala.language.implicitConversions


class RichSparkContext(val sc: SparkContext) 

  def executorCount: Int =
    sc.getExecutorStorageStatus.length - 1 // one is the driver

  def coresPerExecutor: Int =
    RichSparkContext.coresPerExecutor(sc)

  def coreCount: Int =
    executorCount * coresPerExecutor

  def coreCount(coresPerExecutor: Int): Int =
    executorCount * coresPerExecutor




object RichSparkContext 

  trait Enrichment 
    implicit def enrichMetadata(sc: SparkContext): RichSparkContext =
      new RichSparkContext(sc)
  

  object implicits extends Enrichment

  private var _coresPerExecutor: Int = 0

  def coresPerExecutor(sc: SparkContext): Int =
    synchronized 
      if (_coresPerExecutor == 0)
        sc.range(0, 1).map(_ => java.lang.Runtime.getRuntime.availableProcessors).collect.head
      else _coresPerExecutor
    


更新

最近,getExecutorStorageStatus 已被删除。我们已经切换到使用SparkEnvblockManager.master.getStorageStatus.length - 1 (减一再次用于驱动程序)。在org.apache.spark 包之外无法通过SparkContextenv 访问它。因此,我们使用封装违规模式:

package org.apache.spark

object EncapsulationViolator 
  def sparkEnv(sc: SparkContext): SparkEnv = sc.env

【讨论】:

sc.getExecutorStorageStatus.length - 1 对我有好处。谢谢 有时执行器内核被过度配置或配置不足,这意味着 JVM 运行时功能可能不准确。 @tribbloid 绝对正确,在各种集群管理系统中复杂的动态池配置的情况下也是如此。这是针对常见/简单情况的,需要针对复杂场景进行调整。 仅供参考,自 Spark 2.4.4 起,getExecutorStorageStatus 不再可用 @Sim 正确。调试对我来说是一个更好的词,因为有时执行单线程会很有帮助。【参考方案2】:

在寻找几乎相同问题的答案时发现了这个。

我发现:

Dataset ds = ...
ds.coalesce(sc.defaultParallelism());

完全符合 OP 的要求。

例如,我的 5 节点 x 8 核心集群为 defaultParallelism 返回 40。

【讨论】:

【参考方案3】:

根据Databricks如果驱动程序和执行程序是相同的节点类型,这是要走的路:

java.lang.Runtime.getRuntime.availableProcessors * (sc.statusTracker.getExecutorInfos.length -1)

【讨论】:

java.lang.Runtime.getRuntime.availableProcessors 告诉你当前机器上有多少 CPU。不能假设集群中的所有机器都是如此。 @JamesMoore 你是对的。这仅适用于 Driver 和 Worker 节点具有相同节点类型的情况。【参考方案4】:

您可以在每台机器上运行作业并询问它的内核数量,但这不一定是 Spark 可用的(正如 @tribbloid 在另一个答案的评论中指出的那样):

import spark.implicits._
import scala.collection.JavaConverters._
import sys.process._
val procs = (1 to 1000).toDF.map(_ => "hostname".!!.trim -> java.lang.Runtime.getRuntime.availableProcessors).collectAsList().asScala.toMap
val nCpus = procs.values.sum

在 shell 中运行它(在一个有两个工作人员的小型测试集群上)给出:

scala> :paste
// Entering paste mode (ctrl-D to finish)

    import spark.implicits._
    import scala.collection.JavaConverters._
    import sys.process._
    val procs = (1 to 1000).toDF.map(_ => "hostname".!!.trim -> java.lang.Runtime.getRuntime.availableProcessors).collectAsList().asScala.toMap
    val nCpus = procs.values.sum

// Exiting paste mode, now interpreting.

import spark.implicits._                                                        
import scala.collection.JavaConverters._
import sys.process._
procs: scala.collection.immutable.Map[String,Int] = Map(ip-172-31-76-201.ec2.internal -> 2, ip-172-31-74-242.ec2.internal -> 2)
nCpus: Int = 4

如果您的集群中通常有 很多 台机器,请将零添加到您的范围内。即使在我的两机集群上,10000 也能在几秒钟内完成。

这可能仅在您想要的信息多于 sc.defaultParallelism() 提供给您的信息时才有用(如@SteveC 的回答)

【讨论】:

以上是关于Spark:以编程方式获取集群核心数的主要内容,如果未能解决你的问题,请参考以下文章

Spark核心编程进阶-yarn模式下日志查看详解

Spark核心编程---创建RDD

Spark核心-RDD

spark浅谈:SPARK核心编程

如何以编程方式填充核心数据存储,为每条记录生成索引

以编程方式更改核心数据文件中的数据