pyspark - 分区数据的计算(使用“附加”模式创建)慢
Posted
技术标签:
【中文标题】pyspark - 分区数据的计算(使用“附加”模式创建)慢【英文标题】:pyspark - calculation of partitioned data (created with "append" mode) slow 【发布时间】:2021-10-06 05:48:31 【问题描述】:分区后查询出现性能问题。
我每天有一个大约 3000 万行和 20 列的 parquet 文件。例如,文件data_20210721.parquet
看起来像:
+-----------+---------------------+---------------------+------------+-----+
| reference | date_from | date_to | daytime | ... |
+-----------+---------------------+---------------------+------------+-----+
| A | 2021-07-21 17:30:25 | 2021-07-22 02:21:57 | 2021-07-22 | ... |
| A | 2021-07-21 12:10:10 | 2021-07-21 13:00:00 | 2021-07-21 | ... |
| A | ... | ... | ... | ... |
+-----------+---------------------+---------------------+------------+-----+
我们有一个代码来处理它,使其只有一天并减少一个午夜,这样我们就有了:
+-----------+---------------------+---------------------+------------+-----+
| reference | date_from | date_to | daytime | ... |
+-----------+---------------------+---------------------+------------+-----+
| A | 2021-07-21 17:30:25 | 2021-07-22 00:00:00 | 2021-07-21 | ... | <- split at midnight + daytime update
| A | 2021-07-22 00:00:00 | 2021-07-22 02:21:57 | 2021-07-22 | ... | <- residual
| A | 2021-07-21 12:10:10 | 2021-07-21 13:00:00 | 2021-07-21 | ... |
| A | ... | ... | ... | ... |
+-----------+---------------------+---------------------+------------+-----+
第 2 行,可以称为残差,因为它与文件不是同一天。
然后我们想每天生成 1 个镶木地板,因此默认解决方案是处理每个文件并保存数据框:
df.write.partitionBy(["id", "daytime"]).mode("append").parquet("hdfs/path")
模式设置为追加,因为第二天,我们可能会有过去/未来几天的残差。
还有其他级别的分区,例如:
ID : 固定一年左右(保存这样的存储非常好;)) 周数 国家即使分区在行方面非常“平衡”,处理时间也会变得非常慢。
例如,要计算给定日期集每天的行数:
原始 df(7s 秒):spark.read.parquet("path/to/data_2021071[0-5].parquet")\
.groupBy("DayTime")\
.count()\
.show()
分区数据(几分钟)
spark.read.parquet("path/to/data")\
.filter( (col("DayTime") >= "2021-07-10") & (col("DayTime") <= "2021-07-15") )\
.groupBy("DayTime")\
.count()\
.show()
我们认为在最后一级有太多的小分区(因为追加,大约有 600 个非常小的文件,只有几 Kb/Mb),所以我们尝试为每个分区合并它们,但没有任何改进。我们还尝试仅在 daytime
上进行分区(以防多个级别的分区会产生问题)。
是否有任何解决方案可以提高性能(或了解瓶颈在哪里)?
它可以与我们正在对date
列进行分区的事实相关联吗?我看到了很多按年/月/日进行分区的例子,例如,它们是 3 个整数,但不符合我们的需要。
这个解决方案非常完美地解决了我们遇到的很多问题,但是如果太重要而无法保持原样,则会导致性能损失。欢迎任何建议:)
编辑 1:
问题来自以下事实:计划不一样:
spark.read.parquet("path/to/data/DayTime=2021-07-10")
和
spark.read.parquet("path/to/data/").filter(col("DayTime")=="2021-07-10")
这是一个小示例的计划,其中DayTime
已转换为“long”,因为我认为速度缓慢可能是由于数据类型:
spark.read.parquet("path/to/test/").filter(col("ts") == 20200103).explain(extended=True)
== Parsed Logical Plan ==
'Filter ('ts = 20200103)
+- AnalysisBarrier
+- Relation[date_from#4297,date_to#4298, ....] parquet
== Analyzed Logical Plan ==
date_from: timestamp, date_to: timestamp, ts: int, ....
Filter (ts#4308 = 20200103)
+- Relation[date_from#4297,date_to#4298,ts#4308, ....] parquet
== Optimized Logical Plan ==
Filter (isnotnull(ts#4308) && (ts#4308 = 20200103))
+- Relation[date_from#4297,date_to#4298,ts#4308, ....] parquet
== Physical Plan ==
*(1) FileScan parquet [date_from#4297,date_to#4298,ts#4308, ....] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://.../test_perf], PartitionCount: 1, PartitionFilters: [isnotnull(ts#4308), (ts#4308 = 20200103)], PushedFilters: [], ReadSchema: struct<date_from:timestamp,date_to:timestamp, ....
对
spark.read.parquet("path/to/test/ts=20200103").explain(extended=True)
== Parsed Logical Plan ==
Relation[date_from#2086,date_to#2087, ....] parquet
== Analyzed Logical Plan ==
date_from: timestamp, date_to: timestamp,, ....] parquet
== Optimized Logical Plan ==
Relation[date_from#2086,date_to#2087, ....] parquet
== Physical Plan ==
*(1) FileScan parquet [date_from#2086,date_to#2087, .....] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://.../test_perf/ts=20200103], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<date_from:timestamp,date_to:timestamp, ....
提前致谢,
尼古拉斯
【问题讨论】:
【参考方案1】:您必须确保您的filter
实际使用分区结构,在磁盘级别进行修剪,而不是将所有数据放入内存然后应用过滤器。
尝试检查您的身体计划
spark.read.parquet("path/to/data")\
.filter( (col("DayTime") >= "2021-07-10") & (col("DayTime") <= "2021-07-15") )
.explain()
它应该有一个类似于PartitionFilters: [isnotnull(DayTime#123), (DayTime#76 = your condition)],
的阶段
我的猜测是你的情况,它没有使用这个PartitionFilters
并且扫描了整个数据。
我建议尝试使用小数据集尝试您的语法/重新分区策略,直到达到PartitionFilters
。
【讨论】:
你说得对,我只试了 1 周,优化后的方案并不完全相同。这是在 spark 某处激活的东西(我没有确切的版本,但它是 2.X )? 不,没有额外的激活步骤,它存在于 2.x 中。可能是语法或给出条件的方式有问题。只需尝试使用简单的===
来检查它是否正在启动。
===
对 pyspark 无效。我做了一些试验,我将在问题的编辑中添加。以上是关于pyspark - 分区数据的计算(使用“附加”模式创建)慢的主要内容,如果未能解决你的问题,请参考以下文章