pySpark forEachPartition - 代码在哪里执行

Posted

技术标签:

【中文标题】pySpark forEachPartition - 代码在哪里执行【英文标题】:pySpark forEachPartition - Where is code executed 【发布时间】:2019-04-12 15:27:46 【问题描述】:

我在 2.3 版中使用 pySpark(在我当前的开发系统中无法更新到 2.4)并且有以下关于 foreachPartition 的问题。

首先是一点背景:据我了解,pySpark-UDFs 强制 Python 代码在 Python 实例中的 Java 虚拟机 (JVM) 之外执行,这会导致性能成本下降。 由于我需要将一些 Python 函数应用于我的数据并希望最大限度地降低开销成本,我的想法是至少将一组可处理的数据加载到驱动程序中并将其作为 Pandas-DataFrame 处理。无论如何,这将导致 Spark 失去并行性优势。 然后我读到foreachPartition 将函数应用于分区内的所有数据,因此允许并行处理。

我现在的问题是:

    当我通过 foreachPartition 应用 Python 函数时,Python 执行是否发生在驱动程序进程中(因此分区数据通过网络传输到我的驱动程序)?

    是在foreachPartition中按行处理数据(意味着每个RDD-row都被逐一传输到Python实例),还是一次处理的分区数据(意味着,例如,整个分区转移到实例并由一个 Python 实例整体处理)?

提前感谢您的意见!


编辑:

我之前使用的驱动程序解决方案看起来像这样,取自 SO here:

for partition in rdd.mapPartitions(lambda partition: [list(partition)]).toLocalIterator():
    # Do stuff on the partition

可以从docsrdd.toLocalIterator()中读取,提供了必要的功能:

返回一个包含此 RDD 中所有元素的迭代器。迭代器将消耗与此 RDD 中最大分区一样多的内存

【问题讨论】:

【参考方案1】:

幸运的是,我偶然发现了 Mrinal 对 mapPartitions 的精彩解释(回复 here)。

mapPartitions 在 RDD 的每个分区上应用一个函数。因此,如果分区分布在不同的节点上,则可以使用并行化。在这些节点上创建了处理 Python 函数所必需的相应 Python 实例。 foreachPartition 只应用一个函数(例如,将数据写入 .csv 文件),mapPartitions 也返回一个新的 RDD。因此,使用foreachPartition 对我来说是错误的选择。

为了回答我的第二个问题:mapUDFs 之类的函数会创建一个新的 Python 实例并从 DataFrame/RDD 中逐行传递数据,从而导致大量开销。 foreachPartitionmapPartitions(都是 RDD 函数)将整个分区传输到 Python 实例。

此外,使用生成器还减少了迭代此传输的分区数据所需的内存量(分区作为迭代器对象处理,而每一行随后通过迭代此对象进行处理)。

一个示例可能如下所示:

def generator(partition):
    """
    Function yielding some result created by some function applied to each row of a partition (in this case lower-casing a string)

    @partition: iterator-object of partition
    """

    for row in partition:
        yield [word.lower() for word in row["text"]]


df = spark.createDataFrame([(["TESTA"], ), (["TESTB"], )], ["text"])
df = df.repartition(2)
df.rdd.mapPartitions(generator).toDF(["text"]).show()


#Result:
+-----+
| text|
+-----+
|testa|
|testb|
+-----+

希望这可以帮助面临类似问题的人:)

【讨论】:

【参考方案2】:

pySpark UDF 在执行器附近执行 - 即在一个单独的 python 实例中,每个执行器并行运行并在 spark 引擎 (scala) 和 python 解释器之间来回传递数据。

foreachPartition 中对 udfs 的调用也是如此

编辑——查看示例代码后

    使用 RDD 不是使用 spark 的有效方式 - 您应该移动 到数据集 使您的代码将所有数据同步到驱动程序的是 collect() foreachParition 将类似于 glom

【讨论】:

以上是关于pySpark forEachPartition - 代码在哪里执行的主要内容,如果未能解决你的问题,请参考以下文章

在 PySpark RDD 中,如何使用 foreachPartition() 打印出每个分区的第一条记录?

PySpark foreachPartition 并行写入数据库

石墨或grafana可以用来监控pyspark指标吗?

Spark(21)——foreachPartition foreach

Apache Spark MySQL JavaRDD.foreachPartition - 为啥我得到 ClassNotFoundException

foreach和foreachPartition