Apache Hudi一种基于增量日志文件数的压缩策略
Posted scx_white
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Apache Hudi一种基于增量日志文件数的压缩策略相关的知识,希望对你有一定的参考价值。
前言
Hudi
的表格式分为COW(copy on write)
和MOR(merge on read)
,COW
表每次的upsert
操作都会根据不同的索引(bloom index
,hbase index
,bucket index
等等)查找到数据所在的基础数据文件,然后合并基础数据文件生成新的基础数据文件,有很明显的写放大的问题。为了提高upsert
的性能问题,Hudi
提供了一种新的表类型:MOR
表。该表在每次的upsert
操作时,并不会实时的和基础数据文件进行合并,而是生成新的增量日志文件,而增量文件和基础文件的合并就叫做压缩(compaction
)。所以当一个读操作读取MOR
表时,此时会读取该表的基础数据文件和所有的增量日志文件,然后进行一个合并操作(比如根据主键聚合取更新时间最新的那条数据),最后将合并后的结果返回给终端。读时合并(merge on read
)这也是为何叫MOR
表的原因。
大家都知道,在 olap
引擎中,大量小文件对于查询的性能影响很大。首先是磁盘的多次寻址问题,其次是 mpp
架构的引擎对于表的读操作通常是根据文件来进行 task
个数的划分,大量小文件会导致一个 job
生成大量的 task
来读取数据,而每次 task
的启动、销毁也会占用大量时间。最终导致任务执行过慢的问题。所以通常在写入数据之前,我们都会进行小文件的合并,比如 SPARK
的 repartition
和 coalesce
。当然 Hudi
也有类似的操作,那就是compaction
.
Hudi之Compaction策略
MOR
表每次的upsert
等会新增一个delta log
文件,而compaction
就是将这些delte log
文件和基础文件进行合并的操作。
Hudi 默认提供了多种压缩策略,比如
- DayBasedCompactionStrategy
- 按最近分区进行选择性压缩的策略,适用于只有最近的分区会进行才会更新的场景,可以通过
hoodie.compaction.daybased.target.partitions
参数配置
- 按最近分区进行选择性压缩的策略,适用于只有最近的分区会进行才会更新的场景,可以通过
- BoundedPartitionAwareCompactionStrategy
- 和
DayBasedCompactionStrategy
类似,只不过该策略会使用当前日期减去hoodie.compaction.daybased.target.partitions
参数的值获取一个earliestPartitionPathToCompact
,然后比较所有分区的日期是否大于earliestPartitionPathToCompact
,大于的话则加入compaction
计划
- 和
- BoundedIOCompactionStrategy
- 该策略通过限制总的
read+write
的IO
大小来控制每次的压缩计划,可以通过hoodie.compaction.target.io
参数控制
- 该策略通过限制总的
- LogFileSizeBasedCompactionStrategy
- 该策略主要是根据
delta log
的大小进行压缩,并且为Hudi
当前版本(0.12.0)的默认压缩策略。该策略根据用户配置的hoodie.compaction.logfile.size.threshold
来过滤出符合条件的fileslice
,然后从大到小排序生成压缩计划。需要注意的是,该策略继承于BoundedIOCompactionStrategy
,所以该策略也会控制每次的压缩IO
- 该策略主要是根据
- UnBoundedCompactionStrategy
- 该策略与
BoundedIOCompactionStrategy
相对,不会进行任何IO的控制,容易OOM
,不建议使用。
- 该策略与
- UnBoundedPartitionAwareCompactionStrategy
- 该策略和
BoundedPartitionAwareCompactionStrategy
相对,生成的执行计划与之互补
- 该策略和
虽然 Hudi
帮我们配置了默认的压缩策略(LogFileSizeBasedCompactionStrategy
),但实际上我们可以根据不同的业务场景选择不同的压缩策略,此时可以获得最好的压缩性能。顺便提示一下,Hudi
的压缩策略可以通过 hoodie.compaction.strategy
配置。
基于文件数的压缩策略
在我这边的业务场景中,很早之前我就自定义了自己的压缩策略。具体压缩逻辑如下:
public class LogFileNumBasedCompactionStrategy extends BoundedIOCompactionStrategy
implements Comparator<HoodieCompactionOperation>
@Override
public List<HoodieCompactionOperation> orderAndFilter(HoodieWriteConfig writeConfig, List<HoodieCompactionOperation> operations, List<HoodieCompactionPlan> pendingCompactionPlans)
Long numThreshold = writeConfig.getCompactionLogFileNumThreshold();
List<HoodieCompactionOperation> filterOperator = operations.stream()
.filter(e -> e.getDeltaFilePaths().size() >= numThreshold)
.sorted(this).collect(Collectors.toList());
return super.orderAndFilter(writeConfig, filterOperator, pendingCompactionPlans);
@Override
public int compare(HoodieCompactionOperation hco1, HoodieCompactionOperation hco2)
return hco2.getDeltaFilePaths().size() - hco1.getDeltaFilePaths().size();
该压缩策略首先会获取用户配置的 hoodie.compaction.logfile.num.threshold
最小增量文件数,凡是大于等于该值的 fileslice
都会被选择,然后根据文件数的大小进行从大到小排序。并且该策略继承于 BoundedIOCompactionStrategy
,所以该策略同时也限制了压缩的最大 IO
。
需要注意是的,考虑到很多新手用户在启动 Hudi Job
一定时间内可能找不到压缩触发而疑惑,所以我配置了hoodie.compaction.logfile.num.threshold
默认值为 0
,使用该策略时,可以酌情增加该值的大小(过大会有小文件问题,影响读取的性能),比如配置为 3
,一个既不频繁触发compaction
,又能接受的小文件大小。
该策略我也已经贡献到了Hudi
社区,估计在 0.12.1
版本中可以使用,想提前使用的,可以跟踪下这个 PR
:https://github.com/apache/hudi/pull/6670
,在自己项目里新建一个压缩类就可以了。
以上是关于Apache Hudi一种基于增量日志文件数的压缩策略的主要内容,如果未能解决你的问题,请参考以下文章