Iceberg source code analysis: HadoopTableOperations

Posted PeersLee

HadoopTableOperations

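public class HadoopTableOperations implements TableOperations {

  // set once in the constructor
  private final Configuration conf;
  private final Path location;
  private final FileIO fileIO;

  // cached state, refreshed lazily by current() / refresh()
  private volatile TableMetadata currentMetadata = null;
  private volatile Integer version = null;
  private volatile boolean shouldRefresh = true;

  protected HadoopTableOperations(Path location, FileIO fileIO, Configuration conf) {
    this.conf = conf;
    this.location = location;
    this.fileIO = fileIO;
  }
}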

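After data is written to an Iceberg table, commit() is called to commit the new metadata, and it sets shouldRefresh to true. A reader that calls current() afterwards sees the shouldRefresh flag and refreshes version and currentMetadata, so it reads the latest metadata.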
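The handshake between commit() and current() can be illustrated with a toy model. This is only a sketch, not Iceberg code: the class RefreshFlagSketch, the storedVersion counter (standing in for the metadata files on the file system) and refreshCount are hypothetical names introduced for illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Toy model of the shouldRefresh handshake; hypothetical, not Iceberg code. */
class RefreshFlagSketch {
  // Stands in for the metadata files on the file system (assumption).
  private final AtomicInteger storedVersion = new AtomicInteger(1);

  // Cached state, mirroring the fields shown above.
  private volatile Integer version = null;
  private volatile boolean shouldRefresh = true;
  private int refreshCount = 0;

  /** Returns the cached version, refreshing first if the flag is set. */
  Integer current() {
    if (shouldRefresh) {
      return refresh();
    }
    return version;
  }

  /** Reloads the cached version from "storage" and clears the flag. */
  synchronized Integer refresh() {
    version = storedVersion.get();
    shouldRefresh = false;
    refreshCount++;
    return version;
  }

  /** Publishes a new version and marks the cache as stale. */
  void commit() {
    storedVersion.incrementAndGet();
    shouldRefresh = true;
  }

  int refreshCount() {
    return refreshCount;
  }

  public static void main(String[] args) {
    RefreshFlagSketch ops = new RefreshFlagSketch();
    System.out.println(ops.current());      // 1 (first call refreshes)
    System.out.println(ops.current());      // 1 (served from the cache)
    ops.commit();
    System.out.println(ops.current());      // 2 (flag forces a refresh)
    System.out.println(ops.refreshCount()); // 2
  }
}
```

The point of the flag is that repeated reads between commits do not touch storage; only the first read after a commit pays for a refresh.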

org.apache.iceberg.hadoop.HadoopTableOperations#current

  1. shouldRefresh defaults to true
  2. While the flag is set, current() delegates to refresh()
  @Override
  public TableMetadata current() {
    LOG.info("org.apache.iceberg.hadoop.HadoopTableOperations.current");
    if (this.currentMetadata != null) {
      LOG.info("[shouldRefresh='{}', currentMetadata='{}']", this.shouldRefresh,
          this.currentMetadata.location() + "::" + this.currentMetadata.metadataFileLocation());
    }
    if (shouldRefresh) {
      return refresh();
    }
    return currentMetadata;
  }

org.apache.iceberg.hadoop.HadoopTableOperations#refresh

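  1. If no version is cached yet, call findVersion to read the ver value from metadataRoot/version-hint.text
  2. Spin forward to find the latest metadataFile path: metadataRoot/v$ver$codec.metadata.json (for example v3.metadata.json when no compression codec is set)
  3. Call updateVersionAndMetadata to update the cached state: version and currentMetadata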
  @Override
  public TableMetadata refresh() {
    LOG.info("org.apache.iceberg.hadoop.HadoopTableOperations.refresh.");
    int ver = version != null ? version : findVersion();
    LOG.info("[ver='{}']", ver);
    try {
      Path metadataFile = getMetadataFile(ver);
      if (version == null && metadataFile == null && ver == 0) {
        // no v0 metadata means the table doesn't exist yet
        return null;
      } else if (metadataFile == null) {
        throw new ValidationException("Metadata file for version %d is missing", ver);
      }

      // spin until nextMetadataFile == null
      // ver is incremented by every commit,
      // so spin here to make sure the latest ver is read
      Path nextMetadataFile = getMetadataFile(ver + 1);
      while (nextMetadataFile != null) {
        ver += 1;
        metadataFile = nextMetadataFile;
        nextMetadataFile = getMetadataFile(ver + 1);
      }

      // thread-safe
      updateVersionAndMetadata(ver, metadataFile.toString());

      this.shouldRefresh = false;
      return currentMetadata;
    } catch (IOException e) {
      throw new RuntimeIOException(e, "Failed to refresh the table");
    }
  }
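The forward scan in refresh() can be tried out on a local directory. The following is a minimal sketch under stated assumptions: java.nio.file stands in for the Hadoop FileSystem, metadata files are uncompressed (v$ver.metadata.json), a missing hint file is treated as version 0, and the "table does not exist yet" case is left out. The class VersionScanSketch and its helpers are hypothetical names, not Iceberg APIs.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

/** Local-file-system sketch of the version scan; hypothetical, not Iceberg code. */
class VersionScanSketch {

  /** Returns the metadata file for ver, or null if it does not exist. */
  static Path metadataFile(Path metadataRoot, int ver) {
    Path candidate = metadataRoot.resolve("v" + ver + ".metadata.json");
    return Files.exists(candidate) ? candidate : null;
  }

  /** Reads the best-effort version pointer; 0 if there is no hint file (assumption). */
  static int readVersionHint(Path metadataRoot) {
    Path hint = metadataRoot.resolve("version-hint.text");
    if (!Files.exists(hint)) {
      return 0;
    }
    try {
      String text = new String(Files.readAllBytes(hint), StandardCharsets.UTF_8);
      return Integer.parseInt(text.trim());
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Starts at the hinted version and walks forward while a newer file exists. */
  static int latestVersion(Path metadataRoot) {
    int ver = readVersionHint(metadataRoot);
    if (metadataFile(metadataRoot, ver) == null) {
      throw new IllegalStateException("Metadata file for version " + ver + " is missing");
    }
    while (metadataFile(metadataRoot, ver + 1) != null) {
      ver += 1;
    }
    return ver;
  }

  /** Builds a temp directory with v1..v3 and a stale hint that still says 1. */
  static Path demoTable() {
    try {
      Path root = Files.createTempDirectory("metadata");
      for (int v = 1; v <= 3; v++) {
        Files.write(root.resolve("v" + v + ".metadata.json"),
            "{}".getBytes(StandardCharsets.UTF_8));
      }
      Files.write(root.resolve("version-hint.text"), "1".getBytes(StandardCharsets.UTF_8));
      return root;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(latestVersion(demoTable())); // prints 3
  }
}
```

Because the hint is only a starting point, a reader still lands on the newest version even when the hint file lags behind the metadata files.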

 

org.apache.iceberg.hadoop.HadoopTableOperations#commit

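  1. base metadata is the current metadata version; temp metadata is the metadata that is about to be written
  2. Write the temp metadata.json, then rename it to the final metadata file for version $ver+1
  3. Write the version-hint.text file
  4. If the switch is enabled, delete old metadata.json files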
  @Override
  public void commit(TableMetadata base, TableMetadata metadata) {
    LOG.info("start commit.");
    // there is a local Caffeine cache involved here
    Pair<Integer, TableMetadata> current = versionAndMetadata();
    if (base != current.second()) {
      throw new CommitFailedException("Cannot commit changes based on stale table metadata");
    }

    if (base == metadata) {
      LOG.info("Nothing to commit.");
      return;
    }

    Preconditions.checkArgument(base == null || base.location().equals(metadata.location()),
        "Hadoop path-based tables cannot be relocated");
    Preconditions.checkArgument(
        !metadata.properties().containsKey(TableProperties.WRITE_METADATA_LOCATION),
        "Hadoop path-based tables cannot relocate metadata");

    String codecName = metadata.property(
        TableProperties.METADATA_COMPRESSION, TableProperties.METADATA_COMPRESSION_DEFAULT);
    TableMetadataParser.Codec codec = TableMetadataParser.Codec.fromName(codecName);
    String fileExtension = TableMetadataParser.getFileExtension(codec);
    Path tempMetadataFile = metadataPath(UUID.randomUUID().toString() + fileExtension);
    // base is null on the first commit, so guard the log call against an NPE
    LOG.info("[base='{}', metadata='{}', tempMetadataFile='{}']",
        base != null ? base.metadataFileLocation() : null,
        metadata.metadataFileLocation(), tempMetadataFile);
    TableMetadataParser.write(metadata, io().newOutputFile(tempMetadataFile.toString()));
    LOG.info("TableMetadataParser.write(metadata, io().newOutputFile(tempMetadataFile.toString()));");

    int nextVersion = (current.first() != null ? current.first() : 0) + 1;
    Path finalMetadataFile = metadataFilePath(nextVersion, codec);
    LOG.info("nextVer='{}', finalMetaFile='{}'", nextVersion, finalMetadataFile);
    FileSystem fs = getFileSystem(tempMetadataFile, conf);

    try {
      if (fs.exists(finalMetadataFile)) {
        throw new CommitFailedException(
            "Version %d already exists: %s", nextVersion, finalMetadataFile);
      }
    } catch (IOException e) {
      throw new RuntimeIOException(e,
          "Failed to check if next version exists: %s", finalMetadataFile);
    }

    // this rename operation is the atomic commit operation
    LOG.info("start renameToFinal.");
    renameToFinal(fs, tempMetadataFile, finalMetadataFile);

    LOG.info("start writeVersionHint");
    // update the best-effort version pointer
    writeVersionHint(nextVersion);

    LOG.info("start deleteRemovedMetadataFiles");
    // switch (table property):
    // org.apache.iceberg.TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED
    deleteRemovedMetadataFiles(base, metadata);

    this.shouldRefresh = true;
    LOG.info("end commit.");
  }
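The write-temp-then-rename sequence of commit() can be sketched on a local file system. This is an illustration under assumptions, not the Iceberg implementation: java.nio.file.Files.move (without REPLACE_EXISTING, so it fails when the target exists) stands in for the Hadoop rename, and a local move does not necessarily give the same atomicity guarantee as the rename the original code relies on. RenameCommitSketch, tempMetadataRoot and readHint are hypothetical names.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.UUID;

/** Local-file-system sketch of the commit sequence; hypothetical, not Iceberg code. */
class RenameCommitSketch {

  /** Returns true if this writer claimed nextVersion, false if it already existed. */
  static boolean commit(Path metadataRoot, int nextVersion, String metadataJson) {
    try {
      // 1. write the new metadata under a unique temporary name
      Path temp = metadataRoot.resolve(UUID.randomUUID() + ".metadata.json");
      Files.write(temp, metadataJson.getBytes(StandardCharsets.UTF_8));

      // 2. rename to the final versioned name; this step decides who wins
      Path finalFile = metadataRoot.resolve("v" + nextVersion + ".metadata.json");
      try {
        Files.move(temp, finalFile); // no REPLACE_EXISTING: fails if the target exists
      } catch (FileAlreadyExistsException e) {
        Files.deleteIfExists(temp);  // lost the race, clean up the temp file
        return false;
      }

      // 3. update the best-effort version pointer
      Files.write(metadataRoot.resolve("version-hint.text"),
          Integer.toString(nextVersion).getBytes(StandardCharsets.UTF_8));
      return true;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Reads the hint file back as text. */
  static String readHint(Path metadataRoot) {
    try {
      byte[] bytes = Files.readAllBytes(metadataRoot.resolve("version-hint.text"));
      return new String(bytes, StandardCharsets.UTF_8).trim();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Creates an empty temp directory to act as the metadata root. */
  static Path tempMetadataRoot() {
    try {
      return Files.createTempDirectory("metadata");
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    Path root = tempMetadataRoot();
    System.out.println(commit(root, 1, "{}")); // true
    System.out.println(commit(root, 1, "{}")); // false (version 1 already exists)
    System.out.println(commit(root, 2, "{}")); // true
    System.out.println(readHint(root));        // 2
  }
}
```

Writing the hint only after the rename matches the order in commit() above: if a writer dies between the two steps, the hint is merely stale, and a reader that scans forward from it can still find the newer metadata file.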

 
