(SPARK)对应用了多个过滤器的数据进行分区的最佳方法是啥?
Posted
技术标签:
【中文标题】(SPARK)对应用了多个过滤器的数据进行分区的最佳方法是啥?【英文标题】:(SPARK) What is the best way to partition data on which multiple filters are applied?(SPARK)对应用了多个过滤器的数据进行分区的最佳方法是什么? 【发布时间】:2019-04-08 15:35:07 【问题描述】:我在 Spark(在 azure databricks 上)使用 150 亿行文件,如下所示:
+---------+---------------+----------------+-------------+--------+------+
|client_id|transaction_key|transaction_date| product_id|store_id|spend|
+---------+---------------+----------------+-------------+--------+------+
| 1| 7587_20121224| 2012-12-24| 38081275| 787| 4.54|
| 1| 10153_20121224| 2012-12-24| 4011| 1053| 2.97|
| 2| 6823_20121224| 2012-12-24| 561122924| 683| 2.94|
| 3| 11131_20121224| 2012-12-24| 80026282| 1131| 0.4|
| 3| 7587_20121224| 2012-12-24| 92532| 787| 5.49|
这些数据用于我的所有查询,主要包括 groupby(例如 product_id)、sum 和 count distinct:
results = trx.filter(col("transaction_date") > "2018-01-01"
&
col("product_id").isin(["38081275", "4011"])
.groupby("product_id")
.agg(sum("spend").alias("total_spend"),
countdistinct("transaction_key").alias("number_trx"))
我从不需要 100% 使用这些数据,我总是从过滤器开始:
transaction_date(1000 个不同的值) product_id(1 000 000 个不同的值) store_id(1000 个不同的值)==> 在 parquet 文件中分区这些数据的最佳方法是什么?
我最初在 transaction_date 对数据进行了分区:
trx.write.format("parquet").mode("overwrite").partitionBy("transaction_date").save("dbfs:/linkToParquetFile")
这将创建大小大致相同的分区。 但是,大多数查询需要保留至少 60% 的 transaction_date,而在 1 个查询中通常只选择少数 product_id。 (通常保留 70% 的 store_id)
==> 考虑到这一点,有没有办法构建镶木地板文件?
似乎对 product_id 上的数据进行分区会创建太多分区...
谢谢!
【问题讨论】:
【参考方案1】:例如,您可以使用多个列进行分区(它会创建子文件夹)并且 spark 可以使用分区过滤器
另一个好主意是存储更多信息here(以避免额外的洗牌)
以蜂巢为例
trx.write.partitionBy("transaction_date", "store_id").bucketBy(1000, "product_id").saveAsTable("tableName")
阅读使用
spark.table("tableName")
【讨论】:
感谢@Kamrus,成功了!出于某种原因,我认为按几列进行分区会根据这些列的组合创建文件夹。但是,它会在每个文件夹中创建子文件夹,这使我可以非常有效地使用分区过滤器。以上是关于(SPARK)对应用了多个过滤器的数据进行分区的最佳方法是啥?的主要内容,如果未能解决你的问题,请参考以下文章
Spark调研笔记第3篇 - Spark集群对应用的调度策略简介