iceberg flink 读操作

Posted by PeersLee

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了 Iceberg Flink 读操作相关的知识,希望对你有一定的参考价值。

org.apache.iceberg.flink.source.FlinkInputFormat.open

/**
 * Opens the given split by creating the {@code RowDataIterator} that reads it and storing it in
 * the {@code iterator} field.
 *
 * <p>Values noted by the article's author while debugging (not guaranteed in general):
 * <ul>
 *   <li>{@code split.getTask().files()} held a collection of {@code FileScanTask}
 *       ({@code SplitScanTask} instances);</li>
 *   <li>the encryption manager was
 *       {@code org.apache.iceberg.encryption.PlaintextEncryptionManager};</li>
 *   <li>{@code tableSchema} and {@code context.project()} were
 *       {@code org.apache.iceberg.Schema};</li>
 *   <li>{@code context.caseSensitive()} was {@code false}.</li>
 * </ul>
 *
 * @param split the input split whose task is handed to the new iterator
 */
@Override
public void open(FlinkInputSplit split) {
    iterator = new RowDataIterator(
            split.getTask(),
            io,
            encryption,
            tableSchema,
            context.project(),
            context.nameMapping(),
            context.caseSensitive());
}

org.apache.iceberg.flink.source.DataIterator.DataIterator

/**
 * Creates an iterator over the file scan tasks of {@code task} and resolves every referenced file
 * (data files and their delete files) to a decrypted {@code InputFile} up front.
 *
 * @param task       the combined scan task whose files will be iterated
 * @param io         used to open an input file for each file path
 * @param encryption used to decrypt all input files in one batch call
 */
DataIterator(CombinedScanTask task, FileIO io, EncryptionManager encryption) {
    this.tasks = task.files().iterator();

    // Collect the key metadata of each data file and delete file, keyed by file path.
    // A later file with the same path overwrites the earlier entry.
    Map<String, ByteBuffer> keyMetadataByPath = Maps.newHashMap();
    task.files().stream()
        .flatMap(scanTask -> Stream.concat(Stream.of(scanTask.file()), scanTask.deletes().stream()))
        .forEach(content -> keyMetadataByPath.put(content.path().toString(), content.keyMetadata()));

    // Lazily pair each path's input file with its key metadata.
    Stream<EncryptedInputFile> encryptedInputs = keyMetadataByPath.entrySet().stream()
        .map(pathAndKey ->
            EncryptedFiles.encryptedInput(io.newInputFile(pathAndKey.getKey()), pathAndKey.getValue()));

    // decrypt with the batch call to avoid multiple RPCs to a key server, if possible
    Iterable<InputFile> plainFiles = encryption.decrypt(encryptedInputs::iterator);

    // Index the decrypted files by location; the first file seen for a location is kept.
    Map<String, InputFile> filesByLocation = Maps.newHashMapWithExpectedSize(task.files().size());
    for (InputFile plainFile : plainFiles) {
        filesByLocation.putIfAbsent(plainFile.location(), plainFile);
    }

    // All files this task needs to scan (Parquet files in the author's walkthrough), by location.
    this.inputFiles = Collections.unmodifiableMap(filesByLocation);

    // Start with an empty iterator; nothing is opened in the constructor.
    this.currentIterator = CloseableIterator.empty();
}

org.apache.iceberg.hadoop.HadoopFileIO#newInputFile {...}

org.apache.iceberg.flink.source.RowDataIterator.RowDataIterator

/**
 * Creates a row iterator for {@code task}, delegating file resolution to the superclass
 * constructor and recording the schema and name-handling settings for later use.
 *
 * @param task            the combined scan task to read
 * @param io              passed to the superclass constructor
 * @param encryption      passed to the superclass constructor
 * @param tableSchema     the table schema ({@code org.apache.iceberg.Schema})
 * @param projectedSchema the projected schema ({@code org.apache.iceberg.Schema})
 * @param nameMapping     the name mapping, as a string
 * @param caseSensitive   case-sensitivity flag; {@code false} in the author's debugging session
 */
RowDataIterator(CombinedScanTask task, FileIO io, EncryptionManager encryption, Schema tableSchema,
        Schema projectedSchema, String nameMapping, boolean caseSensitive) {
    super(task, io, encryption);

    // Name-handling settings.
    this.nameMapping = nameMapping;
    this.caseSensitive = caseSensitive;

    // Schemas.
    this.tableSchema = tableSchema;
    this.projectedSchema = projectedSchema;
}

org.apache.iceberg.flink.source.DataIterator.hasNext

/**
 * Reports whether another element is available, after first letting
 * {@code updateCurrentIterator()} move on from an exhausted current iterator.
 *
 * @return the result of {@code currentIterator.hasNext()} after the update
 */
@Override
public boolean hasNext() {
    updateCurrentIterator();
    final boolean moreAvailable = currentIterator.hasNext();
    return moreAvailable;
}

org.apache.iceberg.flink.source.DataIterator.updateCurrentIterator

/**
 * Advances {@code currentIterator} until it has an element or no tasks remain.
 *
 * <p>Each exhausted iterator is closed before the iterator for the next task is opened.
 *
 * @throws UncheckedIOException wrapping any {@code IOException} raised inside the loop
 */
private void updateCurrentIterator() {
    try {
        while (true) {
            // Stop when the current iterator still has data, or when there is nothing left to open.
            if (currentIterator.hasNext() || !tasks.hasNext()) {
                return;
            }
            currentIterator.close();
            currentIterator = openTaskIterator(tasks.next());
        }
    } catch (IOException ioe) {
        throw new UncheckedIOException(ioe);
    }
}

org.apache.iceberg.flink.source.RowDataIterator.openTaskIterator

org.apache.iceberg.flink.source.RowDataIterator#newIterable

org.apache.iceberg.flink.source.RowDataIterator#newParquetIterable

org.apache.iceberg.parquet.ParquetReader.iterator

/**
 * Creates a new {@code FileIterator} from the result of {@code init()}, registers it through
 * {@code addCloseable} (presumably so it is closed together with this reader - confirm against
 * the enclosing class), and returns it.
 *
 * @return the newly created file iterator
 */
@Override
public CloseableIterator<T> iterator() {
    final FileIterator<T> fileIterator = new FileIterator<>(init());
    addCloseable(fileIterator);
    return fileIterator;
}

org.apache.iceberg.flink.source.DataIterator.next

/**
 * Returns the next element, after first letting {@code updateCurrentIterator()} move on from an
 * exhausted current iterator.
 *
 * @return the value produced by {@code currentIterator.next()}
 */
@Override
public T next() {
    updateCurrentIterator();
    final T element = currentIterator.next();
    return element;
}

org.apache.iceberg.flink.source.DataIterator.updateCurrentIterator

/**
 * Advances {@code currentIterator} until it has an element or no tasks remain.
 *
 * <p>Each exhausted iterator is closed before the iterator for the next task is opened.
 *
 * @throws UncheckedIOException wrapping any {@code IOException} raised inside the loop
 */
private void updateCurrentIterator() {
    try {
        while (true) {
            // Stop when the current iterator still has data, or when there is nothing left to open.
            if (currentIterator.hasNext() || !tasks.hasNext()) {
                return;
            }
            currentIterator.close();
            currentIterator = openTaskIterator(tasks.next());
        }
    } catch (IOException ioe) {
        throw new UncheckedIOException(ioe);
    }
}

org.apache.iceberg.parquet.ParquetReader.FileIterator.next

/**
 * Reads and returns the next value from the file.
 *
 * <p>When {@code valuesRead} has reached {@code nextRowGroupStart}, {@code advance()} is called
 * first (presumably to move to the next row group - confirm against {@code advance()}). In the
 * author's debugging session both counters were 0 on the first call, so {@code advance()} ran
 * immediately.
 *
 * @return the value just read, which is also stored in {@code last}
 */
@Override
public T next() {
    if (valuesRead >= nextRowGroupStart) {
        advance();
    }

    // Hand the previous value back to the reader only when container reuse is enabled.
    this.last = model.read(reuseContainers ? last : null);
    valuesRead++;

    return last;
}

org.apache.iceberg.parquet.ParquetValueReaders.StructReader.read

/**
 * Reads one struct by reading each field with its reader, in index order.
 *
 * @param reuse passed to {@code newStructData} to obtain the intermediate struct
 * @return the result of {@code buildStruct} applied to the populated intermediate struct
 */
@Override
public final T read(T reuse) {
    I intermediate = newStructData(reuse);

    final int fieldCount = readers.length;
    for (int pos = 0; pos < fieldCount; pos++) {
        // The field's current value is passed to its reader, and the result is stored back.
        set(intermediate, pos, readers[pos].read(get(intermediate, pos)));
    }

    return buildStruct(intermediate);
}

org.apache.iceberg.flink.data.FlinkParquetReaders.StringReader.read

/**
 * Reads the next binary value from the column and wraps it as {@code StringData}.
 *
 * @param ignored not used by this method
 * @return the string data built from the bytes of the value just read
 */
@Override
public StringData read(StringData ignored) {
    Binary binary = column.nextBinary();
    ByteBuffer buffer = binary.toByteBuffer();

    // No accessible backing array: fall back to the bytes returned by the binary itself.
    if (!buffer.hasArray()) {
        return StringData.fromBytes(binary.getBytes());
    }

    // Use the backing array directly, starting at the buffer's current position within it.
    int start = buffer.arrayOffset() + buffer.position();
    return StringData.fromBytes(buffer.array(), start, buffer.remaining());
}

 

以上是关于 Iceberg Flink 读操作的主要内容。如果未能解决你的问题,请参考以下文章:

iceberg flink 读操作

iceberg flink 读操作

数据湖(十七):Flink与Iceberg整合DataStream API操作

数据湖(十七):Flink与Iceberg整合DataStream API操作

iceberg flink 写操作

iceberg flink 写操作