Spark RDD 沿袭和存储

Posted

技术标签:

【中文标题】Spark RDD 沿袭和存储【英文标题】:Spark RDD Lineage and Storage 【发布时间】:2016-03-30 05:20:02 【问题描述】:
inputRDD = sc.textFile("log.txt")
errorsRDD = inputRDD.filter(lambda x: "error" in x) 
warningsRDD = inputRDD.filter(lambda x: "warning" in x) 
badLinesRDD = errorsRDD.union(warningsRDD)
badLinesCount = badLinesRDD.count()
warningCount = warningsRDD.count()

在上面的代码中,直到执行代码的倒数第二行,计算 badLinesRDD 中的对象数量,才会评估任何转换。所以当这个badLinesRDD.count() 运行时,它将计算前四个RDD,直到联合并返回结果。但是当 warningsRDD.count() 运行时,它只会计算转换 RDD 直到前 3 行并返回正确的结果?

此外,当这些 RDD 转换被计算时,当对它们调用操作时,最后一个 RDD 转换(即联合)中的对象存储在哪里?它是否存储在并行运行过滤器转换的每个 DataNode 的内存中?

【问题讨论】:

【参考方案1】:

除非任务输出被显式(例如cachepersist)或隐式(随机写入)持久化,并且每个操作都有足够的可用空间来执行完整的沿袭。

因此,当您调用 warningsRDD.count() 时,它将加载文件 (sc.textFile("log.txt")) 并过滤 (inputRDD.filter(lambda x: "warning" in x))。

此外,当这些 RDD 转换被计算时,当对它们调用操作时,最后一个 RDD 转换(即联合)中的对象存储在哪里?

假设数据没有持久化,无处可去。在数据传递到下一个阶段或输出后,所有任务输出都将被丢弃。如果数据被持久化,则取决于设置(磁盘、堆上、堆外、DFS)。

【讨论】:

感谢您的澄清!我有一个问题:你说"All task outputs are discarded after data is passed to the next stage or output"。所以基本上这意味着当您第一次执行操作时,将运行前四个 RDD 转换,但来自这些 RDD 转换的所有数据都将消失,但是当您调用 val warningCount = warningsRDD.count() 时(例如,在 badLinesRDD.count() 行之后)它将查看val warningCount 的依赖关系,即warningsRDD,然后查看val warningsRDD 的依赖关系,即输入RDD 并且只重新计算这两个转换 RDD,因为形成的沿袭图正确吗?这就是它不重新计算 errorsRDD = inputRDD.filter(lambda x: "error" in x) 的原因,因为在评估 val warningCount = warningsRDD.count() 行时不需要它。 errorsRDD 不是 warningsRDD 的依赖项,因此它不是必需的。如果您有疑问,您可以随时在 UI 中查看 DAG。参见例如***.com/q/34580662/1560062

以上是关于Spark RDD 沿袭和存储的主要内容,如果未能解决你的问题,请参考以下文章

RDD沿袭/ Spark操作员图的良好输出

spark系列之基本概念

Spark中的“RDD可以存储在内存中”是啥意思?

CSV 到 RDD 到 Apache Spark 中的 Cassandra 存储

Spark RDD Transformation 简单用例

Spark之RDD弹性特性