PySpark 根据特定列重新分区

Posted

技术标签:

【中文标题】PySpark 根据特定列重新分区【英文标题】:PySpark repartition according to a specific column 【发布时间】:2018-05-22 12:32:54 【问题描述】:

我正在寻找如何重新分区(在 PySpark 中)数据集,以便在指定列中具有相同 ID 的所有行移动到同一个分区。实际上,我必须在每个分区中运行一个程序,该程序为具有相同 ID 的所有行计算单个值。

我有一个从 HIVE QL 查询构建的数据框 (df)(假设包含 10000 个不同的 ID)。 我试过了:

df = df.repartition("My_Column_Name")

默认情况下,我得到 200 个分区,但我总是得到 199 个 ID,当我运行程序时,我得到了重复的计算值。

我在网上查看,有人建议定义一个自定义分区器以与 repartition 方法一起使用,但我无法在 Python 中找到如何做到这一点。

有没有办法正确地重新分区?

【问题讨论】:

让我建议您重新表述这个问题。我想您想要实现的是将您的 DataFrame 重新分区为与 DataFrame 包含的不同 ID 数量相同的分区数。在这种特殊情况下,您需要 1000 个分区,每个分区都包含一个 ID。我对么?您试图实现这一目标的原因在您的问题中并不清楚,但这表明您对使用 Spark 的“正常”方式存在一些误解。 如果您需要运行与具有相同 ID 的某些行相关的代码,只需 groupByKey(或 reduceByKey)该 ID 并执行所需的逻辑。但不要将你想做的“逻辑”部分与“执行质量”部分混为一谈。分区数与后者更相关,应根据集群上的核心数进行选择。 我只想将具有相同 ID 的所有行移动到同一个分区。如果一个分区包含多组具有不同 ID 的行,则没有问题。 1000 只是一个示例,不同 ID 的数量可能非常多。因此,将 DF 划分为多个不同 ID 的分区不应带来良好的性能。我需要它,因为我使用 RDD mapPartition 方法运行一个函数(不能使用基本的 Spark 转换函数实现)。这个函数对每个不同的 ID 产生一个结果,这就是为什么我需要在同一个分区中拥有所有具有相同 ID 的行。 【参考方案1】:

我只想将具有相同 ID 的所有行移动到同一个分区。如果一个分区包含多组具有不同 ID 的行,则没有问题。 1000 只是一个示例,不同 ID 的数量可能非常多。因此,将 DF 划分为多个不同 ID 的分区不应带来良好的性能。我需要它,因为我使用 RDD mapPartition 方法运行一个函数(不能使用基本的 Spark 转换函数实现)。此函数为每个不同的 ID 生成一个结果,这就是为什么我需要在同一分区中拥有具有相同 ID 的所有行。

【讨论】:

以上是关于PySpark 根据特定列重新分区的主要内容,如果未能解决你的问题,请参考以下文章

使用 pyspark 重新分区失败并出现错误

使用自定义分区器对 Pyspark 中的数据框进行分区

如何根据列在火花中重新分区?

Pyspark:重新分区与分区

SQL - 根据列值重新启动分区

pyspark:重新分区后出现“太多值”错误