pySpark forEachPartition - 代码在哪里执行
Posted
技术标签:
【中文标题】pySpark forEachPartition - 代码在哪里执行【英文标题】:pySpark forEachPartition - Where is code executed 【发布时间】:2019-04-12 15:27:46 【问题描述】:我在 2.3 版中使用 pySpark(在我当前的开发系统中无法更新到 2.4)并且有以下关于 foreachPartition 的问题。
首先是一点背景:据我了解,pySpark-UDFs
强制 Python 代码在 Python 实例中的 Java 虚拟机 (JVM) 之外执行,这会导致性能成本下降。
由于我需要将一些 Python 函数应用于我的数据并希望最大限度地降低开销成本,我的想法是至少将一组可处理的数据加载到驱动程序中并将其作为 Pandas-DataFrame 处理。无论如何,这将导致 Spark 失去并行性优势。
然后我读到foreachPartition
将函数应用于分区内的所有数据,因此允许并行处理。
我现在的问题是:
当我通过 foreachPartition
应用 Python 函数时,Python 执行是否发生在驱动程序进程中(因此分区数据通过网络传输到我的驱动程序)?
是在foreachPartition
中按行处理数据(意味着每个RDD-row都被逐一传输到Python实例),还是一次处理的分区数据(意味着,例如,整个分区转移到实例并由一个 Python 实例整体处理)?
提前感谢您的意见!
编辑:
我之前使用的驱动程序解决方案看起来像这样,取自 SO here:
for partition in rdd.mapPartitions(lambda partition: [list(partition)]).toLocalIterator():
# Do stuff on the partition
可以从docsrdd.toLocalIterator()
中读取,提供了必要的功能:
返回一个包含此 RDD 中所有元素的迭代器。迭代器将消耗与此 RDD 中最大分区一样多的内存。
【问题讨论】:
【参考方案1】:幸运的是,我偶然发现了 Mrinal 对 mapPartitions
的精彩解释(回复 here)。
mapPartitions
在 RDD 的每个分区上应用一个函数。因此,如果分区分布在不同的节点上,则可以使用并行化。在这些节点上创建了处理 Python 函数所必需的相应 Python 实例。
foreachPartition
只应用一个函数(例如,将数据写入 .csv 文件),mapPartitions
也返回一个新的 RDD。因此,使用foreachPartition
对我来说是错误的选择。
为了回答我的第二个问题:map
或 UDFs
之类的函数会创建一个新的 Python 实例并从 DataFrame/RDD 中逐行传递数据,从而导致大量开销。 foreachPartition
和 mapPartitions
(都是 RDD 函数)将整个分区传输到 Python 实例。
此外,使用生成器还减少了迭代此传输的分区数据所需的内存量(分区作为迭代器对象处理,而每一行随后通过迭代此对象进行处理)。
一个示例可能如下所示:
def generator(partition):
"""
Function yielding some result created by some function applied to each row of a partition (in this case lower-casing a string)
@partition: iterator-object of partition
"""
for row in partition:
yield [word.lower() for word in row["text"]]
df = spark.createDataFrame([(["TESTA"], ), (["TESTB"], )], ["text"])
df = df.repartition(2)
df.rdd.mapPartitions(generator).toDF(["text"]).show()
#Result:
+-----+
| text|
+-----+
|testa|
|testb|
+-----+
希望这可以帮助面临类似问题的人:)
【讨论】:
【参考方案2】:pySpark UDF 在执行器附近执行 - 即在一个单独的 python 实例中,每个执行器并行运行并在 spark 引擎 (scala) 和 python 解释器之间来回传递数据。
foreachPartition 中对 udfs 的调用也是如此
编辑——查看示例代码后
-
使用 RDD 不是使用 spark 的有效方式 - 您应该移动
到数据集
使您的代码将所有数据同步到驱动程序的是 collect()
foreachParition 将类似于 glom
【讨论】:
以上是关于pySpark forEachPartition - 代码在哪里执行的主要内容,如果未能解决你的问题,请参考以下文章
在 PySpark RDD 中,如何使用 foreachPartition() 打印出每个分区的第一条记录?
PySpark foreachPartition 并行写入数据库
Spark(21)——foreachPartition foreach
Apache Spark MySQL JavaRDD.foreachPartition - 为啥我得到 ClassNotFoundException