如何在倾斜列上重新分区 Spark scala 中的数据框?

Posted

技术标签:

【中文标题】如何在倾斜列上重新分区 Spark scala 中的数据框?【英文标题】:How to repartition a dataframe in Spark scala on a skewed column? 【发布时间】:2017-06-15 11:54:22 【问题描述】:

我有一个数据框,它有 500 个分区并且被洗牌。 我想根据一列说“城市”重新分区 但是 city 列非常倾斜,因为它只有三个可能的值。 因此,当我基于列城市重新分区时,即使我指定了 500 个分区,也只有三个正在获取数据。因此,我遇到了性能问题。 我在互联网上搜索但找不到任何合适的解决方案。 有没有办法跨基于城市列的分区统一重新分区数据帧。 我需要的是:city1 表示前 5 个分区,city2 表示接下来的 490 个分区,city3 表示剩余的 5 个分区。

【问题讨论】:

这个操作的目的是什么? 当这个dataframe被写入hive时,我们得到3个大文件,因为只有3个partition有数据。我们需要分发数据,以便在将数据帧写入 hive 时生成可管理大小的文件 我想@zero323 已经回答了这个here。您可能应该在分区中使用多个列 【参考方案1】:

当我们遇到已知偏斜的数据时,我们使用了一个分区器,该分区器对偏斜的值应用受控随机化。我概述了如何做到这一点in this answer。

【讨论】:

如果您的分区方案基于随机性而不是纯粹基于数据,则不确定 Spark(或 Hive)如何有效地处理诸如分区连接(或分区修剪)之类的事情,例如(现在或将来的版本)。它确实消除了了解您的数据如何倾斜的需要。我很想知道你对此的想法,因为我可能会遗漏一些东西(也许 Spark 或 Hive 还没有真正具备这种能力,就像 Oracle 在这一点上,Spark 的优化器远不如 Oracle 的复杂。 其实我最后在做这个分区方案。连接发生在基于其他列之前。最后,我只需要根据这个倾斜的列对数据进行近乎均匀的分区。之后不再执行其他操作。因此这种方法奏效了。【参考方案2】:

您可以通过指定 1 个或更多列(在本例中为 2 个)重新分区为 500 个分区。例如(pyspark):

file_x = "hdfs://mycluster/user/data/x"
df_x = sq.read.format("com.databricks.spark.avro").load(file_x)

print str(datetime.now()) + ": FileX partitions: " + str(df_x.rdd.getNumPartitions())

# repartition based on 2 columns
df_y = df_x.repartition(500, "CITY", "ADDRESS")

print str(datetime.now()) + ": FileY partitions: " + str(df_y.rdd.getNumPartitions())

更多信息请见docs

【讨论】:

这不一定能解决歪斜问题。这取决于其他列中有多少可变性。 @Sim 是的,你需要知道你自己的数据。例如,按城市和国家划分可能不是一个好的选择。【参考方案3】:

在数据框上使用DISTRIBUTE BY 子句。

根据您的要求,为了处理偏差,您可以使用 distribute by 重新分区您的数据。

对于要分区的表达式,选择您知道会均匀分布数据的表达式。

df.distributeBy($'<expression>', 30)

expression 中,您可以使用city.toString().length &gt; Randome.nextInt(&lt;avg-city-length&gt;) 之类的表达式随机化结果

【讨论】:

使用布尔表达式只会产生两个可能值之一,这会使事情变得更糟。您需要一个能够为同一输入产生许多不同值的表达式。 是的......该表达式应该给出输出随机值,以便它在分区之间均匀分布。 ---------------->> 你可以给任何表达,这就是为什么,我用RANDOM

以上是关于如何在倾斜列上重新分区 Spark scala 中的数据框?的主要内容,如果未能解决你的问题,请参考以下文章

spark-partitionBy

如何在 spark scala 中覆盖特定的表分区

在 Spark Scala 中的列上运行累积/迭代 Costum 方法

spark调优之数据倾斜

spark数据倾斜

OpenMLDB: 一文了解窗口倾斜优化技术细节