Iceberg Source Code Analysis: HadoopTableOperations
Posted PeersLee
HadoopTableOperations
public class HadoopTableOperations implements TableOperations {
  private final Configuration conf;
  private final Path location;
  private final FileIO fileIO;

  // volatile, so that a refresh done by one thread is visible to other threads
  private volatile TableMetadata currentMetadata = null;
  private volatile Integer version = null;
  private volatile boolean shouldRefresh = true;

  protected HadoopTableOperations(Path location, FileIO fileIO, Configuration conf) {
    this.conf = conf;
    this.location = location;
    this.fileIO = fileIO;
  }
}
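After data has been written to Iceberg, commit() is called to commit the new metadata, and it sets shouldRefresh to true. Any read that calls current() in the meantime checks the shouldRefresh flag and, if it is set, reloads version and currentMetadata, so the reader sees the latest state.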
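The interplay of the flag with reads and commits can be illustrated with a minimal, self-contained sketch. It is not Iceberg code: the class RefreshFlagSketch, its store field (standing in for the metadata on the file system) and the loads counter are hypothetical names introduced only for illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for HadoopTableOperations; not Iceberg code.
final class RefreshFlagSketch {
    // Stands in for the metadata stored on the file system.
    private final AtomicInteger store = new AtomicInteger(0);
    // Counts how often the "file system" was actually read.
    private final AtomicInteger loads = new AtomicInteger(0);

    private volatile Integer cached = null;
    private volatile boolean shouldRefresh = true;

    // Reloads only when the flag is set, otherwise serves the cached value.
    Integer current() {
        return shouldRefresh ? refresh() : cached;
    }

    Integer refresh() {
        loads.incrementAndGet();
        cached = store.get();
        shouldRefresh = false;
        return cached;
    }

    // A write changes the stored state and raises the flag again.
    void commit(int newValue) {
        store.set(newValue);
        shouldRefresh = true;
    }

    int loads() {
        return loads.get();
    }

    public static void main(String[] args) {
        RefreshFlagSketch ops = new RefreshFlagSketch();
        ops.current();   // first read: the flag is true, so it loads
        ops.current();   // served from the cached value
        ops.commit(7);   // the write raises the flag
        System.out.println(ops.current() + " after " + ops.loads() + " loads");
    }
}
```

The point of the flag is that repeated reads between two commits cost no extra load; only the first read after a commit goes back to storage.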
org.apache.iceberg.hadoop.HadoopTableOperations#current
- shouldRefresh defaults to true
- When shouldRefresh is set, current() calls refresh()
@Override
public TableMetadata current() {
  LOG.info("org.apache.iceberg.hadoop.HadoopTableOperations.current");
  if (this.currentMetadata != null) {
    LOG.info("[shouldRefresh='{}', currentMetadata='{}']", this.shouldRefresh,
        this.currentMetadata.location() + "::" + this.currentMetadata.metadataFileLocation());
  }
  if (shouldRefresh) {
    return refresh();
  }
  return currentMetadata;
}
org.apache.iceberg.hadoop.HadoopTableOperations#refresh
- First call findVersion, which reads the ver value from metadataRoot/version-hint.text
- Loop (spin) forward to find the newest metadataFile path: metadataRoot/v_$ver_$codec_metadata.json
- Call updateVersionAndMetadata to update the in-memory state: version and currentMetadata
@Override
public TableMetadata refresh() {
  LOG.info("org.apache.iceberg.hadoop.HadoopTableOperations.refresh.");
  int ver = version != null ? version : findVersion();
  LOG.info("[ver='{}']", ver);
  try {
    Path metadataFile = getMetadataFile(ver);
    if (version == null && metadataFile == null && ver == 0) {
      // no v0 metadata means the table doesn't exist yet
      return null;
    } else if (metadataFile == null) {
      throw new ValidationException("Metadata file for version %d is missing", ver);
    }
    // Loop until nextMetadataFile == null.
    // Every commit increments ver, so probing ver + 1 here
    // ensures that we read the latest version.
    Path nextMetadataFile = getMetadataFile(ver + 1);
    while (nextMetadataFile != null) {
      ver += 1;
      metadataFile = nextMetadataFile;
      nextMetadataFile = getMetadataFile(ver + 1);
    }
    // thread-safe update
    updateVersionAndMetadata(ver, metadataFile.toString());
    this.shouldRefresh = false;
    return currentMetadata;
  } catch (IOException e) {
    throw new RuntimeIOException(e, "Failed to refresh the table");
  }
}
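The forward probing in refresh() can be tried out on a local directory. The following is only a sketch under stated assumptions, not Iceberg code: it uses java.nio instead of the Hadoop FileSystem, ignores compression codecs (files are simply named v<N>.metadata.json), and the class VersionProbeSketch with its helper methods is hypothetical. It shows why a stale version hint is harmless: the loop walks forward from the hint until no newer file exists.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical sketch on the local file system; not Iceberg code.
final class VersionProbeSketch {
    // Returns the metadata file for a version, or null if it does not exist.
    static Path metadataFile(Path root, int ver) {
        Path file = root.resolve("v" + ver + ".metadata.json");
        return Files.exists(file) ? file : null;
    }

    // Reads the version hint; a missing hint file is treated as version 0 (assumption).
    static int readHint(Path root) throws IOException {
        Path hint = root.resolve("version-hint.text");
        if (!Files.exists(hint)) {
            return 0;
        }
        String text = new String(Files.readAllBytes(hint), StandardCharsets.UTF_8);
        return Integer.parseInt(text.trim());
    }

    // Starts at the hint and walks forward while a newer metadata file exists.
    static int latestVersion(Path root) throws IOException {
        int ver = readHint(root);
        if (metadataFile(root, ver) == null) {
            if (ver == 0) {
                return 0; // assumption of this sketch: 0 means "no table yet"
            }
            throw new IllegalStateException("Metadata file for version " + ver + " is missing");
        }
        while (metadataFile(root, ver + 1) != null) {
            ver += 1;
        }
        return ver;
    }

    // Builds a directory whose hint (1) lags behind the newest file (v3).
    static int demo() {
        try {
            Path root = Files.createTempDirectory("version-probe");
            for (int v = 1; v <= 3; v++) {
                Files.write(root.resolve("v" + v + ".metadata.json"), new byte[0]);
            }
            Files.write(root.resolve("version-hint.text"), "1".getBytes(StandardCharsets.UTF_8));
            return latestVersion(root);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("latest version: " + demo());
    }
}
```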
org.apache.iceberg.hadoop.HadoopTableOperations#commit
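- base metadata is the current metadata version; temp metadata is the metadata that is about to be written
- Write the temp metadata.json, then rename it to the final metadata file for version $ver+1
- Write the version-hint.text file
- If the switch is enabled, delete old metadata.json files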
@Override
public void commit(TableMetadata base, TableMetadata metadata) {
  LOG.info("start commit.");
  // note: there is a local Caffeine cache here
  Pair<Integer, TableMetadata> current = versionAndMetadata();
  if (base != current.second()) {
    throw new CommitFailedException("Cannot commit changes based on stale table metadata");
  }
  if (base == metadata) {
    LOG.info("Nothing to commit.");
    return;
  }
  Preconditions.checkArgument(base == null || base.location().equals(metadata.location()),
      "Hadoop path-based tables cannot be relocated");
  Preconditions.checkArgument(
      !metadata.properties().containsKey(TableProperties.WRITE_METADATA_LOCATION),
      "Hadoop path-based tables cannot relocate metadata");
  String codecName = metadata.property(
      TableProperties.METADATA_COMPRESSION, TableProperties.METADATA_COMPRESSION_DEFAULT);
  TableMetadataParser.Codec codec = TableMetadataParser.Codec.fromName(codecName);
  String fileExtension = TableMetadataParser.getFileExtension(codec);
  Path tempMetadataFile = metadataPath(UUID.randomUUID().toString() + fileExtension);
  // base is null for the first commit of a new table, so guard the debug log against an NPE
  LOG.info("[base='{}', metadata='{}', tempMetadataFile='{}']",
      base != null ? base.metadataFileLocation() : null,
      metadata.metadataFileLocation(), tempMetadataFile);
  TableMetadataParser.write(metadata, io().newOutputFile(tempMetadataFile.toString()));
  LOG.info("TableMetadataParser.write(metadata, io().newOutputFile(tempMetadataFile.toString()));");
  int nextVersion = (current.first() != null ? current.first() : 0) + 1;
  Path finalMetadataFile = metadataFilePath(nextVersion, codec);
  LOG.info("nextVer='{}', finalMetaFile='{}'", nextVersion, finalMetadataFile);
  FileSystem fs = getFileSystem(tempMetadataFile, conf);
  try {
    if (fs.exists(finalMetadataFile)) {
      throw new CommitFailedException(
          "Version %d already exists: %s", nextVersion, finalMetadataFile);
    }
  } catch (IOException e) {
    throw new RuntimeIOException(e,
        "Failed to check if next version exists: %s", finalMetadataFile);
  }
  // this rename operation is the atomic commit operation
  LOG.info("start renameToFinal.");
  renameToFinal(fs, tempMetadataFile, finalMetadataFile);
  LOG.info("start writeVersionHint");
  // update the best-effort version pointer
  writeVersionHint(nextVersion);
  LOG.info("start deleteRemovedMetadataFiles");
  // controlled by the switch:
  // org.apache.iceberg.TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED
  deleteRemovedMetadataFiles(base, metadata);
  this.shouldRefresh = true;
  LOG.info("end commit.");
}
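The write-temp-then-rename pattern used by commit() can be sketched on a local directory. This is an illustration under stated assumptions, not Iceberg code: it uses java.nio rather than the Hadoop FileSystem, the class RenameCommitSketch and its methods are hypothetical, and it relies on Files.move refusing to overwrite an existing target when REPLACE_EXISTING is not passed, which is not guaranteed to be race-free on every file system.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.UUID;

// Hypothetical sketch on the local file system; not Iceberg code.
final class RenameCommitSketch {
    // Returns true if this writer created nextVersion, false if the version already existed.
    static boolean commit(Path root, int nextVersion, String metadataJson) {
        Path temp = root.resolve(UUID.randomUUID() + ".metadata.json");
        Path fin = root.resolve("v" + nextVersion + ".metadata.json");
        try {
            // 1. Write the full metadata to a uniquely named temp file.
            Files.write(temp, metadataJson.getBytes(StandardCharsets.UTF_8));
            try {
                // 2. The move is the commit point; without REPLACE_EXISTING it fails
                //    if another writer already produced this version.
                Files.move(temp, fin);
            } catch (FileAlreadyExistsException e) {
                Files.deleteIfExists(temp);
                return false;
            }
            // 3. Best-effort pointer; readers can still probe forward if it is stale.
            Files.write(root.resolve("version-hint.text"),
                String.valueOf(nextVersion).getBytes(StandardCharsets.UTF_8));
            return true;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Commits v1 twice (the second attempt loses), then v2, and reports the final hint.
    static String demo() {
        try {
            Path root = Files.createTempDirectory("rename-commit");
            boolean first = commit(root, 1, "{}");
            boolean duplicate = commit(root, 1, "{}");
            boolean second = commit(root, 2, "{}");
            String hint = new String(
                Files.readAllBytes(root.resolve("version-hint.text")), StandardCharsets.UTF_8);
            return first + "," + duplicate + "," + second + "," + hint;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Writing to a temp file first means readers never observe a half-written metadata file: a version either exists completely or not at all.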