为啥 Spark 将 Map 阶段输出保存到本地磁盘?

Posted

技术标签:

【中文标题】为啥 Spark 将 Map 阶段输出保存到本地磁盘?【英文标题】:Why does Spark save Map phase output to local disk?为什么 Spark 将 Map 阶段输出保存到本地磁盘? 【发布时间】:2016-02-18 11:14:34 【问题描述】:

我正在尝试深入了解 spark shuffle 过程。当我开始阅读时,我遇到了以下几点。

Spark 在完成时将 Map 任务 (ShuffleMapTask) 输出直接写入磁盘。

我想了解以下关于 Hadoop MapReduce 的内容。

    如果 Map-Reduce 和 Spark 都将数据写入本地磁盘,那么 spark shuffle 过程与 Hadoop MapReduce 有何不同?

    既然数据在 Spark 中表示为 RDD,为什么这些输出不保留在节点执行器内存中?

    Hadoop MapReduce 和 Spark 的 Map 任务输出有何不同?

    如果有很多小的中间文件作为输出,spark 如何处理网络和 I/O 瓶颈?

【问题讨论】:

【参考方案1】:

首先,Spark 不能以严格的 map-reduce 方式工作,除非必要,否则map 的输出不会写入磁盘。写入磁盘的随机文件。

这并不意味着洗牌后的数据不保存在内存中。 Spark 中的 Shuffle 文件主要由 to avoid re-computation in case of multiple downstream actions 编写。为什么要写入文件系统?至少有两个交错的原因:

内存是一种宝贵的资源,Spark 中的内存缓存是短暂的。可以在需要时从缓存中清除旧数据。 shuffle 是一个昂贵的过程,如果没有必要,我们希望避免。以一种使其在给定上下文的生命周期内保持不变的方式存储随机数据更有意义。

除了正在进行的低级优化工作和实施细节之外,Shuffle 本身并没有什么不同。它基于相同的基本方法,但具有所有局限性。

任务与 Hadoo 地图有何不同?正如Justin Pihony 很好地说明的那样,多个不需要洗牌的转换在单个任务中被压缩在一起。由于这些操作在标准 Scala 迭代器上操作,因此可以通过管道对单个元素进行操作。

关于网络和 I/O 瓶颈,这里没有灵丹妙药。虽然 Spark 可以通过组合转换、在内存中缓存和提供转换感知工作器偏好来减少写入磁盘或混洗的数据量,但它与任何其他分布式框架一样受到相同的限制。

【讨论】:

【参考方案2】:

如果 Map-Reduce 和 Spark 都将数据写入本地磁盘,那么 spark shuffle 过程与 Hadoop MapReduce 有何不同?

当您执行 Spark 应用程序时,首先要启动 SparkContext,它会成为多个互连服务的所在地,其中最重要的是 DAGSchedulerTaskSchedulerSchedulerBackend

DAGScheduler 是主要的编排器,负责将 RDD 沿袭图(即 RDD 的有向无环图)转换为阶段。在执行此操作时,DAGScheduler 会遍历最终 RDD 的父依赖项,并创建一个带有父 ShuffleMapStagesResultStage

ResultStage 是(大部分)最后一个阶段,ShuffleMapStages 是它的父母。我说大部分是因为我想我可能已经看到您可以“安排”ShuffleMapStage

这是 Spark 应用于您的 Spark 作业(共同创建一个 Spark 应用程序)的最早期和第一个优化 - 执行管道,其中多个转换连接在一起以创建一个阶段(因为它们之间的-依赖性很窄)。这就是使 Spark 比 Hadoop MapReduce 更快的原因,因为两个或多个转换可以一个接一个地执行,而无需在内存中进行数据混洗。

单个阶段与ShuffleDependency 一样宽(又名宽依赖)。

有一些 RDD 转换会导致洗牌(由于创建 ShuffleDependency)。这就是 Spark 与 Hadoop 的 MapReduce 非常相似的时刻,因为它将部分 shuffle 输出保存到...执行器上的本地磁盘。

当 Spark 应用程序启动时,它会向集群管理器请求执行程序(支持三种:Spark Standalone、Apache Mesos 和 Hadoop YARN)。这就是 SchedulerBackend 的用途——管理 Spark 应用程序和集群资源之间的通信。

(假设您没有使用 External Shuffle Manager)

Executors 托管自己的本地BlockManagers,负责管理保存在本地硬盘驱动器上的 RDD 块(可能在内存中并复制)。您可以使用cachepersist 运算符和StorageLevels 来控制RDD 块持久性。您可以在 Web UI 中使用 StorageExecutors 选项卡来跟踪块及其位置和大小。

Spark 在本地(在执行器上)存储数据与 Hadoop MapReduce 的区别在于:

    部分结果(在计算 ShuffleMapStages 之后)保存在本地硬盘上,而不是 HDFS 上,HDFS 是一种分布式文件系统,保存成本非常高。

    只有一些文件被保存到本地硬盘驱动器(在操作被流水线化之后),这在将所有地图保存到 HDFS 的 Hadoop MapReduce 中不会发生。

让我回答以下问题:

如果有很多小的中间文件作为输出,spark 如何处理网络和 I/O 瓶颈?

这是 Spark 执行计划中最棘手的部分,很大程度上取决于改组的范围。如果您只使用本地数据(一台机器上的多个执行程序),您将看不到数据流量,因为数据已经到位。

如果需要数据shuffle,executor之间会互相发送数据,会增加流量。

Spark 应用中节点间的数据交换

只是为了详细说明 Spark 应用程序中节点之间的流量。

Broadcast variables 是从驱动程序向执行程序发送数据的方式。

Accumulators 是从执行器向驱动程序发送数据的方式。

像collect 这样的操作符会将所有远程块从执行器拉到驱动程序。

【讨论】:

如果一个核心已经执行了5个任务,那么这5个任务都会有合并的文件被推送到下一个阶段。因此在网络传输数据时读取的文件数会更少。

以上是关于为啥 Spark 将 Map 阶段输出保存到本地磁盘?的主要内容,如果未能解决你的问题,请参考以下文章

Spark的两种核心Shuffle详解

Spark的两种核心Shuffle详解

面试常问-Spark的两种核心Shuffle详解

面试常问-Spark的两种核心Shuffle详解

Spark Shuffle服务和客户端

Spark-Application 到本地目录