pyspark - 分区数据的计算(使用“附加”模式创建)慢

Posted

技术标签:

【中文标题】pyspark - 分区数据的计算(使用“附加”模式创建)慢【英文标题】:pyspark - calculation of partitioned data (created with "append" mode) slow 【发布时间】:2021-10-06 05:48:31 【问题描述】:

分区后查询出现性能问题。

我每天有一个大约 3000 万行和 20 列的 parquet 文件。例如,文件data_20210721.parquet 看起来像:

+-----------+---------------------+---------------------+------------+-----+
| reference |      date_from      |       date_to       |  daytime   | ... |
+-----------+---------------------+---------------------+------------+-----+
| A         | 2021-07-21 17:30:25 | 2021-07-22 02:21:57 | 2021-07-22 | ... |
| A         | 2021-07-21 12:10:10 | 2021-07-21 13:00:00 | 2021-07-21 | ... |
| A         | ...                 | ...                 | ...        | ... |
+-----------+---------------------+---------------------+------------+-----+

我们有一个代码来处理它,使其只有一天并减少一个午夜,这样我们就有了:

+-----------+---------------------+---------------------+------------+-----+
| reference |      date_from      |       date_to       |  daytime   | ... |
+-----------+---------------------+---------------------+------------+-----+
| A         | 2021-07-21 17:30:25 | 2021-07-22 00:00:00 | 2021-07-21 | ... | <- split at midnight + daytime update
| A         | 2021-07-22 00:00:00 | 2021-07-22 02:21:57 | 2021-07-22 | ... | <- residual
| A         | 2021-07-21 12:10:10 | 2021-07-21 13:00:00 | 2021-07-21 | ... |
| A         | ...                 | ...                 | ...        | ... |
+-----------+---------------------+---------------------+------------+-----+

2 行,可以称为残差,因为它与文件不是同一天。

然后我们想每天生成 1 个镶木地板,因此默认解决方案是处理每个文件并保存数据框:

df.write.partitionBy(["id", "daytime"]).mode("append").parquet("hdfs/path")

模式设置为追加,因为第二天,我们可能会有过去/未来几天的残差。

还有其他级别的分区,例如:

ID : 固定一年左右(保存这样的存储非常好;)) 周数 国家

即使分区在行方面非常“平衡”,处理时间也会变得非常慢。

例如,要计算给定日期集每天的行数:

原始 df(7s 秒):
spark.read.parquet("path/to/data_2021071[0-5].parquet")\
.groupBy("DayTime")\
.count()\
.show()
分区数据(几分钟)
spark.read.parquet("path/to/data")\
.filter( (col("DayTime") >= "2021-07-10") & (col("DayTime") <= "2021-07-15") )\
.groupBy("DayTime")\
.count()\
.show()

我们认为在最后一级有太多​​的小分区(因为追加,大约有 600 个非常小的文件,只有几 Kb/Mb),所以我们尝试为每个分区合并它们,但没有任何改进。我们还尝试仅在 daytime 上进行分区(以防多个级别的分区会产生问题)。

是否有任何解决方案可以提高性能(或了解瓶颈在哪里)? 它可以与我们正在对date 列进行分区的事实相关联吗?我看到了很多按年/月/日进行分区的例子,例如,它们是 3 个整数,但不符合我们的需要。

这个解决方案非常完美地解决了我们遇到的很多问题,但是如果太重要而无法保持原样,则会导致性能损失。欢迎任何建议:)

编辑 1:

问题来自以下事实:计划不一样:

spark.read.parquet("path/to/data/DayTime=2021-07-10")

spark.read.parquet("path/to/data/").filter(col("DayTime")=="2021-07-10")

这是一个小示例的计划,其中DayTime 已转换为“long”,因为我认为速度缓慢可能是由于数据类型:

spark.read.parquet("path/to/test/").filter(col("ts") == 20200103).explain(extended=True)

== Parsed Logical Plan ==
'Filter ('ts = 20200103)
+- AnalysisBarrier
      +- Relation[date_from#4297,date_to#4298, ....] parquet

== Analyzed Logical Plan ==
date_from: timestamp, date_to: timestamp, ts: int, ....
Filter (ts#4308 = 20200103)
+- Relation[date_from#4297,date_to#4298,ts#4308, ....] parquet

== Optimized Logical Plan ==
Filter (isnotnull(ts#4308) && (ts#4308 = 20200103))
+- Relation[date_from#4297,date_to#4298,ts#4308, ....] parquet

== Physical Plan ==
*(1) FileScan parquet [date_from#4297,date_to#4298,ts#4308, ....] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://.../test_perf], PartitionCount: 1, PartitionFilters: [isnotnull(ts#4308), (ts#4308 = 20200103)], PushedFilters: [], ReadSchema: struct<date_from:timestamp,date_to:timestamp, ....

spark.read.parquet("path/to/test/ts=20200103").explain(extended=True)

== Parsed Logical Plan ==
Relation[date_from#2086,date_to#2087, ....] parquet

== Analyzed Logical Plan ==
date_from: timestamp, date_to: timestamp,, ....] parquet

== Optimized Logical Plan ==
Relation[date_from#2086,date_to#2087, ....] parquet

== Physical Plan ==
*(1) FileScan parquet [date_from#2086,date_to#2087, .....] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://.../test_perf/ts=20200103], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<date_from:timestamp,date_to:timestamp, ....

提前致谢,

尼古拉斯

【问题讨论】:

【参考方案1】:

您必须确保您的filter 实际使用分区结构,在磁盘级别进行修剪,而不是将所有数据放入内存然后应用过滤器。

尝试检查您的身体计划

spark.read.parquet("path/to/data")\
.filter( (col("DayTime") >= "2021-07-10") & (col("DayTime") <= "2021-07-15") )
.explain()

它应该有一个类似于PartitionFilters: [isnotnull(DayTime#123), (DayTime#76 = your condition)],的阶段

我的猜测是你的情况,它没有使用这个PartitionFilters 并且扫描了整个数据。

我建议尝试使用小数据集尝试您的语法/重新分区策略,直到达到PartitionFilters

【讨论】:

你说得对,我只试了 1 周,优化后的方案并不完全相同。这是在 spark 某处激活的东西(我没有确切的版本,但它是 2.X )? 不,没有额外的激活步骤,它存在于 2.x 中。可能是语法或给出条件的方式有问题。只需尝试使用简单的=== 来检查它是否正在启动。 === 对 pyspark 无效。我做了一些试验,我将在问题的编辑中添加。

以上是关于pyspark - 分区数据的计算(使用“附加”模式创建)慢的主要内容,如果未能解决你的问题,请参考以下文章

计算每个 pyspark RDD 分区中的元素数

PySpark 根据特定列重新分区

如何在 for 循环中附加 pyspark 数据帧?

使用自定义分区器对 Pyspark 中的数据框进行分区

Pyspark Parquet - 重新分区后排序

PySpark:将 PythonRDD 附加/合并到 PySpark 数据帧