(为啥)我们需要在 RDD 上调用缓存还是持久化

Posted

技术标签:

【中文标题】(为啥)我们需要在 RDD 上调用缓存还是持久化【英文标题】:(Why) do we need to call cache or persist on a RDD(为什么)我们需要在 RDD 上调用缓存还是持久化 【发布时间】:2015-05-12 22:22:19 【问题描述】:

当从文本文件或集合(或从另一个 RDD)创建弹性分布式数据集 (RDD) 时,我们是否需要显式调用“缓存”或“持久化”来将 RDD 数据存储到内存中?还是RDD数据默认分布式存储在内存中?

val textFile = sc.textFile("/user/emp.txt")

据我了解,经过上述步骤,textFile 是一个 RDD,并且在所有/部分节点的内存中都可用。

如果是这样,那为什么我们需要在 textFile RDD 上调用“缓存”或“持久化”呢?

【问题讨论】:

【参考方案1】:

以下是您应该缓存 RDD 的三种情况:

多次使用 RDD

对同一个 RDD 执行多个操作

对于长链(或非常昂贵的)转换

【讨论】:

【参考方案2】:

添加添加(或临时添加)cache 方法调用的另一个原因。

用于调试内存问题

使用cache 方法,spark 将提供有关 RDD 大小的调试信息。因此在 spark 集成 UI 中,您将获得 RDD 内存消耗信息。事实证明,这对诊断内存问题非常有帮助。

【讨论】:

【参考方案3】:

我认为这个问题最好表述为:

我们什么时候需要调用缓存或持久化 RDD?

Spark 进程是惰性的,也就是说,除非需要,否则不会发生任何事情。 为了快速回答这个问题,在发出val textFile = sc.textFile("/user/emp.txt") 之后,数据没有任何变化,只构造了一个HadoopRDD,使用文件作为源。

假设我们对数据进行了一些转换:

val wordsRDD = textFile.flatMap(line => line.split("\\W"))

同样,数据没有任何反应。现在有一个新的 RDD wordsRDD 包含对 testFile 的引用和一个在需要时应用的函数。

只有当一个动作被一个RDD调用时,比如wordsRDD.count,RDD链,称为lineage才会被执行。也就是说,数据被划分为分区,将由 Spark 集群的 executors 加载,应用 flatMap 函数并计算结果。

在线性谱系上,例如本例中的谱系,不需要cache()。数据将被加载到执行程序,所有转换都将被应用,最后count 将被计算,全部在内存中 - 如果数据适合内存。

cache 在 RDD 的血统分支出来时很有用。假设您要将上一个示例中的单词过滤为正负单词的计数。你可以这样做:

val positiveWordsCount = wordsRDD.filter(word => isPositive(word)).count()
val negativeWordsCount = wordsRDD.filter(word => isNegative(word)).count()

在这里,每个分支都会重新加载数据。添加显式的cache 语句将确保之前完成的处理被保留和重用。作业将如下所示:

val textFile = sc.textFile("/user/emp.txt")
val wordsRDD = textFile.flatMap(line => line.split("\\W"))
wordsRDD.cache()
val positiveWordsCount = wordsRDD.filter(word => isPositive(word)).count()
val negativeWordsCount = wordsRDD.filter(word => isNegative(word)).count()

因此,cache 被称为“打破血统”,因为它创建了一个检查点,可重复用于进一步处理。

经验法则:当您的 RDD 的血统分支出或当一个 RDD 被多次使用时(例如在循环中),请使用 cache

【讨论】:

太棒了。谢谢。还有一个相关的问题。当我们缓存或持久化时,数据将存储在执行器的内存或工作节点的内存中。如果是executor的内存,Spark如何识别哪个executor有数据。 @RamanaUppala 执行器内存已使用。用于缓存的执行程序内存部分由配置spark.storage.memoryFraction 控制。关于哪个 executor 有哪些数据,RDD 将跟踪其分布在 executor 上的分区。 @maasg 如果我错了,请纠正我,但 cachepersist 都不能打破血统 如果上面例子中没有.cache()语句,wordsRDD会存储在哪里? 如果在两个计数之前,我们将两个分支合并回一个 rdd 并计数呢?在这种情况下,缓存有用吗?【参考方案4】:

我们是否需要显式调用“缓存”或“持久化”来将 RDD 数据存储到内存中?

是的,仅在需要时。

内存中默认分布式存储的RDD数据?

不!

这就是为什么:

Spark 支持两种类型的共享变量:广播变量,可用于在所有节点的内存中缓存值,以及累加器,它们是仅“添加”到的变量,例如计数器和总和.

RDD 支持两种类型的操作:转换(从现有数据集创建新数据集)和操作(在对数据集运行计算后将值返回给驱动程序)。例如,map 是一种转换,它通过一个函数传递每个数据集元素并返回一个表示结果的新 RDD。另一方面,reduce 是一个 action,它使用某个函数聚合 RDD 的所有元素,并将最终结果返回给驱动程序(尽管也有一个并行的 reduceByKey,它返回一个分布式数据集)。

李>

Spark 中的所有转换都是惰性的,因为它们不会立即计算结果。相反,他们只记得应用于某些基础数据集(例如文件)的转换。仅当操作需要将结果返回给驱动程序时才计算转换。这种设计使 Spark 能够更高效地运行——例如,我们可以意识到通过 map 创建的数据集将在 reduce 中使用,并且仅将 reduce 的结果返回给驱动程序,而不是更大的映射数据集。

默认情况下,每个转换后的 RDD 可能会在您每次对其运行操作时重新计算。 但是,您也可以使用持久(或缓存)方法将 RDD 持久化到内存中,在这种情况下,Spark 会将元素保留在集群中,以便在您下次查询时更快地访问它。 em> 还支持在磁盘上持久化 RDD,或跨多个节点复制。

更多详情请查看Spark programming guide。

【讨论】:

那没有回答我的问题。 什么不回答? 当RDD的数据默认存储在内存中时,为什么需要调用Cache或Persist? RDD 默认不存储在内存中,因此持久化 RDD 可以让 Spark 在集群上更快地执行转换 这是一个很好的答案,我不知道为什么它被否决了。这是一个自上而下的答案,从高级概念解释 RDD 如何工作。我添加了另一个自下而上的答案:从“这条线做什么”开始。对于刚开始使用 Spark 的人来说,可能更容易理解。【参考方案5】:

大多数 RDD 操作都是惰性的。将 RDD 视为对一系列操作的描述。 RDD 不是数据。所以这一行:

val textFile = sc.textFile("/user/emp.txt")

它什么都不做。它创建一个 RDD,上面写着“我们需要加载这个文件”。此时文件未加载。

需要观察数据内容的RDD操作不能偷懒。 (这些被称为actions。)一个例子是RDD.count——告诉你文件中的行数,文件需要被读取。所以如果你写textFile.count,此时会读取文件,计算行数,返回计数。

如果您再次拨打textFile.count 会怎样?同样的事情:文件将被读取并再次计数。没有存储任何内容。 RDD 不是数据。

那么RDD.cache 做了什么?如果在上面的代码中添加textFile.cache

val textFile = sc.textFile("/user/emp.txt")
textFile.cache

它什么都不做。 RDD.cache 也是偷懒操作。该文件仍未被读取。但是现在 RDD 说“读取这个文件然后缓存内容”。如果您随后第一次运行textFile.count,该文件将被加载、缓存和计数。如果您第二次调用textFile.count,该操作将使用缓存。它只会从缓存中获取数据并计算行数。

缓存行为取决于可用内存。例如,如果文件不适合内存,那么textFile.count 将退回到通常的行为并重新读取文件。

【讨论】:

嗨 daniel, - 当您调用缓存时,这是否意味着不会从源(例如文本文件)重新加载 RDD - 您如何确定文本文件中的数据是最新的它的缓存? (spark 是否可以解决这个问题,或者是否定期手动操作 unpersist() 以确保源数据在沿袭后期重新计算?) 另外 - 如果你必须定期取消持久化, - 如果你有一个缓存的 rdd,依赖于另一个缓存的 RDD,你必须取消两个 RDD 才能看到重新计算的结果吗? Spark 只是假设文件永远不会改变。它在任意时间点读取文件,并可能在以后根据需要重新读取文件的一部分。 (例如,如果从缓存中推出了一条数据。)所以你最好保持你的文件不变!当您有新数据时,只需使用新名称创建一个新文件,然后将其作为新 RDD 加载。如果您不断获取新数据,请查看 Spark Streaming。 是的。 RDD 是不可变的,因此每个 RDD 都假定其依赖项也是不可变的。 Spark Streaming 允许您设置这样的树来操作更改流。但更简单的解决方案是在一个函数中构建树,该函数将文件名作为参数。然后只需调用新文件的函数并噗,你就得到了新的计算树。 @Humoyun:在Spark UI的Storage选项卡上可以看到每个RDD缓存了多少。数据可能如此之大,以至于只有 40% 的数据适合您用于缓存的总内存。在这种情况下,一种选择是使用perisist 并选择一个允许将缓存数据溢出到磁盘的存储选项。

以上是关于(为啥)我们需要在 RDD 上调用缓存还是持久化的主要内容,如果未能解决你的问题,请参考以下文章

RDD 沿袭缓存

3天掌握Spark-- RDD持久化

RDD缓存

RDD的缓存,依赖,spark提交任务流程

揭秘Spark应用性能调优

07RDD持久化