使用自定义分区器对 Pyspark 中的数据框进行分区

Posted

技术标签:

【中文标题】使用自定义分区器对 Pyspark 中的数据框进行分区【英文标题】:Partitioning of Data Frame in Pyspark using Custom Partitioner 【发布时间】:2018-10-13 07:45:23 【问题描述】:

寻找有关在 Pyspark 中使用自定义分区程序的一些信息。我有一个数据框,其中包含各个国家/地区的国家数据。因此,如果我对国家列进行重新分区,它会将我的数据分配到 n 个分区中,并将类似的国家数据保存到特定分区。当我看到使用glom() 方法时,这会创建一个倾斜的分区数据。

美国和中国等一些国家在特定数据框中拥有大量数据。我想重新分区我的数据框,这样如果国家是美国和中国,那么它将进一步分成大约 10 个分区,否则对于其他国家(如 IND、THA、AUS 等)保持分区相同。我们可以在 Pyspark 代码中扩展分区器类吗?

我在下面的链接中读到了这个,我们可以在 scala Spark 应用程序中扩展 scala 分区器类,并且可以修改分区器类以使用自定义逻辑根据需求重新分区我们的数据。就像我拥有的​​一样。请帮助在 Pyspark 中实现此解决方案。请参阅下面的链接What is an efficient way to partition by column but maintain a fixed partition count?


我使用的是 Spark 2.3.0.2 及以下版本是我的 Dataframe 结构:

datadf= spark.sql("""
    SELECT    
        ID_NUMBER ,SENDER_NAME ,SENDER_ADDRESS ,REGION_CODE ,COUNTRY_CODE
    from udb.sometable
""");

传入的数据包含六个国家/地区的数据,例如AUSINDTHARUSCHNUSACHNUSA 有偏差数据。

所以如果我在COUNTRY_CODE 上执行repartition,则两个分区包含大量数据,而其他分区则很好。我使用glom() 方法检查了这一点。

newdf = datadf.repartition("COUNTRY_CODE")

from pyspark.sql import SparkSession
from pyspark.sql import  HiveContext, DataFrameWriter, DataFrame

newDF = datadf.repartitionByRange(3,"COUNTRY_CODE","USA")

我正在尝试将我的数据重新分区为国家 USACHN 的另外 3 个分区,并希望将其他国家的数据保留在单个分区中。

This is what I am expecting 
AUS- one partition
IND- one partition
THA- one partition
RUS- one partition
CHN- three partition
USA- three partition

Traceback(最近一次调用最后一次):文件“”,第 1 行,in 文件 “/usr/hdp/current/spark2-client/python/pyspark/sql/dataframe.py”,行 1182,在 getattr "'%s' 对象没有属性 '%s'" % (self.class.name, name)) AttributeError: 'DataFrame' 对象没有属性 'repartitionByRange'

【问题讨论】:

【参考方案1】:

用散列尝试这样的事情:

newDf = oldDf.repartition(N, $"col1", $"coln")

或用于测距方法:

newDF = oldDF.repartitionByRange(N, $"col1", $"coln")

目前还没有 DF 的自定义分区。

在你的情况下,我会选择散列,但不能保证。

但如果您的数据有偏差,您可能需要做一些额外的工作,例如 2 列用于分区是最简单的方法。

例如现有的或新的列 - 在这种情况下,是对给定国家/地区应用分组的列,例如1 .. N,以及两列上的分区。

对于有很多分组的国家,你会得到 N 个合成子部门;对于基数较低的其他人,只有 1 个这样的组号。不是太难。两种分区都可以占用超过 1 列。

在我看来,分区的统一数量填充需要付出很多努力,而且并不是真正可以实现的,但是像这里这样的下一个最佳方法就足够了。在一定程度上相当于自定义分区。

否则,在 DF 上使用 .withColumn,您可以使用这些规则模拟自定义分区并填充新的 DF 列,然后应用 repartitionByRange。也没有那么难。

【讨论】:

repartitionByrange 函数是如何工作的?我们可以在 Pyspark 中使用它来重新分区数据帧吗? @感谢您的帮助和建议。您能否详细说明最后的声明。 (否则,在 DF 上使用 .withColumn 可以使用这些规则模拟自定义分区并填充新的 DF 列,然后应用 repartitionByRange。也不那么难。) 分区和重新分区是大话题 - 并不总是很好理解。散列更容易导致偏斜,范围更小。这取决于数据的性质。您需要同一分区中的所有数据还是可以与不同分区中的内容一起使用?但是,如果您想要在数字方面更平衡的分区 - 即使可能会产生不利影响 - 那么使用多个列可以帮助使用范围分区。一篇好文章:24tutorials.com/spark/deep-dive-into-partitioning-in-spark 然而,我所描述的我在这里和过去在其他系统上做过。成功。 是的就是这样 我无法检查,因为我在 spark-shell 上只有 2.2。 databricks 2.3 没有问题。替我退出。很奇怪,想知道它是否是 Databricks 扩展。【参考方案2】:

结构化 API 中没有自定义分区器,因此要使用自定义分区器,您需要下拉到 RDD API。简单的3个步骤如下:

    将结构化 API 转换为 RDD API
dataRDD = dataDF.rdd
    在 RDD API 中应用自定义分区器
import random

# Extract key from Row object
dataRDD = dataRDD.map(lambda r: (r[0], r))

def partitioner(key):
    if key == "CHN":
        return random.randint(1, 10)
    elif key == "USA":
        return random.randint(11, 20)
    else:
        # distinctCountryDict is a dict mapping distinct countries to distinct integers
        # these distinct integers should not overlap with range(1, 20)
        return distinctCountryDict[key]

numPartitions = 100
dataRDD = dataRDD.partitionBy(numPartitions, partitioner)

# Remove key extracted previously
dataRDD = dataRDD.map(lambda r: r[1])
    将 RDD API 转换回结构化 API
dataDF = dataRDD.toDF()

通过这种方式,您可以获得两全其美,Spark 类型和结构化 API 中的优化物理计划,以及低级 RDD API 中的自定义分区器。只有在绝对必要时,我们才会降级到低级 API。

【讨论】:

我认为当我们将 partitionBy 与 rdd 一起使用时,键应该是 int 类型..否则会引发错误..你说什么? @vikrantrana 我没有这个问题,你能发布你的错误吗? @ythdelmar- 谢谢。我在一些 rdd 上执行了类似的操作并发现了这个东西。我将再次检查它是否也适用于字符串数据类型。请参阅下面的链接以供参考。 ***.com/questions/47116294/… @vikrantrana 该线程还提到了一个潜在的问题,您需要先有一个键值对 RDD 才能执行 partitionBy,这是我上面的代码行 dataRDD = dataRDD.map(lambda r: (r[0], r))。也许您因为没有键值对 RDD 而犯了错误? @ythdelmar rdd.toDF() 是否保留分区?【参考方案3】:

在 PySpark 上应用用户定义的分区器没有直接的方法,捷径是使用 UDF 创建一个新列,根据业务逻辑为每个记录分配一个分区 ID。并使用新列进行分区,这样数据就会均匀分布。

numPartitions= 3
df = df.withColumn("Hash#", udf_country_hash(df['Country']))
df = df.withColumn("Partition#", df["Hash#"] % numPartitions)
df.repartition(numPartitions, "Partition#")

请查看在线版代码@ https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/8963851468310921/2231943684776180/5846184720595634/latest.html

根据我的经验,将 DataFrame 转换为 RDD 再转换回 DataFrame 是一项代价高昂的操作,最好避免。

【讨论】:

以上是关于使用自定义分区器对 Pyspark 中的数据框进行分区的主要内容,如果未能解决你的问题,请参考以下文章

PySpark 中 JDBC 上的自定义分区

pyspark 数据框中的自定义排序

Pyspark 数据框 OrderBy 分区级别还是整体?

动态填充pyspark数据框中列中的行

pyspark 数据框上的自定义函数

根据列值的变化对pyspark数据框进行分区