Iceberg Flink Write Operations
Posted by PeersLee
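This post walks the call chain that Iceberg's Flink sink follows when it writes RowData into a partitioned table: PartitionedFanoutWriter routes each row to a per-partition file writer, BaseRollingWriter rolls data files by size, and ParquetWriter finally buffers rows into Parquet row groups. Each section below is headed by the fully qualified method on that path. For orientation, a job that exercises this path is typically wired up roughly like the following sketch (a minimal sketch only; the warehouse path and the RowData source are placeholders, not from the original post):

// Minimal sketch of wiring a Flink DataStream<RowData> into an Iceberg table.
// The warehouse path and the source are placeholders; FlinkSink internally adds the
// stream writer (which owns the task writer shown below) and the files committer.
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;

public class IcebergSinkSketch {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Completed data files are committed to the table when a checkpoint completes.
    env.enableCheckpointing(60_000L);

    DataStream<RowData> rows = buildSource(env); // placeholder RowData source

    TableLoader tableLoader =
        TableLoader.fromHadoopTable("hdfs://namenode:8020/warehouse/db/tbl"); // placeholder path

    FlinkSink.forRowData(rows)
        .tableLoader(tableLoader)
        .append();

    env.execute("iceberg-flink-write");
  }

  private static DataStream<RowData> buildSource(StreamExecutionEnvironment env) {
    throw new UnsupportedOperationException("placeholder source");
  }
}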
org.apache.iceberg.io.PartitionedFanoutWriter#write
public void write(T row) throws IOException {
  // org.apache.flink.table.data.RowData -> org.apache.iceberg.PartitionKey
  PartitionKey partitionKey = partition(row);

  // org.apache.iceberg.io.BaseTaskWriter$RollingFileWriter
  RollingFileWriter writer = writers.get(partitionKey);
  if (writer == null) {
    // NOTICE: we need to copy a new partition key here, in case of messing up the keys in writers.
    PartitionKey copiedKey = partitionKey.copy();
    writer = new RollingFileWriter(copiedKey);
    writers.put(copiedKey, writer);
  }

  writer.write(row);
}
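The copy() here matters because partition(row) reuses a single mutable PartitionKey instance for every incoming row; if that shared instance were stored directly in the writers map, the next row would mutate the stored key in place and the existing entry could no longer be found. A minimal, Iceberg-free illustration of the hazard (ReusedKey and the values are made up for this example):

import java.util.HashMap;
import java.util.Map;

// Mimics a writer that reuses one mutable key object for every row,
// the way the fan-out writer reuses a single PartitionKey.
public class KeyCopyDemo {
  static class ReusedKey {
    String partition;

    ReusedKey set(String partition) {
      this.partition = partition;
      return this;
    }

    ReusedKey copy() {
      return new ReusedKey().set(partition);
    }

    @Override
    public boolean equals(Object o) {
      return o instanceof ReusedKey && ((ReusedKey) o).partition.equals(partition);
    }

    @Override
    public int hashCode() {
      return partition.hashCode();
    }
  }

  public static void main(String[] args) {
    Map<ReusedKey, String> writers = new HashMap<>();
    ReusedKey reused = new ReusedKey();

    // BUG: store the reused instance itself as the map key.
    writers.put(reused.set("p=1"), "writer-p1");
    reused.set("p=2"); // the next row mutates the stored key in place
    System.out.println(writers.get(new ReusedKey().set("p=1"))); // null -> writer for p=1 is lost

    writers.clear();

    // FIX: store a defensive copy, as the NOTICE comment above describes.
    writers.put(reused.set("p=1").copy(), "writer-p1");
    reused.set("p=2");
    System.out.println(writers.get(new ReusedKey().set("p=1"))); // writer-p1
  }
}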
org.apache.iceberg.io.BaseTaskWriter.BaseRollingWriter#write(T)
public void write(T record) throws IOException {
  // org.apache.iceberg.io.BaseTaskWriter.RollingFileWriter
  write(currentWriter, record);
  this.currentRows++;

  // currentRows % ROWS_DIVISOR == 0
  if (shouldRollToNewFile()) {
    closeCurrent();
    openCurrent();
  }
}
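The currentRows % ROWS_DIVISOR == 0 comment marks the amortization inside shouldRollToNewFile(): the writer only probes the current file length once every ROWS_DIVISOR rows, and rolls to a new data file once that length reaches the target file size (driven by the write.target-file-size-bytes table property). A sketch of the same pattern, with illustrative constants rather than Iceberg's actual values:

// Sketch of an amortized roll check: probe the (comparatively expensive) file length only
// every ROWS_DIVISOR rows, and roll once the current file reaches the target size.
// The constant values below are illustrative, not Iceberg's defaults.
public class RollCheckSketch {
  private static final int ROWS_DIVISOR = 1000;
  private static final long TARGET_FILE_SIZE_BYTES = 128L * 1024 * 1024;

  private long currentRows = 0;

  void write(long currentFileLengthInBytes) {
    currentRows++;
    if (shouldRollToNewFile(currentFileLengthInBytes)) {
      // closeCurrent(); openCurrent(); -- roll over to a fresh data file
      currentRows = 0;
    }
  }

  private boolean shouldRollToNewFile(long currentFileLengthInBytes) {
    return currentRows % ROWS_DIVISOR == 0
        && currentFileLengthInBytes >= TARGET_FILE_SIZE_BYTES;
  }
}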
org.apache.iceberg.io.BaseTaskWriter.RollingFileWriter#write
@Override
void write(DataWriter<T> writer, T record) {
  // org.apache.iceberg.io.DataWriter
  writer.add(record);
}
org.apache.iceberg.io.DataWriter#add
public void add(T row) {
  // org.apache.iceberg.parquet.ParquetWriter
  appender.add(row);
}
org.apache.iceberg.parquet.ParquetWriter#add
@Override
public void add(T value) {
  recordCount += 1;
  // org.apache.iceberg.flink.data.FlinkParquetWriters$RowDataWriter
  model.write(0, value);
  // org.apache.parquet.column.impl.ColumnWriteStoreV1
  writeStore.endRecord();
  checkSize();
}
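Inside ParquetWriter#add, model.write(0, value) encodes the RowData fields through Flink's RowDataWriter into the Parquet column writers, writeStore.endRecord() marks the record boundary, and checkSize() periodically checks whether the buffered data has grown large enough to flush a row group. That row-group threshold is driven by a table property; a hedged sketch of tuning it (the 64 MB value is illustrative):

// Sketch: adjust the Parquet row-group size that the writer's size check works against.
// PARQUET_ROW_GROUP_SIZE_BYTES maps to write.parquet.row-group-size-bytes; the value is illustrative.
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;

public class RowGroupSizeSketch {
  static void setRowGroupSize(Table table) {
    table.updateProperties()
        .set(TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES, String.valueOf(64L * 1024 * 1024))
        .commit();
  }
}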
org.apache.iceberg.io.BaseTaskWriter.BaseRollingWriter#close
@Override
public void close() throws IOException {
  closeCurrent();
}
org.apache.iceberg.io.BaseTaskWriter.BaseRollingWriter#closeCurrent
private void closeCurrent() throws IOException {
  if (currentWriter != null) {
    // org.apache.iceberg.io.DataWriter
    currentWriter.close();

    if (currentRows == 0L) {
      io.deleteFile(currentFile.encryptingOutputFile());
    } else {
      complete(currentWriter);
    }

    this.currentFile = null;
    this.currentWriter = null;
    this.currentRows = 0;
  }
}
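When a file is closed, an empty one is deleted outright, while complete(currentWriter) registers the finished file as a DataFile in the task's completed set; the Flink committer later turns a checkpoint's completed files into one Iceberg snapshot. As a hedged way to verify the result after a commit, the table's current snapshot can be scanned and its data files listed (the helper name is made up for this example):

// Sketch: list the data files visible in the table's current snapshot after a commit.
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterable;

public class ListDataFilesSketch {
  static void listDataFiles(Table table) throws java.io.IOException {
    try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
      for (FileScanTask task : tasks) {
        DataFile file = task.file();
        System.out.println(file.path() + " rows=" + file.recordCount());
      }
    }
  }
}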
That covers the main code path of the Iceberg Flink write operation, from the per-partition fan-out writer down to the Parquet file writer.