如何在 PySpark 中广播 RDD?

Posted

技术标签:

【中文标题】如何在 PySpark 中广播 RDD?【英文标题】:How to broadcast RDD in PySpark? 【发布时间】:2017-05-27 12:05:48 【问题描述】:

是否可以在 Python 中广播 RDD?

我正在关注“使用 Spark 进行高级分析:从大规模数据中学习的模式”一书,在第 3 章中需要广播 RDD。我正在尝试使用 Python 而不是 Scala 来遵循示例。

无论如何,即使是这个简单的例子,我也有一个错误:

my_list = ["a", "d", "c", "b"]
my_list_rdd = sc.parallelize(my_list)
sc.broadcast(my_list_rdd)

错误是:

"It appears that you are attempting to broadcast an RDD or reference an RDD from an "
Exception: It appears that you are attempting to broadcast an RDD or reference an RDD from an
action or transformation. RDD transformations and actions can only be invoked by the driver, n
ot inside of other transformations; for example, rdd1.map(lambda x: rdd2.values.count() * x) i
s invalid because the values transformation and count action cannot be performed inside of the
 rdd1.map transformation. For more information, see SPARK-5063.

我真的不明白错误所指的“动作或转换”是什么。

我正在使用spark-2.1.1-hadoop2.7

重要编辑:这本书是正确的。我只是没有读到它不是正在广播的 RDD,而是使用 collectAsMap() 获得的地图版本。

谢谢!

【问题讨论】:

【参考方案1】:

是否可以在 Python 中广播 RDD?

TL;DR 没有。

当您认为 RDD 真正 是什么时,您会发现这根本不可能。您可以广播的 RDD 中没有任何内容。它太脆弱(可以这么说)。

RDD 是一种描述在某些数据集上进行分布式计算的数据结构。通过 RDD 的特性,您可以描述计算什么以及如何计算。它是一个抽象实体。

引用RDD的scaladoc:

表示可以并行操作的不可变的、分区的元素集合

在内部,每个 RDD 都有五个主要属性:

分区列表

计算每个分割的函数

对其他 RDD 的依赖列表

可选的,键值 RDD 的分区器(例如说 RDD 是散列分区的)

(可选)计算每个拆分的首选位置列表(例如 HDFS 文件的块位置)

您可以广播的内容不多(引用SparkContext.broadcast 方法的scaladoc):

broadcast[T](value: T)(implicit arg0: ClassTag[T]): Broadcast[T] 向集群广播一个只读变量,返回一个org.apache.spark.broadcast.Broadcast对象在分布式函数中读取它。变量只会发送到每个集群一次。

您只能广播一个真实的值,但 RDD 只是一个值的容器,只有在执行程序处理其数据时才可用。

来自Broadcast Variables:

广播变量允许程序员在每台机器上缓存一个只读变量,而不是随任务一起发送它的副本。例如,它们可用于高效地为每个节点提供大型输入数据集的副本。

稍后在同一个文档中:

这意味着显式创建广播变量仅在跨多个阶段的任务需要相同数据或以反序列化形式缓存数据很重要时才有用。

但是,您可以 collect RDD 持有的数据集并按如下方式广播它:

my_list = ["a", "d", "c", "b"]
my_list_rdd = sc.parallelize(my_list)
sc.broadcast(my_list_rdd.collect) // <-- collect the dataset

在“收集数据集”步骤中,数据集离开 RDD 空间并成为本地可用的集合,即 Python 值,然后可以广播。

【讨论】:

不错的答案,但始终必须警告将 collect 与大型 RDD 一起使用,特别是对于新用户。 收集这个数据集不就等于不把它变成一个RDD吗?【参考方案2】:

您不能广播 RDD。您将值广播到在处理 RDD 时多次使用的所有执行程序节点。因此,在您的代码中,您应该在广播之前收集您的 RDD。 collectRDD 转换为可以毫无问题地广播的本地 python 对象。

sc.broadcast(my_list_rdd.collect())

当您广播一个值时,该值会被序列化并通过网络发送到所有执行程序节点。您的 my_list_rdd 只是对分布在多个节点上的 RDD 的引用。序列化这个引用并将这个引用广播到所有工作节点对工作节点没有任何意义。所以你应该收集你的 RDD 的值并广播这个值。

更多关于 Spark Broadcast 的信息可以在here找到

注意:如果您的 RDD 太大,应用程序可能会遇到 OutOfMemory 错误。 collect 方法将所有数据拉到驱动程序的内存中,通常不够大。

【讨论】:

以上是关于如何在 PySpark 中广播 RDD?的主要内容,如果未能解决你的问题,请参考以下文章

如何知道 pyspark 中广播变量的可用内存量?

Spark篇---Spark中广播变量和累加器

如何在自组织网络中广播?

如何在 em-websocket 中广播或建立连接?

如何在 PySpark 中获得不同的字典 RDD?

如何在 PySpark 中将两个 rdd 合并为一个