Iceberg Flink read path

Posted PeersLee

This post walks through the read path of the Iceberg Flink connector, tracing a batch read from FlinkInputFormat.open down to the Parquet value readers.
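
For context, a minimal batch-read job that reaches this code path could look like the sketch below. The table location is illustrative, and the builder API shown matches the Iceberg 0.11/0.12-era FlinkSource; verify against your version:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.FlinkSource;

public class IcebergBatchRead {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Illustrative path; point this at a real Iceberg table location.
        TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/db/tbl");

        // Batch scan: planning produces FlinkInputSplits, and each split
        // is opened by FlinkInputFormat.open shown below.
        DataStream<RowData> rows = FlinkSource.forRowData()
                .env(env)
                .tableLoader(tableLoader)
                .streaming(false)
                .build();

        rows.print();
        env.execute("iceberg-batch-read");
    }
}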

org.apache.iceberg.flink.source.FlinkInputFormat.open

@Override
public void open(FlinkInputSplit split) {
    // split.getTask().files(): FlinkInputSplit.CombinedScanTask.Collection<FileScanTask>: SplitScanTask
    // encryptionClass: org.apache.iceberg.encryption.PlaintextEncryptionManager
    // tableSchema & context.project(): org.apache.iceberg.Schema
    // caseSensitive: false
    this.iterator = new RowDataIterator(
            split.getTask(), io, encryption, tableSchema, context.project(), context.nameMapping(),
            context.caseSensitive());
}
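
The FlinkInputSplit passed in pairs a Flink split number with one Iceberg CombinedScanTask. A hedged sketch of its shape (simplified; the exact base class is version-dependent):

// Hedged sketch of FlinkInputSplit: a Flink InputSplit carrying one CombinedScanTask.
public class FlinkInputSplit implements InputSplit {
    private final int splitNumber;
    private final CombinedScanTask task;

    FlinkInputSplit(int splitNumber, CombinedScanTask task) {
        this.splitNumber = splitNumber;
        this.task = task;
    }

    @Override
    public int getSplitNumber() {
        return splitNumber;
    }

    CombinedScanTask getTask() {
        return task;   // consumed by FlinkInputFormat.open above
    }
}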

org.apache.iceberg.flink.source.DataIterator.DataIterator

DataIterator(CombinedScanTask task, FileIO io, EncryptionManager encryption) {
    this.tasks = task.files().iterator();

    Map<String, ByteBuffer> keyMetadata = Maps.newHashMap();
    task.files().stream()
        .flatMap(fileScanTask -> Stream.concat(Stream.of(fileScanTask.file()), fileScanTask.deletes().stream()))
        .forEach(file -> keyMetadata.put(file.path().toString(), file.keyMetadata()));
    Stream<EncryptedInputFile> encrypted = keyMetadata.entrySet().stream()
        .map(entry -> EncryptedFiles.encryptedInput(io.newInputFile(entry.getKey()), entry.getValue()));

    // decrypt with the batch call to avoid multiple RPCs to a key server, if possible
    Iterable<InputFile> decryptedFiles = encryption.decrypt(encrypted::iterator);

    Map<String, InputFile> files = Maps.newHashMapWithExpectedSize(task.files().size());
    decryptedFiles.forEach(decrypted -> files.putIfAbsent(decrypted.location(), decrypted));

    // paths of every Parquet data file this task needs to scan
    this.inputFiles = Collections.unmodifiableMap(files);

    this.currentIterator = CloseableIterator.empty();
}
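
Since the encryption manager in this trace is PlaintextEncryptionManager, the batch decrypt above is effectively an identity pass-through. A hedged sketch of its decrypt (simplified from the Iceberg source):

// Hedged sketch: PlaintextEncryptionManager performs no real decryption.
@Override
public InputFile decrypt(EncryptedInputFile encrypted) {
    return encrypted.encryptedInputFile();  // just unwrap the raw input file
}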

org.apache.iceberg.hadoop.HadoopFileIO#newInputFile ...
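
The body is elided above; in the Hadoop implementation it wraps the path into a HadoopInputFile using the job's Hadoop Configuration. A hedged sketch, close to the Iceberg source but worth verifying against your version (hadoopConf is the FileIO's serializable Configuration supplier):

// Hedged sketch of HadoopFileIO.newInputFile.
@Override
public InputFile newInputFile(String path) {
    return HadoopInputFile.fromLocation(path, hadoopConf.get());
}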

org.apache.iceberg.flink.source.RowDataIterator.RowDataIterator

RowDataIterator(CombinedScanTask task, FileIO io, EncryptionManager encryption, Schema tableSchema,
        Schema projectedSchema, String nameMapping, boolean caseSensitive) {
    super(task, io, encryption);
    // tableSchema & projectedSchema: org.apache.iceberg.Schema
    this.tableSchema = tableSchema;
    this.projectedSchema = projectedSchema;
    this.nameMapping = nameMapping;
    // caseSensitive: false
    this.caseSensitive = caseSensitive;
}

org.apache.iceberg.flink.source.DataIterator.hasNext

@Override
public boolean hasNext() {
    updateCurrentIterator();
    return currentIterator.hasNext();
}

org.apache.iceberg.flink.source.DataIterator.updateCurrentIterator

private void updateCurrentIterator() {
    try {
        while (!currentIterator.hasNext() && tasks.hasNext()) {
            currentIterator.close();
            currentIterator = openTaskIterator(tasks.next());
        }
    } catch (IOException e) {
        throw new UncheckedIOException(e);
    }
}
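
hasNext() and next() both funnel through updateCurrentIterator(), the classic concatenated-iterator pattern: drain one sub-iterator, then open the next. A standalone, hypothetical ConcatIterator showing the same shape (not Iceberg code):

import java.util.Collections;
import java.util.Iterator;
import java.util.List;

// Hypothetical minimal version of the pattern DataIterator uses:
// lazily chain a sequence of sub-iterators into a single iterator.
class ConcatIterator<T> implements Iterator<T> {
    private final Iterator<Iterator<T>> sources;
    private Iterator<T> current = Collections.emptyIterator();

    ConcatIterator(List<Iterator<T>> sources) {
        this.sources = sources.iterator();
    }

    // skip exhausted sub-iterators until one has elements or none remain
    private void advance() {
        while (!current.hasNext() && sources.hasNext()) {
            current = sources.next();
        }
    }

    @Override
    public boolean hasNext() {
        advance();
        return current.hasNext();
    }

    @Override
    public T next() {
        advance();
        return current.next();
    }
}

DataIterator adds one wrinkle: each sub-iterator is a CloseableIterator over one file, so it closes the drained iterator before opening the next task's.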

org.apache.iceberg.flink.source.RowDataIterator.openTaskIterator

org.apache.iceberg.flink.source.RowDataIterator#newIterable

org.apache.iceberg.flink.source.RowDataIterator#newParquetIterable
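
The bodies of these three methods are omitted above. Roughly, openTaskIterator resolves the task's input file and delegates to newIterable, which dispatches on the data file format. A hedged sketch of that dispatch (simplified from ~0.11-era source; signatures vary by version):

// Hedged sketch, not verbatim: newIterable picks a reader by file format.
private CloseableIterable<RowData> newIterable(FileScanTask task, Schema schema,
                                               Map<Integer, ?> idToConstant) {
    switch (task.file().format()) {
        case PARQUET:
            return newParquetIterable(task, schema, idToConstant);
        case AVRO:
            return newAvroIterable(task, schema, idToConstant);
        case ORC:
            return newOrcIterable(task, schema, idToConstant);
        default:
            throw new UnsupportedOperationException("Cannot read unknown format: " + task.file().format());
    }
}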

org.apache.iceberg.parquet.ParquetReader.iterator

@Override
public CloseableIterator<T> iterator() {
    FileIterator<T> iter = new FileIterator<>(init());
    addCloseable(iter);
    return iter;
}

org.apache.iceberg.flink.source.DataIterator.next

@Override
public T next() {
    updateCurrentIterator();
    return currentIterator.next();
}

org.apache.iceberg.flink.source.DataIterator.updateCurrentIterator

(Same method as shown above: while the current file iterator is exhausted and more tasks remain, close the drained iterator and open the next task's.)

org.apache.iceberg.parquet.ParquetReader.FileIterator.next

@Override
public T next() {
    // valuesRead: 0
    // nextRowGroupStart: 0
    if (valuesRead >= nextRowGroupStart) {
        advance();
    }

    if (reuseContainers) {
        this.last = model.read(last);
    } else {
        this.last = model.read(null);
    }
    valuesRead += 1;

    return last;
}
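
advance() is not shown in the original trace. Roughly, it skips row groups pruned by the residual filter and loads the next row group's pages into the value-reader model. A hedged sketch, simplified from the Iceberg source (field names may differ by version):

// Hedged sketch of ParquetReader.FileIterator.advance() -- not verbatim.
private void advance() {
    while (shouldSkip[nextRowGroup]) {
        nextRowGroup += 1;
        reader.skipNextRowGroup();            // row group pruned by the residual filter
    }

    PageReadStore pages;
    try {
        pages = reader.readNextRowGroup();    // load the next row group's column pages
    } catch (IOException e) {
        throw new RuntimeIOException(e);
    }

    nextRowGroupStart += pages.getRowCount(); // next() calls advance() again at this boundary
    nextRowGroup += 1;
    model.setPageSource(pages);               // point the value readers at the new pages
}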

org.apache.iceberg.parquet.ParquetValueReaders.StructReader.read

@Override
public final T read(T reuse) {
    I intermediate = newStructData(reuse);

    for (int i = 0; i < readers.length; i += 1) {
        set(intermediate, i, readers[i].read(get(intermediate, i)));
    }

    return buildStruct(intermediate);
}
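
For the Flink reader, the intermediate struct is a GenericRowData: newStructData reuses or allocates one, set writes each column, and buildStruct returns it. A hedged sketch of those hooks (simplified from FlinkParquetReaders; numFields is assumed to hold the projected column count):

// Hedged sketch of the Flink RowData binding for StructReader -- not verbatim.
@Override
protected GenericRowData newStructData(RowData reuse) {
    return reuse instanceof GenericRowData ? (GenericRowData) reuse : new GenericRowData(numFields);
}

@Override
protected void set(GenericRowData row, int pos, Object value) {
    row.setField(pos, value);   // write column i produced by readers[i]
}

@Override
protected RowData buildStruct(GenericRowData struct) {
    return struct;              // returned to FileIterator.next as `last`
}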

org.apache.iceberg.flink.data.FlinkParquetReaders.StringReader.read

@Override
public StringData read(StringData ignored) {
    Binary binary = column.nextBinary();
    ByteBuffer buffer = binary.toByteBuffer();
    if (buffer.hasArray()) {
        return StringData.fromBytes(
                buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
    } else {
        return StringData.fromBytes(binary.getBytes());
    }
}
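
The hasArray branch is a zero-copy fast path: a heap ByteBuffer exposes its backing byte[], so StringData can wrap it directly, while a direct buffer forces binary.getBytes() to materialize a copy. A small standalone illustration of the distinction:

import java.nio.ByteBuffer;

public class BufferKinds {
    public static void main(String[] args) {
        ByteBuffer heap = ByteBuffer.wrap("hello".getBytes());  // backed by a byte[]
        ByteBuffer direct = ByteBuffer.allocateDirect(5);       // off-heap memory

        System.out.println(heap.hasArray());    // true  -> StringData.fromBytes(array, offset, length), no copy
        System.out.println(direct.hasArray());  // false -> fall back to copying via getBytes()
    }
}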

 
