Spark RDD 沿袭和存储
Posted
技术标签:
【中文标题】Spark RDD 沿袭和存储【英文标题】:Spark RDD Lineage and Storage 【发布时间】:2016-03-30 05:20:02 【问题描述】:inputRDD = sc.textFile("log.txt")
errorsRDD = inputRDD.filter(lambda x: "error" in x)
warningsRDD = inputRDD.filter(lambda x: "warning" in x)
badLinesRDD = errorsRDD.union(warningsRDD)
badLinesCount = badLinesRDD.count()
warningCount = warningsRDD.count()
在上面的代码中,直到执行代码的倒数第二行,计算 badLinesRDD 中的对象数量,才会评估任何转换。所以当这个badLinesRDD.count()
运行时,它将计算前四个RDD,直到联合并返回结果。但是当 warningsRDD.count()
运行时,它只会计算转换 RDD 直到前 3 行并返回正确的结果?
此外,当这些 RDD 转换被计算时,当对它们调用操作时,最后一个 RDD 转换(即联合)中的对象存储在哪里?它是否存储在并行运行过滤器转换的每个 DataNode 的内存中?
【问题讨论】:
【参考方案1】:除非任务输出被显式(例如cache
、persist
)或隐式(随机写入)持久化,并且每个操作都有足够的可用空间来执行完整的沿袭。
因此,当您调用 warningsRDD.count()
时,它将加载文件 (sc.textFile("log.txt")
) 并过滤 (inputRDD.filter(lambda x: "warning" in x)
)。
此外,当这些 RDD 转换被计算时,当对它们调用操作时,最后一个 RDD 转换(即联合)中的对象存储在哪里?
假设数据没有持久化,无处可去。在数据传递到下一个阶段或输出后,所有任务输出都将被丢弃。如果数据被持久化,则取决于设置(磁盘、堆上、堆外、DFS)。
【讨论】:
感谢您的澄清!我有一个问题:你说"All task outputs are discarded after data is passed to the next stage or output"
。所以基本上这意味着当您第一次执行操作时,将运行前四个 RDD 转换,但来自这些 RDD 转换的所有数据都将消失,但是当您调用 val warningCount = warningsRDD.count()
时(例如,在 badLinesRDD.count()
行之后)它将查看val warningCount
的依赖关系,即warningsRDD
,然后查看val warningsRDD
的依赖关系,即输入RDD
并且只重新计算这两个转换 RDD,因为形成的沿袭图正确吗?这就是它不重新计算 errorsRDD = inputRDD.filter(lambda x: "error" in x)
的原因,因为在评估 val warningCount = warningsRDD.count()
行时不需要它。
errorsRDD
不是 warningsRDD
的依赖项,因此它不是必需的。如果您有疑问,您可以随时在 UI 中查看 DAG。参见例如***.com/q/34580662/1560062以上是关于Spark RDD 沿袭和存储的主要内容,如果未能解决你的问题,请参考以下文章