Apache Spark 使用 SQL 函数 nTile 对数据进行分区
Posted
技术标签:
【中文标题】Apache Spark 使用 SQL 函数 nTile 对数据进行分区【英文标题】:Apache Spark Partitioning Data Using a SQL Function nTile 【发布时间】:2020-11-13 22:00:27 【问题描述】:我正在尝试多种方法来优化使用分区的大型数据集的执行。特别是,我使用了一个常用于传统 SQL 数据库的函数,称为 nTile。
目标是使用 buckettind 和重新分区的组合将一定数量的行放入存储桶中。这允许 Apache Spark 在处理分区数据集或我应该说是分桶数据集时更高效地处理数据。
下面是两个例子。第一个示例显示了我如何使用 ntile 将数据集拆分为两个存储桶,然后将数据重新分区为存储桶 nTile 上称为 skew_data 的 2 个分区。
然后我使用相同的查询,但没有任何分桶或重新分区。
问题是没有分桶的查询比有分桶的查询更快,即使没有分桶的查询将所有数据放在一个分区中,而有分桶的查询将查询分成2个分区。
谁能告诉我这是为什么。
仅供参考 我在 Databricks 的 Apache Spark 集群上运行查询。 该集群只有一个具有 2 个内核和 15Gb 内存的单个节点。
第一个使用 nTile/Bucketting 和重新分区的示例
allin = spark.sql("""
SELECT
t1.make
, t2.model
, NTILE(2) OVER (ORDER BY t2.sale_price) AS skew_data
FROM
t1 INNER JOIN t2
ON t1.engine_size = t2.engine_size2
""")
.repartition(2, col("skew_data"), rand())
.drop('skew_data')
以上代码将数据分成如下分区,并有对应的分区分布
Number of partitions: 2
Partitioning distribution: [5556767, 5556797]
第二个例子:没有 nTile/Bucketting 或重新分区
allin_NO_nTile = spark.sql("""
SELECT
t1.make
,t2.model
FROM
t1 INNER JOIN t2
ON t1.engine_size = t2.engine_size2
""")
上面的代码将所有数据放到一个单独的分区中,如下图:
Number of partitions: 1
Partitioning distribution: [11113564]
我的问题是,为什么第二个查询(没有 nTile 或重新分区)比使用 nTile 和重新分区的查询更快?
我已竭尽全力尽可能完整地写出这个问题,但如果您需要进一步的解释,请随时提出。我真的很想弄清楚这一点。
【问题讨论】:
这两个查询的物理计划是什么?第二个查询有可能是广播哈希连接,它比排序合并连接或窗口函数更快,后者需要交换和排序数据。 @AndrewLong,感谢您与我们联系。我从来没有深入到这个问题的底部。但是,我被指出使用 bucketBy() 函数的方向。 【参考方案1】:我放弃了原来的方法,使用了名为 bucketBy() 的新 PySpark 函数。如果您想知道如何将 bucketBy() 应用于存储桶数据,请访问 https://www.youtube.com/watch?v=dv7IIYuQOXI&list=PLOmMQN2IKdjvowfXo_7hnFJHjcE3JOKwu&index=39
【讨论】:
以上是关于Apache Spark 使用 SQL 函数 nTile 对数据进行分区的主要内容,如果未能解决你的问题,请参考以下文章
在 Apache Spark SQL 中将中值作为窗口函数 (UDAF) 移动
在 Spark 中执行聚合函数时出错:ArrayType 无法转换为 org.apache.spark.sql.types.StructType