如何在不收集的情况下将 RDD、Dataframe 或 Dataset 直接转换为广播变量?

Posted

技术标签:

【中文标题】如何在不收集的情况下将 RDD、Dataframe 或 Dataset 直接转换为广播变量?【英文标题】:How to transform RDD, Dataframe or Dataset straight to a Broadcast variable without collect? 【发布时间】:2016-07-12 13:04:04 【问题描述】:

是否有任何方法(或任何计划)能够将 Spark 分布式集合(RDDs、DataframeDatasets)直接转换为 Broadcast 变量而无需 collect?公共 API 似乎没有任何“开箱即用”的功能,但可以在较低级别完成一些事情吗?

我可以想象这类操作有 2 倍加速潜力(或更多?)。为了详细解释我的意思,让我们举个例子:

val myUberMap: Broadcast[Map[String, String]] =
  sc.broadcast(myStringPairRdd.collect().toMap)

someOtherRdd.map(someCodeUsingTheUberMap)

这会将所有数据收集到驱动程序,然后广播数据。这意味着数据基本上通过网络发送了两次。

最好是这样的:

val myUberMap: Broadcast[Map[String, String]] =
  myStringPairRdd.toBroadcast((a: Array[(String, String)]) => a.toMap)

someOtherRdd.map(someCodeUsingTheUberMap)

在这里,Spark 可以完全绕过收集数据,而只是在节点之间移动数据。

奖金

此外,对于.toMapArray[T] 上的任何操作很昂贵但可以并行完成的情况,可能会有一个类似 Monoid 的 API(有点像 combineByKey)。例如。构建某些 Trie 结构可能很昂贵,这种功能可能会为算法设计带来巨大的空间。此 CPU 活动也可以在 IO 运行时运行 - 当前广播机制处于阻塞状态(即所有 IO,然后是所有 CPU,然后再次是所有 IO)。

澄清

在这里加入不是(主要)用例,可以假设我稀疏地使用广播数据结构。例如,someOtherRdd 中的键绝不会覆盖myUberMap 中的键,但在遍历someOtherRdd 并假设我多次使用myUberMap 之前我不知道需要哪些键。

我知道这一切听起来有点模糊,但重点是更通用的机器学习算法设计。

【问题讨论】:

【参考方案1】:

虽然理论上这是一个有趣的想法,但我认为尽管理论上可行,但它的实际应用非常有限。显然我不能代表 PMC,所以我不能说是否有任何计划实施这种广播机制。

可能的实施

由于 Spark 已经提供了torrent broadcasting 机制,其行为描述如下:

驱动程序将序列化的对象分成小块并 将这些块存储在驱动程序的BlockManager 中。

在每个执行器上,执行器首先尝试从其BlockManager 获取对象。 如果不存在,则使用远程获取从驱动程序和/或 其他执行人(如果有)。

一旦获得块,它就会将块放入自己的 BlockManager,准备好让其他执行者从中获取。

应该可以重用相同的机制进行直接的节点到节点广播。

值得注意的是,这种方法并不能完全消除驱动程序通信。即使可以在本地创建块,您仍然需要单一的事实来源来宣传一组要获取的块。

有限的应用程序

广播变量的一个问题是非常昂贵。即使您可以消除驱动程序瓶颈,仍然存在两个问题:

在每个执行程序上存储反序列化对象所需的内存。 将广播数据传输给每个执行者的成本。

第一个问题应该比较明显。它不仅与直接内存使用有关,还与 GC 成本及其对整体延迟的影响有关。第二个比较微妙。我在对Why my BroadcastHashJoin is slower than ShuffledHashJoin in Spark 的回答中部分介绍了这一点,但让我们进一步讨论。

从网络流量的角度来看,广播整个数据集几乎等同于创建笛卡尔积。因此,如果数据集大到足以让驱动程序成为瓶颈,则它不太可能成为广播的良好候选者,并且在实践中可能会首选像哈希连接这样的目标方法。

替代方案

有一些方法可用于实现与直接广播类似的结果并解决上面列举的问题,包括:

通过分布式文件系统传递数据。 使用与工作节点并置的复制数据库。

【讨论】:

我赞成指出确实对于常规连接,随机连接通常更好。但不接受,因为我的用例比这更普遍——我没有说我只想做常规的单连接/笛卡尔。我想因为这是 99% 的人对广播所做的,所以这是一个合理的假设。我已经更新了我的 OP 以使其更加清晰。谢谢。 哦,我明白了。问题是,只要我们不使用堆外结构,GC 就会比网络流量更快地吞噬我们。或者至少这是我到目前为止所看到的。如果我们开始针对非常大的对象进行调整,那么我们的性能就会受到其他方面的影响。所以我能想到的唯一应用是小对象和接近实时处理的调整。但不是流式传输,因为我们无法优雅地销毁和重播。 我确实认为能够直接从执行器实例化广播数据会很有用。这将提供灵活性。例如,如果广播变量是查询非常慢的外部 jdbc 数据源的结果。可以使用 spark 集群的并行加载能力在源服务器上实例化多个查询,这些查询可以在并行线程中运行。 @zero323 很棒的答案,仍然对通过驱动程序的优势感到好奇。因此,如果您绕过驱动程序,节省的费用可能会受到限制,但是通过驱动程序有什么好处吗?【参考方案2】:

我不知道我们是否可以为 RDD 做,但你可以为 Dataframe 做

import org.apache.spark.sql.functions

val df:DataFrame = your_data_frame

val broadcasted_df = functions.broadcast(df)

现在您可以使用变量broadcasted_df,它将被广播到执行器。

确保 broadcasted_df 数据帧不会太大并且可以发送到执行器。

broadcasted_df 将成为广播公司,例如

other_df.join(broadcasted_df)

在这种情况下,join() 操作执行得更快,因为每个执行器都有 1 个 other_df 分区和整个 broadcasted_df

对于您的问题,我不确定您是否可以做您想做的事。您不能在另一个 rdd 的 #map() 方法中使用一个 rdd,因为 spark 不允许在转换中进行转换。在您的情况下,您需要调用 collect() 方法从您的 RDD 创建地图,因为您只能在 #map() 方法中使用常用的地图对象,您不能在那里使用 RDD。

【讨论】:

我不认为这实际上会导致数据帧在没有“幕后”收集的情况下分发(尽管这是一种预感)。相反,如果我们将它用于连接,它会将数据帧标记为广播。值得注意的是,加入在这里不是(主要)用例,我将在我的 OP 中通过编辑说明这一点。 不是预感:它收集DataFrame 而不转换为本地类型并将其广播回来。我很确定这里有一个描述这个问题的答案,但它可能在聊天中。它只是以一种或另一种方式隐藏collect

以上是关于如何在不收集的情况下将 RDD、Dataframe 或 Dataset 直接转换为广播变量?的主要内容,如果未能解决你的问题,请参考以下文章

如何在不产生 .rdd 成本的情况下检查 Spark DataFrame 的分区数

如何在不从 DataFrame 转换并访问它的情况下将列添加到 Dataset?

如何在本地映射 RDD?

如何在不使用collect()和for循环的情况下将一个(IP地址)的特定部分与RDD python pyspark中另一列中的其他IP地址进行比较

是否可以在不先列出列表的情况下将 Series 附加到 DataFrame 行?

如何在不将 csv 保存到磁盘的情况下将 csv 格式的数据从内存发送到数据库?