Apache Spark:核心数与执行程序数

Posted

技术标签:

【中文标题】Apache Spark:核心数与执行程序数【英文标题】:Apache Spark: The number of cores vs. the number of executors 【发布时间】:2014-08-28 14:45:59 【问题描述】:

我试图了解在 YARN 上运行 Spark 作业时核心数和执行程序数的关系。

测试环境如下:

数据节点数:3 数据节点机器规格: CPU:Core i7-4790(内核数:4,线程数:8) 内存:32GB (8GB x 4) 硬盘:8TB (2TB x 4)

网络:1Gb

Spark 版本:1.0.0

Hadoop 版本:2.4.0 (Hortonworks HDP 2.1)

Spark 作业流程:sc.textFile -> filter -> map -> filter -> mapToPair -> reduceByKey -> map -> saveAsTextFile

输入数据

类型:单个文本文件 大小:165GB 行数:454,568,833

输出

第二个过滤器后的行数:310,640,717 结果文件行数:99,848,268 结果文件大小:41GB

作业使用以下配置运行:

    --master yarn-client --executor-memory 19G --executor-cores 7 --num-executors 3(每个数据节点的执行器,与内核一样多)

    --master yarn-client --executor-memory 19G --executor-cores 4 --num-executors 3(内核数量减少)

    --master yarn-client --executor-memory 4G --executor-cores 2 --num-executors 12(更少核心,更多执行器)

经过的时间:

    50 分 15 秒

    55 分 48 秒

    31 分 23 秒

令我惊讶的是,(3) 更快。 我认为(1)会更快,因为洗牌时执行者之间的通信会更少。 尽管 (1) 的核心数少于 (3),但核心数不是关键因素,因为 2) 确实表现良好。

(以下是在 pwilmot 的回答之后添加的。)

有关信息,性能监视器屏幕截图如下:

(1) 的神经节数据节点摘要 - 作业于 04:37 开始。

(3) 的神经节数据节点摘要 - 作业于 19:47 开始。请忽略该时间之前的图表。

该图大致分为两部分:

首先:从开始到 reduceByKey:CPU 密集型,无网络活动 第二次:reduceByKey:CPU降低后,网络I/O完成。

如图所示,(1) 可以使用尽可能多的 CPU 功率。所以,可能不是线程数的问题。

如何解释这个结果?

【问题讨论】:

现在我怀疑 GC...事实上,在 Spark UI 上,GC 花费的总时间在 1) 上比 2) 上更长。 你为什么不试试 3) 19G?是否将工人限制在 4G 上会降低某些人的 NUMA 效应?即您的 4G 位于分配给您的工作流程的 2 个核心之一上,因此 i/o 减速更少,从而带来更好的整体性能。否则我认为一个主要问题是:有多少核心/线程可以在一个工作人员上使用一个执行器? (只能指定一个worker的总核数,不能指定executor的粒度) 顺便说一句,我刚刚检查了 core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala 中的代码,似乎 1 executor = 1 worker 线程。 有点晚了,但这里有一篇关于 cloudera 关于这个主题的帖子:blog.cloudera.com/blog/2015/03/… 顺便说一句,我在 cloudera slide deck slideshare.net/cloudera/… 中找到了这个信息,它解释了一些关于执行器、内核和内存的决策 【参考方案1】:

为了希望使所有这些更具体,这里有一个配置 Spark 应用程序以使用尽可能多的集群的工作示例 可能:想象一个集群有 六个节点 运行 NodeManagers,每个 配备 16 核和 64GB 内存。 NodeManager 的能力, yarn.nodemanager.resource.memory-mb 和 yarn.nodemanager.resource.cpu-vcores,应该设置为 63 * 1024 = 64512(兆字节)和 15 分别。我们避免分配 100% YARN 容器的资源,因为节点需要一些 运行操作系统和 Hadoop 守护进程的资源。在这种情况下,我们留下一个 千兆字节和这些系统进程的核心。 Cloudera Manager 提供帮助 通过考虑这些并配置这些 YARN 属性 自动。

可能的第一个冲动是使用 --num-executors 6 --executor-cores 15 --executor-memory 63G。然而,这是错误的做法,因为:

63GB + 执行器内存开销超出 63GB 容量 节点管理器。应用程序主控将占用一个核心 节点数,这意味着 15 核执行器将没有空间 在那个节点上。每个执行程序 15 个内核可能导致 HDFS I/O 错误 吞吐量。

更好的选择是使用 --num-executors 17 --executor-cores 5 --executor-memory 19G。为什么?

这个配置会在所有节点上产生三个执行器,除了一个 与 AM,它将有两个执行者。 --executor-memory 派生为(每个节点 63/3 个执行器)= 21. 21 * 0.07 = 1.47。 21 – 1.47 ~ 19。

Cloudera 的博客How-to: Tune Your Apache Spark Jobs (Part 2) 的一篇文章中给出了解释。

【讨论】:

“这个配置会在所有节点上产生三个执行器,除了带有 AM 的那个,它将有两个执行器。”。这对“--executor-cores 5”意味着什么? 表示每个执行器使用5个核心。每个节点有 3 个执行器,因此使用 15 个核心,除了其中一个节点还将为作业运行应用程序主控器,因此只能托管 2 个执行器,即 10 个核心用作执行器。 很好解释 - 请注意这适用于 yarn.scheduler.capacity.resource-calculator disabled,这是默认设置。这是因为默认情况下它按内存而不是 CPU 进行调度。 更多的执行器会导致 HDFS I/O 吞吐量下降。因此,如果我根本不使用 HDFS,在这种情况下,每个执行程序可以使用超过 5 个内核吗? 我虽然应用程序主运行在每个节点上。如上所述,这意味着只有 1 个 Application Master 来运行该作业。对吗?【参考方案2】:

根据Sandy Ryza,当您在 HDFS 上运行 Spark 应用程序时

我注意到 HDFS 客户端存在大量并发问题 线程。一个粗略的猜测是,最多每个执行者五个任务可以 达到全写吞吐量,所以最好保持 每个执行者的核心数低于该数字。

所以我认为您的第一个配置比第三个配置慢是因为 HDFS I/O 吞吐量不佳

【讨论】:

【参考方案3】:

简答:我认为tgbaggio 是对的。您达到了执行程序的 HDFS 吞吐量限制。

我认为这里的答案可能比这里的一些建议要简单一些。

对我来说,线索在集群网络图中。对于运行 1,利用率稳定在 ~50 M 字节/秒。对于运行 3,稳定利用率翻了一番,约为 100 M 字节/秒。

来自DzOrd分享的the cloudera blog post,你可以看到这个重要的报价:

我注意到 HDFS 客户端在处理大量并发线程时遇到了问题。粗略的猜测是,每个 executor 最多 5 个任务可以实现完整的写入吞吐量,因此最好将每个 executor 的内核数保持在该数字以下。

所以,让我们做一些计算,看看如果这是真的,我们期望的性能是什么。


运行 1:19 GB,7 个内核,3 个执行程序

3 个执行器 x 7 个线程 = 21 个线程 每个执行程序有 7 个内核,我们预计对 HDFS 的 IO 有限(最多约 5 个内核) 有效吞吐量 ~= 3 个执行器 x 5 个线程 = 15 个线程

运行 3:4 GB,2 个内核,12 个执行程序

2 执行器 x 12 线程 = 24 线程 每个执行器2个核心,所以hdfs吞吐量还可以 有效吞吐量 ~= 12 个执行器 x 2 个线程 = 24 个线程

如果作业 100% 受并发(线程数)限制。我们希望运行时与线程数完全成反比。

ratio_num_threads = nthread_job1 / nthread_job3 = 15/24 = 0.625
inv_ratio_runtime = 1/(duration_job1 / duration_job3) = 1/(50/31) = 31/50 = 0.62

所以ratio_num_threads ~= inv_ratio_runtime,看起来我们网络受限。

同样的效果解释了 Run 1 和 Run 2 之间的区别。


运行 2:19 GB,4 个内核,3 个执行程序

3 个执行器 x 4 个线程 = 12 个线程 每个执行程序有 4 个内核,可以 IO 到 HDFS 有效吞吐量 ~= 3 执行器 x 4 线程 = 12 线程

比较有效线程数和运行时间:

ratio_num_threads = nthread_job2 / nthread_job1 = 12/15 = 0.8
inv_ratio_runtime = 1/(duration_job2 / duration_job1) = 1/(55/50) = 50/55 = 0.91

它不像上次比较那么完美,但当我们失去线程时,我们仍然看到类似的性能下降。

最后一点:为什么我们会在使用更多线程时获得更好的性能,尤其是。线程数多于 CPU 数?

Rob 的这篇精彩文章很好地解释了并行性(我们通过将数据分配到多个 CPU 上得到的结果)和并发性(当我们使用多个线程在单个 CPU 上工作时得到的结果)之间的区别派克:Concurrency is not parallelism.

简短的解释是,如果 Spark 作业与文件系统或网络交互,CPU 会花费大量时间等待与这些接口的通信,而不是花费大量时间实际“工作”。通过让这些 CPU 一次处理超过 1 个任务,它们等待的时间更少,工作的时间更多,您会看到更好的性能。

【讨论】:

有趣且令人信服的解释,我想知道您是如何猜测执行器有 5 个任务限制以实现最大吞吐量的。 所以数字 5 不是我想出来的:我只是注意到 IO 瓶颈的迹象,然后开始寻找这些瓶颈可能来自哪里。【参考方案4】:

我自己没有玩过这些设置,所以这只是猜测,但如果我们将此问题视为分布式系统中的普通内核和线程,那么在您的集群中,您最多可以使用 12 个内核(4 * 3 台机器)和 24 线程(8 * 3 机器)。在您的前两个示例中,您为您的工作提供了相当数量的内核(潜在的计算空间),但在这些内核上运行的线程(工作)数量非常有限,以至于您无法使用分配的大部分处理能力因此即使分配了更多的计算资源,作业也会变慢。

您提到您关心的是随机播放步骤 - 虽然限制随机播放步骤中的开销很好,但利用集群的并行化通常更为重要。考虑一下极端情况——零随机播放的单线程程序。

【讨论】:

感谢您的回答。但我怀疑线程数不是主要问题。我添加了监控屏幕截图。如图所示,1) 可以使用尽可能多的 CPU 功率。 @zeodtr pwilmot 是正确的 - 您至少需要 2-4 个任务才能充分利用核心的潜力。就是这样 - 我通常为我的 80 个核心集群使用至少 1000 个分区。 @samthebest 我想知道的是1)和3)之间性能差异的原因。当我观看 Spark UI 时,在第 2 部分中,两者都并行运行 21 个任务。(为什么 21 而不是 24 在 3 的情况下)现在是未知的)但是,3)的任务运行得更快。【参考方案5】:

来自RStudio's Sparklyr package page 上的优秀资源:

SPARK 定义

提供一些简单的定义可能很有用 对于 Spark 命名法:

节点:服务器

工作节点:作为集群一部分的服务器,可供 运行 Spark 作业

主节点:协调 Worker 节点的服务器。

Executor:节点内的一种虚拟机。一个节点可以有 多个执行者。

驱动节点:启动 Spark 会话的节点。通常, 这将是 sparklyr 所在的服务器。

Driver(Executor):Driver Node也会出现在Executor中 列表。

【讨论】:

【参考方案6】:

我认为主要原因之一是地方性。你的输入文件大小是165G,文件的相关块肯定分布在多个DataNode上,更多的executor可以避免网络复制。

尝试设置执行器数量等于块数,我认为可以更快。

【讨论】:

【参考方案7】:

Spark 动态分配提供了灵活性并动态分配资源。在这个数量的 min 和 max executors 中可以给出。此外,还可以给出在应用程序启动时必须启动的执行程序的数量。

请阅读以下内容:

http://spark.apache.org/docs/latest/configuration.html#dynamic-allocation

【讨论】:

【参考方案8】:

我认为前两种配置存在一个小问题。线程和核心的概念如下。线程的概念是,如果核心是理想的,则使用该核心来处理数据。所以在前两种情况下内存没有被充分利用。如果您想对这个示例进行基准测试,请选择每台机器上拥有超过 10 个核心 的机器。然后做基准测试。

但每个执行程序的核心数不要超过 5 个,否则 i/o 性能会出现瓶颈。

因此,进行此基准测试的最佳机器可能是具有 10 个内核的数据节点。

数据节点机器规格: CPU:Core i7-4790(核心数:10,线程数:20) 内存:32GB(8GB x 4) 硬盘:8TB (2TB x 4)

【讨论】:

以上是关于Apache Spark:核心数与执行程序数的主要内容,如果未能解决你的问题,请参考以下文章

Apache Spark 如何将分区 ID 分配给其执行程序

在 Bash 脚本中执行 Apache Spark (Scala) 代码

Spark教程——(11)Spark程序本地执行和集群执行的差异

如何获取有关当前执行程序 Apache-Spark 的元数据?

org.apache.spark.SparkException:无法执行用户定义的函数

Apache Spark:采用本地方式执行任务