Iceberg Flink Write Operations

Posted by PeersLee


org.apache.iceberg.io.PartitionedFanoutWriter#write

public void write(T row) throws IOException {
    // partition(row) maps the Flink row (org.apache.flink.table.data.RowData) to an org.apache.iceberg.PartitionKey
    PartitionKey partitionKey = partition(row);
    // One writer per partition; at runtime it is an org.apache.iceberg.io.BaseTaskWriter$RollingFileWriter
    RollingFileWriter writer = writers.get(partitionKey);
    if (writer == null) {
        // NOTICE: store a copy of the partition key, so that later changes to the original key cannot corrupt the keys in writers.
        PartitionKey copiedKey = partitionKey.copy();
        writer = new RollingFileWriter(copiedKey);
        writers.put(copiedKey, writer);
    }
    writer.write(row);
}
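
The key copy in the method above matters when the object returned by partition(row) is reused and overwritten for every row. The following is a minimal sketch of that fanout pattern, not Iceberg's implementation: MutableKey and FanoutSketch are hypothetical stand-ins, the "writer" is just an in-memory list, and the reuse of a single key instance is an assumption made for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for a mutable, reused partition key (not Iceberg's PartitionKey).
final class MutableKey {
    private String value;

    void set(String value) {
        this.value = value;
    }

    MutableKey copy() {
        MutableKey copied = new MutableKey();
        copied.value = this.value;
        return copied;
    }

    @Override
    public boolean equals(Object other) {
        return other instanceof MutableKey && Objects.equals(value, ((MutableKey) other).value);
    }

    @Override
    public int hashCode() {
        return Objects.hashCode(value);
    }
}

// Hypothetical fanout writer: one in-memory "file" (a list of rows) per partition.
final class FanoutSketch {
    private final MutableKey reusedKey = new MutableKey();
    private final Map<MutableKey, List<String>> writers = new HashMap<>();

    void write(String partition, String row) {
        // The same key instance is overwritten for every incoming row.
        reusedKey.set(partition);
        List<String> writer = writers.get(reusedKey);
        if (writer == null) {
            // Store a snapshot; putting reusedKey itself would let the next
            // set() call silently change a key that already lives in the map.
            MutableKey copiedKey = reusedKey.copy();
            writer = new ArrayList<>();
            writers.put(copiedKey, writer);
        }
        writer.add(row);
    }

    int openWriters() {
        return writers.size();
    }

    List<String> rows(String partition) {
        MutableKey key = new MutableKey();
        key.set(partition);
        return writers.getOrDefault(key, List.of());
    }
}
```

Writing rows for partitions "a", "b", "a" in that order leaves two open writers, with both "a" rows in the same one. Replacing copiedKey with reusedKey in the put call would make the map key mutate under the map, which is the situation the NOTICE comment warns about.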

org.apache.iceberg.io.BaseTaskWriter.BaseRollingWriter#write(T)

public void write(T record) throws IOException {
    // Dispatches to the subclass, here org.apache.iceberg.io.BaseTaskWriter.RollingFileWriter
    write(currentWriter, record);
    this.currentRows++;
    // Rolls only when currentRows % ROWS_DIVISOR == 0 and the current file has reached the target file size
    if (shouldRollToNewFile()) {
        closeCurrent();
        openCurrent();
    }
}
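
The rolling check above can be sketched in isolation. This is a simplified illustration under stated assumptions, not the Iceberg code: RollingSketch, rowsDivisor, and targetFileSize are hypothetical names, the file length is approximated by summing record lengths, and the roll condition is assumed to combine the row-count test from the comment with a size threshold.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical rolling writer that tracks sizes only; nothing is written to disk.
final class RollingSketch {
    private final int rowsDivisor;      // the size is examined only every rowsDivisor rows
    private final long targetFileSize;  // roll threshold, in bytes
    private final List<Long> completedFileSizes = new ArrayList<>();
    private long currentRows = 0;
    private long currentBytes = 0;

    RollingSketch(int rowsDivisor, long targetFileSize) {
        this.rowsDivisor = rowsDivisor;
        this.targetFileSize = targetFileSize;
    }

    void write(String record) {
        currentBytes += record.length(); // stand-in for the real file length
        currentRows++;
        if (shouldRollToNewFile()) {
            closeCurrent();
        }
    }

    // Checking the size only every rowsDivisor rows keeps the per-row cost low,
    // at the price of files that can overshoot the target.
    private boolean shouldRollToNewFile() {
        return currentRows % rowsDivisor == 0 && currentBytes >= targetFileSize;
    }

    void close() {
        closeCurrent();
    }

    private void closeCurrent() {
        if (currentRows > 0) {
            completedFileSizes.add(currentBytes);
        }
        currentRows = 0;
        currentBytes = 0;
    }

    int completedFiles() {
        return completedFileSizes.size();
    }

    long fileSize(int index) {
        return completedFileSizes.get(index);
    }
}
```

With rowsDivisor = 2 and targetFileSize = 5, writing two 3-byte records rolls the file at 6 bytes. A single 8-byte record written afterwards does not trigger a roll, because the row count is not a multiple of the divisor; it is only flushed as a second file by close().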

org.apache.iceberg.io.BaseTaskWriter.RollingFileWriter#write

@Override
void write(DataWriter<T> writer, T record) {
    // writer is an org.apache.iceberg.io.DataWriter
    writer.add(record);
}

org.apache.iceberg.io.DataWriter#add

public void add(T row) {
    // appender is an org.apache.iceberg.parquet.ParquetWriter here
    appender.add(row);
}

org.apache.iceberg.parquet.ParquetWriter#add

@Override
public void add(T value) {
    recordCount += 1;
    // model is an org.apache.iceberg.flink.data.FlinkParquetWriters$RowDataWriter here
    model.write(0, value);
    // writeStore is an org.apache.parquet.column.impl.ColumnWriteStoreV1 here
    writeStore.endRecord();
    checkSize();
}

org.apache.iceberg.io.BaseTaskWriter.BaseRollingWriter#close

@Override
public void close() throws IOException {
    closeCurrent();
}

org.apache.iceberg.io.BaseTaskWriter.BaseRollingWriter#closeCurrent

private void closeCurrent() throws IOException {
    if (currentWriter != null) {
        // currentWriter is an org.apache.iceberg.io.DataWriter
        currentWriter.close();

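        if (currentRows == 0L) {
            // No rows were written: delete the empty file instead of reporting it
            io.deleteFile(currentFile.encryptingOutputFile());
        } else {
            // At least one row was written: hand the finished file to complete()
            complete(currentWriter);
        }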

        this.currentFile = null;
        this.currentWriter = null;
        this.currentRows = 0;
    }
}
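
The close logic above follows a simple rule: a file that received no rows is deleted, any other file is recorded as completed, and the writer state is reset either way. The following sketch shows that rule with plain java.nio temp files; CloseCurrentSketch and all of its members are hypothetical, and Iceberg's FileIO, encryption, and DataWriter are deliberately left out.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

// Hypothetical writer that mirrors only the close/cleanup rule, using local temp files.
final class CloseCurrentSketch {
    private final List<Path> completed = new ArrayList<>();
    private Path currentFile;
    private BufferedWriter currentWriter;
    private long currentRows;
    private Path lastDeleted;

    void openCurrent() {
        try {
            currentFile = Files.createTempFile("close-current-sketch-", ".txt");
            currentWriter = Files.newBufferedWriter(currentFile);
            currentRows = 0;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    void write(String row) {
        try {
            currentWriter.write(row);
            currentWriter.newLine();
            currentRows++;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    void closeCurrent() {
        if (currentWriter == null) {
            return; // nothing is open, so closing is a no-op
        }
        try {
            currentWriter.close();
            if (currentRows == 0L) {
                // An empty file carries no data, so remove it rather than report it.
                Files.delete(currentFile);
                lastDeleted = currentFile;
            } else {
                completed.add(currentFile);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        currentFile = null;
        currentWriter = null;
        currentRows = 0;
    }

    int completedCount() {
        return completed.size();
    }

    Path completedFile(int index) {
        return completed.get(index);
    }

    Path lastDeleted() {
        return lastDeleted;
    }
}
```

Opening and immediately closing a file leaves nothing behind, while a file with at least one row shows up in the completed list. Calling closeCurrent() a second time does nothing, since the state was already reset.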
