如何在 Apache Spark 中进行增量 MapReduce

Posted

技术标签:

【中文标题】如何在 Apache Spark 中进行增量 MapReduce【英文标题】:How to do Incremental MapReduce in Apache Spark 【发布时间】:2017-11-22 01:51:01 【问题描述】:

在 CouchDB 和 Incoop 等系统设计中,有一个称为“增量 MapReduce”的概念,其中保存了以前执行 MapReduce 算法的结果,并用于跳过未更改的输入数据部分。

假设我有 100 万行分为 20 个分区。如果我对这些数据运行一个简单的 MapReduce,我可以缓存/存储减少每个单独分区的结果,然后再将它们组合并再次减少以产生最终结果。如果我只更改第 19 个分区中的数据,那么我只需要对数据的更改部分运行 map & reduce 步骤,然后将新结果与未更改分区中保存的 reduce 结果相结合,以获得更新的结果。使用这种捕获方式,我可以跳过几乎 95% 的工作,以便在这个假设的数据集上重新运行 MapReduce 作业。

有没有什么好的方法可以将此模式应用于 Spark?我知道我可以编写自己的工具来将输入数据拆分为分区,检查我之前是否已经处理过这些分区,如果有的话从缓存中加载它们,然后运行最终的 reduce 以将所有分区连接在一起。但是,我怀疑有一种更简单的方法可以解决这个问题。

我已经在 Spark Streaming 中尝试过检查点,它能够在重新启动之间存储结果,这几乎是我想要的,但我想在流作业之外执行此操作。

RDD 缓存/持久化/检查点几乎看起来像是我可以构建的东西——它可以很容易地保留中间计算并在以后引用它们,但我认为一旦 SparkContext 停止,缓存的 RDD 总是会被删除,即使它们'被持久化到磁盘。因此缓存不适用于在重新启动之间存储结果。另外,我不确定在启动新 SparkContext 时是否应该/如何加载检查点 RDD……它们似乎存储在特定于 SparkContext 单个实例的 UUID in the checkpoint directory 下。

【问题讨论】:

【参考方案1】:

article 建议的两个用例(增量日志处理和增量查询处理)一般都可以通过 Spark Streaming 解决。

我们的想法是使用 DStreams 抽象来进行增量更新。然后,您可以处理新数据,并使用基于时间窗口的处理或使用任意有状态操作作为Structured Stream Processing 的一部分将其与先前的计算相结合。计算结果可以稍后转储到某种外部接收器,如数据库或文件系统,或者它们可以作为 SQL 表公开。

如果您不构建在线数据处理系统,也可以使用常规 Spark。这只是增量更新如何进入流程以及如何保存中间状态的问题。例如,增量更新可以出现在分布式文件系统的某个路径下,而包含先前计算和新数据计算的中间状态可以再次转储到同一个文件系统。

【讨论】:

我可以对我只附加到的数据集使用流式传输,但想象一下我在数据集中的随机点更新单行。我要么需要清除我的旧状态并重新处理所有内容,要么想出一种聪明的方法,在添加更新的新行之前删除旧行添加到状态中的任何内容。对于像计数器这样简单的事情没问题,但对于更复杂的操作,如果不重新处理就可能无法解释更改。 除非您的意思是每个时间窗口都有自己单独存储的结果,这样我可以重做单个时间窗口的处理,然后更新将所有这些时间窗口连接在一起的结果。这将接近我正在寻找的。​​span> @slang mapGroupsWithState 函数允许任意状态更新,请查看this 文章中的示例。 好的,但我的数据并不是真正的“事件”。它们更像是文档,可以更新或删除。如果我只使用状态更新,我需要确保在删除文档时,我用来聚合数据的每个操作也可以反转,这是一个主要限制。我想做的只是删除由更新/删除的文档产生的状态的某个部分,然后重新处理该部分中的所有文档。有什么好的方法吗? 我建议重新考虑您的数据建模 - 通常在分布式系统中,对命令事件进行建模并在文档视图(“CQRS”)上构建单独的查询处理程序很有用。

以上是关于如何在 Apache Spark 中进行增量 MapReduce的主要内容,如果未能解决你的问题,请参考以下文章

Apache Spark - 根据列值添加增量 ID

spark streaming 一个批次取多少数据

Apache Spark 中的分层数据操作

如何在 Apache Spark 上对整数列表进行排序?

Spark增量加载覆盖旧记录

Apache Spark基础知识