Spark 中的阶段是如何划分为任务的?

Posted

技术标签:

【中文标题】Spark 中的阶段是如何划分为任务的?【英文标题】:How are stages split into tasks in Spark? 【发布时间】:2016-09-28 10:10:48 【问题描述】:

我们假设在每个时间点只有一个 Spark 作业在运行。

到目前为止我得到了什么

以下是我对 Spark 的理解:

    创建SparkContext 时,每个工作节点都会启动一个执行程序。 执行器是独立的进程 (JVM),连接回驱动程序。每个执行器都有驱动程序的 jar。退出驱动程序,关闭执行程序。每个执行者可以持有一些分区。 执行作业时,会根据沿袭图创建执行计划。 执行作业分为多个阶段,其中阶段包含尽可能多的相邻(在沿袭图中)转换和操作,但没有洗牌。因此,各个阶段由 shuffle 分隔。

我明白了

任务是通过序列化 Function 对象从驱动程序发送到执行程序的命令。 执行程序反序列化(使用驱动程序 jar)命令(任务)并在分区上执行。

但是

问题

如何将阶段拆分为这些任务?

具体来说:

    任务是由转换和操作确定还是可以是多个转换/操作在一个任务中? 任务是否由分区确定(例如,每个分区每个阶段一个任务)。 任务是否由节点决定(例如,每个节点每个阶段一个任务)?

我的想法(只是部分答案,即使是对的)

在https://0x0fff.com/spark-architecture-shuffle中,用图片解释了shuffle

我觉得规则是

每个阶段分为#number-of-partitions 个任务,不考虑节点数

对于我的第一张图片,我会说我有 3 个 map 任务和 3 个 reduce 任务。

对于来自 0x0fff 的图像,我会说有 8 个 map 任务和 3 个 reduce 任务(假设只有三个橙色和三个深绿色文件)。

任何情况下的开放式问题

正确吗?但即使那是正确的,我上面的问题也没有全部回答,因为它仍然是开放的,多个操作(例如多个地图)是在一个任务中还是每个操作被分成一个任务。

别人怎么说

What is a task in Spark? How does the Spark worker execute the jar file? 和How does the Apache Spark scheduler split files into tasks? 很相似,但我觉得我的问题在那里没有得到明确的回答。

【问题讨论】:

如果您能添加更多见解,我将不胜感激,我也有类似的问题。 @Nag:我的问题也在寻找更多见解,这就是我问的原因:-)。答案是否提供了您在哪里寻找的东西?您需要什么样的见解? 啊,明白了。我想,既然这个问题发布得有点老,也许你会对你提出的问题有一些见解。想和你核实一下:-) @Nag:嗯,自从我上次使用 Spark 以来已经有好几年了,所以 a) 如果我想知道 Spark 是如何工作的,我必须再次阅读它(我忘了大多数细节)和b)我写的内容可能已经过时了,特别是因为我的帖子主要涉及Spark 1.x,并且Spark 2.x发生了很多变化,afai记得。但也许与后端架构无关的变化 - 这也可能是真的。 太棒了。谢谢!! 【参考方案1】:

这里有一个非常漂亮的大纲。回答您的问题

需要为每个stage 的每个数据分区启动一个单独的task确实。考虑到每个分区可能位于不同的物理位置 - 例如HDFS 或本地文件系统的目录/卷中的块。

注意Stages 的提交是由DAG Scheduler 驱动的。这意味着可以将不相互依赖的阶段提交到集群以并行执行:这最大限度地提高了集群的并行化能力。因此,如果我们的数据流中的操作可以同时发生,我们预计会看到多个阶段启动。

我们可以在以下玩具示例中看到这一点,我们在其中执行以下类型的操作:

加载两个数据源 分别对两个数据源执行一些映射操作 加入他们 对结果执行一些映射和过滤操作 保存结果

那么我们最终会有多少阶段呢?

1 个阶段,每个阶段用于并行加载两个数据源 = 2 个阶段 代表join 的第三阶段依赖其他两个阶段 注意:对连接数据进行的所有后续操作都可以在相同阶段执行,因为它们必须按顺序进行。启动其他阶段没有任何好处,因为在之前的操作完成之前它们无法开始工作。

这是那个玩具程序

val sfi  = sc.textFile("/data/blah/input").map x => val xi = x.toInt; (xi,xi*xi) 
val sp = sc.parallelize (0 until 1000).map x => (x,x * x+1) 
val spj = sfi.join(sp)
val sm = spj.mapPartitions iter => iter.map case (k,(v1,v2)) => (k, v1+v2) 
val sf = sm.filter case (k,v) => v % 10 == 0 
sf.saveAsTextFile("/data/blah/out")

这是结果的 DAG

现在:有多少任务?任务数应该等于

(Stage * #Partitions in the stage) 的总和

【讨论】:

谢谢!请详细说明您对我的文字的回答:1)我对阶段的定义不全面吗?听起来我错过了一个阶段不能包含可以并行的操作的要求。还是我的描述已经严格暗示了这一点? 2)作业必须执行的任务数量取决于分区的数量,而不是处理器或节点的数量,而可以同时执行的任务数量取决于分区的数量处理器,对吧? 3) 一个任务可以包含多个操作? 4) 你最后一句话是什么意思?毕竟,分区的数量可能因阶段而异。您的意思是这就是您为所有阶段配置工作的方式吗? @Make42 当然,分区的数量可能因阶段而异——你是对的。我说sum(..) 是为了考虑到这种变化。 哇,你的回答完全没问题,但不幸的是,最后一句话绝对是一个错误的概念。这并不意味着一个阶段中的分区数等于处理器数,但是,您可以根据机器上显示的内核数来设置 RDD 的分区数。 @epcpu 这是一个特例 - 但我同意这会产生误导,所以我将其删除。【参考方案2】:

这可能会帮助您更好地理解不同的部分:

Stage:是任务的集合。运行相同的进程 不同的数据子集(分区)。 任务:代表一个单位 处理分布式数据集的分区。所以在每个阶段, number-of-tasks = number-of-partitions,或者正如你所说的“每个任务一个 每个分区的阶段”。 每个执行器在一个纱线容器上运行,并且 每个容器驻留在一个节点上。 每个阶段使用多个执行器,每个执行器分配有多个 vcore。 每个 vcore 一次只能执行一项任务 因此,在任何阶段,多个任务都可以并行执行。正在运行的任务数 = 正在使用的 vcore 数。

【讨论】:

这是一本关于 Spark 架构的非常有用的读物​​:0x0fff.com/spark-architecture 我没有得到你的第3点。据我所知每个节点可以有多个执行器,所以根据第3点:每个节点应该只有一个执行器。你能澄清这一点吗? @RituparnoBehera 每个节点可以有多个容器,因此可以有多个 Spark 执行器。看看这个链接。 docs.cloudera.com/runtime/7.0.2/running-spark-applications/…【参考方案3】:

如果我理解正确的话,有 2 个(相关的)事情会让你感到困惑:

1) 什么决定了任务的内容?

2) 什么决定了要执行的任务数量?

Spark 的引擎将连续 rdd 上的简单操作“粘合”在一起,例如:

rdd1 = sc.textFile( ... )
rdd2 = rdd1.filter( ... )
rdd3 = rdd2.map( ... )
rdd3RowCount = rdd3.count

所以当 rdd3 被(懒惰地)计算时,spark 将为 rdd1 的每个分区生成一个任务,并且每个任务将执行过滤器和每行的映射以产生 rdd3。

任务数由分区数决定。每个 RDD 都有定义数量的分区。对于从 HDFS 读取的源 RDD(例如使用 sc.textFile(...)),分区数是输入格式生成的拆分数。 RDD(s) 上的某些操作可能会导致 RDD 具有不同数量的分区:

rdd2 = rdd1.repartition( 1000 ) will result in rdd2 having 1000 partitions ( regardless of how many partitions rdd1 had ).

另一个例子是连接:

rdd3 = rdd1.join( rdd2  , numPartitions = 1000 ) will result in rdd3 having 1000 partitions ( regardless of partitions number of rdd1 and rdd2 ).

(大多数)改变分区数量的操作都涉及到 shuffle,例如:

rdd2 = rdd1.repartition( 1000 ) 

实际发生的是 rdd1 的每个分区上的任务需要产生一个可以被下一个阶段读取的最终输出,以便使 rdd2 恰好有 1000 个分区(他们是如何做到的?Hash 或 Sort )。这一侧的任务有时被称为“地图(侧)任务”。 稍后将在 rdd2 上运行的任务将作用于一个分区(rdd2!),并且必须弄清楚如何读取/组合与该分区相关的映射端输出。这一侧的任务有时被称为“减少(侧)任务”。

这2个问题是相关的:一个stage中的task数量是partition的数量(对于连续的rdds“粘合”在一起是常见的)和一个rdd的partition数量可以在stage之间改变(通过指定的数量分区到一些 shuffle 导致操作,例如)。

一旦一个阶段开始执行,它的任务就可以占用任务槽。并发任务槽的数量是 numExecutors * ExecutorCores。一般来说,这些可以被来自不同的、非依赖阶段的任务占用。

【讨论】:

以上是关于Spark 中的阶段是如何划分为任务的?的主要内容,如果未能解决你的问题,请参考以下文章

spark中job,stage,task的关系

写入 HDFS 时 Apache spark 中的任务数

Spark的任务调度

第三十四课 Spark中任务处理的Stage划分和Task最佳位置算法

Spark的Job的划分

Spark的Job的划分