如何确定 PySpark 数据框分区的“首选位置”?

Posted

技术标签:

【中文标题】如何确定 PySpark 数据框分区的“首选位置”?【英文标题】:How to determine "preferred location" for partitions of PySpark dataframe? 【发布时间】:2018-06-15 09:21:25 【问题描述】:

我试图了解coalesce 如何确定如何将初始分区加入最终问题,显然“首选位置”与它有关。

根据this question,Scala Spark 有一个函数preferredLocations(split: Partition) 可以识别这个。但我对 Spark 的 Scala 方面一点也不熟悉。有没有办法在 PySpark 级别确定给定行或分区 ID 的首选位置?

【问题讨论】:

【参考方案1】:

是的,理论上是可以的。强制某种形式的偏好的示例数据(可能有一个更简单的示例):

rdd1 = sc.range(10).map(lambda x: (x % 4, None)).partitionBy(8)
rdd2 = sc.range(10).map(lambda x: (x % 4, None)).partitionBy(8)

# Force caching so downstream plan has preferences
rdd1.cache().count()

rdd3 = rdd1.union(rdd2)

现在你可以定义一个助手了:

from pyspark import SparkContext

def prefered_locations(rdd):
    def to_py_generator(xs):
        """Convert Scala List to Python generator"""
        j_iter = xs.iterator()
        while j_iter.hasNext():
            yield j_iter.next()

    # Get JVM
    jvm =  SparkContext._active_spark_context._jvm
    # Get Scala RDD
    srdd = jvm.org.apache.spark.api.java.JavaRDD.toRDD(rdd._jrdd)
    # Get partitions
    partitions = srdd.partitions()
    return 
        p.index(): list(to_py_generator(srdd.preferredLocations(p)))
        for p in partitions
    

应用:

prefered_locations(rdd3)

# 0: ['...'],
#  1: ['...'],
#  2: ['...'],
#  3: ['...'],
#  4: [],
#  5: [],
#  6: [],
#  7: []

【讨论】:

此代码在我的 RDD 上运行没有错误,并返回预期的分区数,但都有一个空列表。我可以认为这意味着我的分区实际上没有任何首选位置信息吗?或者这可能是一个错误? 许多 RDD 根本没有首选位置(这就是为什么这个非常复杂的示例的原因)。即使在上面的示例中,也只有一些分区(我相信这是分区感知联合的结果)具有首选位置。如果您使用支持数据局部性约束的来源,它应该更明显。

以上是关于如何确定 PySpark 数据框分区的“首选位置”?的主要内容,如果未能解决你的问题,请参考以下文章

使用自定义分区器对 Pyspark 中的数据框进行分区

加入两个分区数据框pyspark

增加或减少聚合的分区?

Pyspark OLD 数据框分区到新数据框

根据列值的变化对pyspark数据框进行分区

通过 pyspark 数据框创建配置单元管理的分区表并为每个运行附加数据