Spark RDD 分区之间的同步
Posted
技术标签:
【中文标题】Spark RDD 分区之间的同步【英文标题】:Synchronization between Spark RDD partitions 【发布时间】:2015-12-04 04:33:41 【问题描述】:假设我有一个有 3 个分区的 RDD,我想按顺序运行每个 executor/worker,这样,在计算完分区 1 之后,就可以计算分区 2,在计算完 2 之后,最后,可以计算分区 3。我需要这种同步的原因是因为每个分区都依赖于前一个分区的某些计算。如果我错了,请纠正我,但这种类型的同步似乎不太适合 Spark 框架。
我曾考虑在每个工作任务节点中打开一个 JDBC 连接,如下图所示:
rdd.foreachPartition( partition =>
// 1. open jdbc connection
// 2. poll database for the completion of dependent partition
// 3. read dependent edge case value from computed dependent partition
// 4. compute this partition
// 5. write this edge case result to database
// 6. close connection
)
我什至考虑过使用累加器,在驱动程序中提取 acc 值,然后重新广播一个值,以便适当的工作人员可以开始计算,但显然广播不能像这样工作,即,一旦你已经发货通过foreachPartition广播变量,不能重新广播不同的值。
【问题讨论】:
Spark 用于分布式计算和并行处理。如果您需要按顺序处理数据,则不需要 Spark。您可以在 java 或 Scala 中编写您的作业,从命令行执行它,或者可以将其安排为使用任何标准调度程序(cron、Quartz 等)。 之所以要使用Spark,是因为我需要将大量数据放入内存中进行快速计算。我还想利用 Spark 的其他属性,例如数据和任务分布以及数据弹性。我还需要实现一定程度的并行性,但在这个看似简单的顺序步骤得到解决之前,我不想将其带入问题空间。 想想两个大数组的内存大约 20 GB。我只需要遍历一个数组,同时比较另一个数组的值。如果有足够的内存,我可以在一台机器上执行此操作,但可以说我有一个 Spark 已经启动并运行的机器集群。给定 RDD 抽象,在 Spark 集群中分配 20 GB 数组非常简单。将代码推送到每个工作进程以遍历每个数组分区同样简单。现在,仅仅因为我需要顺序执行并不意味着 Spark 没有用。 您可能无法在 Spark 中按顺序处理分区。尽管 RDD API 中有一个fold()
函数,但它也适用于每个分区的分布式模型。您需要遵循@zero323 的建议,但您可以使用一些内存中的分布式缓存解决方案(如 couchbase)来存储中间结果。
【参考方案1】:
同步并不是真正的问题。问题是你想使用并发层来实现这一点,结果你得到了完全的顺序执行。更不用说,通过将更改推送到数据库只是为了将这些更改取回另一个工作人员意味着您不会获得内存处理的好处。在目前的形式中,使用 Spark 根本没有意义。
一般来说,如果你想在 Spark 中实现同步,你应该从转换的角度来考虑。你的问题很粗略,但你可以尝试这样的事情:
-
使用来自第一个分区的数据创建第一个 RDD。并行处理并可选择将结果推送到外部
计算差分缓冲区
使用来自第二个分区的数据创建第二个 RDD。与 2 中的差分缓冲区合并,处理,可选择将结果推送到数据库。
回到 2. 并重复
你在这里得到了什么?首先,您可以利用整个集群。此外,部分结果保存在内存中,不必在工作人员和数据库之间来回传输。
【讨论】:
以上是关于Spark RDD 分区之间的同步的主要内容,如果未能解决你的问题,请参考以下文章