Apache Hudi一种基于增量日志文件数的压缩策略

Posted scx_white

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Apache Hudi一种基于增量日志文件数的压缩策略相关的知识,希望对你有一定的参考价值。

前言

Hudi 的表格式分为 COW(copy on write)MOR(merge on read)COW 表每次的 upsert 操作都会根据不同的索引(bloom indexhbase indexbucket index 等等)查找到数据所在的基础数据文件,然后合并基础数据文件生成新的基础数据文件,有很明显的写放大的问题。为了提高 upsert的性能问题,Hudi 提供了一种新的表类型:MOR 表。该表在每次的 upsert 操作时,并不会实时的和基础数据文件进行合并,而是生成新的增量日志文件,而增量文件和基础文件的合并就叫做压缩(compaction)。所以当一个读操作读取 MOR 表时,此时会读取该表的基础数据文件和所有的增量日志文件,然后进行一个合并操作(比如根据主键聚合取更新时间最新的那条数据),最后将合并后的结果返回给终端。读时合并(merge on read)这也是为何叫 MOR 表的原因。

大家都知道,在 olap 引擎中,大量小文件对于查询的性能影响很大。首先是磁盘的多次寻址问题,其次是 mpp 架构的引擎对于表的读操作通常是根据文件来进行 task 个数的划分,大量小文件会导致一个 job 生成大量的 task 来读取数据,而每次 task 的启动、销毁也会占用大量时间。最终导致任务执行过慢的问题。所以通常在写入数据之前,我们都会进行小文件的合并,比如 SPARKrepartitioncoalesce。当然 Hudi 也有类似的操作,那就是compaction.

Hudi之Compaction策略

MOR 表每次的 upsert 等会新增一个 delta log 文件,而 compaction 就是将这些 delte log 文件和基础文件进行合并的操作。

Hudi 默认提供了多种压缩策略,比如

  • DayBasedCompactionStrategy
    • 按最近分区进行选择性压缩的策略,适用于只有最近的分区会进行才会更新的场景,可以通过 hoodie.compaction.daybased.target.partitions 参数配置
  • BoundedPartitionAwareCompactionStrategy
    • DayBasedCompactionStrategy 类似,只不过该策略会使用当前日期减去hoodie.compaction.daybased.target.partitions 参数的值获取一个 earliestPartitionPathToCompact,然后比较所有分区的日期是否大于 earliestPartitionPathToCompact,大于的话则加入 compaction 计划
  • BoundedIOCompactionStrategy
    • 该策略通过限制总的 read+writeIO 大小来控制每次的压缩计划,可以通过hoodie.compaction.target.io 参数控制
  • LogFileSizeBasedCompactionStrategy
    • 该策略主要是根据 delta log 的大小进行压缩,并且为 Hudi 当前版本(0.12.0)的默认压缩策略。该策略根据用户配置的 hoodie.compaction.logfile.size.threshold 来过滤出符合条件的 fileslice ,然后从大到小排序生成压缩计划。需要注意的是,该策略继承于 BoundedIOCompactionStrategy ,所以该策略也会控制每次的压缩 IO
  • UnBoundedCompactionStrategy
    • 该策略与 BoundedIOCompactionStrategy 相对,不会进行任何IO的控制,容易OOM,不建议使用。
  • UnBoundedPartitionAwareCompactionStrategy
    • 该策略和 BoundedPartitionAwareCompactionStrategy 相对,生成的执行计划与之互补

虽然 Hudi 帮我们配置了默认的压缩策略(LogFileSizeBasedCompactionStrategy),但实际上我们可以根据不同的业务场景选择不同的压缩策略,此时可以获得最好的压缩性能。顺便提示一下,Hudi 的压缩策略可以通过 hoodie.compaction.strategy 配置。

基于文件数的压缩策略

在我这边的业务场景中,很早之前我就自定义了自己的压缩策略。具体压缩逻辑如下:

public class LogFileNumBasedCompactionStrategy extends BoundedIOCompactionStrategy
    implements Comparator<HoodieCompactionOperation> 

  @Override
  public List<HoodieCompactionOperation> orderAndFilter(HoodieWriteConfig writeConfig, List<HoodieCompactionOperation> operations, List<HoodieCompactionPlan> pendingCompactionPlans) 
    Long numThreshold = writeConfig.getCompactionLogFileNumThreshold();
    List<HoodieCompactionOperation> filterOperator = operations.stream()
        .filter(e -> e.getDeltaFilePaths().size() >= numThreshold)
        .sorted(this).collect(Collectors.toList());
    return super.orderAndFilter(writeConfig, filterOperator, pendingCompactionPlans);
  

  @Override
  public int compare(HoodieCompactionOperation hco1, HoodieCompactionOperation hco2) 
    return hco2.getDeltaFilePaths().size() - hco1.getDeltaFilePaths().size();
  

该压缩策略首先会获取用户配置的 hoodie.compaction.logfile.num.threshold 最小增量文件数,凡是大于等于该值的 fileslice 都会被选择,然后根据文件数的大小进行从大到小排序。并且该策略继承于 BoundedIOCompactionStrategy,所以该策略同时也限制了压缩的最大 IO
需要注意是的,考虑到很多新手用户在启动 Hudi Job 一定时间内可能找不到压缩触发而疑惑,所以我配置了hoodie.compaction.logfile.num.threshold 默认值为 0,使用该策略时,可以酌情增加该值的大小(过大会有小文件问题,影响读取的性能),比如配置为 3,一个既不频繁触发compaction,又能接受的小文件大小。
该策略我也已经贡献到了Hudi 社区,估计在 0.12.1 版本中可以使用,想提前使用的,可以跟踪下这个 PRhttps://github.com/apache/hudi/pull/6670,在自己项目里新建一个压缩类就可以了。

以上是关于Apache Hudi一种基于增量日志文件数的压缩策略的主要内容,如果未能解决你的问题,请参考以下文章

Apache Hudi一种基于增量日志文件数的压缩策略

Apache Hudi一种基于增量日志文件数的压缩策略

底层基于Apache Hudi的DLA最佳实践 | 海量低成本日志分析

Apache Hudi 数据湖概述

Apache Hudi 数据湖概述

Apache Hudi 数据湖概述