Apache Spark 使用 SQL 函数 nTile 对数据进行分区

Posted

技术标签:

【中文标题】Apache Spark 使用 SQL 函数 nTile 对数据进行分区【英文标题】:Apache Spark Partitioning Data Using a SQL Function nTile 【发布时间】:2020-11-13 22:00:27 【问题描述】:

我正在尝试多种方法来优化使用分区的大型数据集的执行。特别是,我使用了一个常用于传统 SQL 数据库的函数,称为 nTile。

目标是使用 buckettind 和重新分区的组合将一定数量的行放入存储桶中。这允许 Apache Spark 在处理分区数据集或我应该说是分桶数据集时更高效地处理数据。

下面是两个例子。第一个示例显示了我如何使用 ntile 将数据集拆分为两个存储桶,然后将数据重新分区为存储桶 nTile 上称为 skew_data 的 2 个分区。

然后我使用相同的查询,但没有任何分桶或重新分区。

问题是没有分桶的查询比有分桶的查询更快,即使没有分桶的查询将所有数据放在一个分区中,而有分桶的查询将查询分成2个分区。

谁能告诉我这是为什么。

仅供参考 我在 Databricks 的 Apache Spark 集群上运行查询。 该集群只有一个具有 2 个内核和 15Gb 内存的单个节点。

第一个使用 nTile/Bucketting 和重新分区的示例

allin = spark.sql("""
  SELECT
    t1.make
    , t2.model
    , NTILE(2) OVER (ORDER BY t2.sale_price) AS skew_data
  FROM 
    t1 INNER JOIN t2
    ON t1.engine_size = t2.engine_size2
""")
.repartition(2, col("skew_data"), rand())
.drop('skew_data')

以上代码将数据分成如下分区,并有对应的分区分布

Number of partitions: 2
Partitioning distribution: [5556767, 5556797]

第二个例子:没有 nTile/Bucketting 或重新分区

allin_NO_nTile = spark.sql("""
  SELECT
    t1.make
    ,t2.model
  FROM 
    t1 INNER JOIN t2
    ON t1.engine_size = t2.engine_size2
""")

上面的代码将所有数据放到一个单独的分区中,如下图:

Number of partitions: 1
Partitioning distribution: [11113564]

我的问题是,为什么第二个查询(没有 nTile 或重新分区)比使用 nTile 和重新分区的查询更快?

我已竭尽全力尽可能完整地写出这个问题,但如果您需要进一步的解释,请随时提出。我真的很想弄清楚这一点。

【问题讨论】:

这两个查询的物理计划是什么?第二个查询有可能是广播哈希连接,它比排序合并连接或窗口函数更快,后者需要交换和排序数据。 @AndrewLong,感谢您与我们联系。我从来没有深入到这个问题的底部。但是,我被指出使用 bucketBy() 函数的方向。 【参考方案1】:

我放弃了原来的方法,使用了名为 bucketBy() 的新 PySpark 函数。如果您想知道如何将 bucketBy() 应用于存储桶数据,请访问 https://www.youtube.com/watch?v=dv7IIYuQOXI&list=PLOmMQN2IKdjvowfXo_7hnFJHjcE3JOKwu&index=39

【讨论】:

以上是关于Apache Spark 使用 SQL 函数 nTile 对数据进行分区的主要内容,如果未能解决你的问题,请参考以下文章

Spark SQL内置函数

在 Apache Spark SQL 中将中值作为窗口函数 (UDAF) 移动

在 Spark 中执行聚合函数时出错:ArrayType 无法转换为 org.apache.spark.sql.types.StructType

apache spark sql中的等效percentile_cont函数

Spark SQL UDF示例

Spark---内置函数