Reading Iceberg Tables from Flink: A Source-Code Walkthrough

Posted 咬定青松


This article was first published on the WeChat public account 码上观世界.

Every storage system consists of data plus metadata, and Hive is no exception. The difference is that its data storage and metadata storage are separated and live in different distributed systems: the data sits in a distributed store such as MinIO or HDFS, while the metadata is kept in the Hive Metastore or in a third-party metadata service such as Glue.

An external system therefore accesses such a storage system in two steps: it first fetches the metadata of the data it wants to read (for example, its location) from the metadata system, and then uses that metadata to reach the storage system and read the actual data.

To query an Iceberg table from Flink, you can use an API like the following:

HiveConf hiveConf = new HiveConf();
hiveConf.set("hive.metastore.uris", hmsUri);
hiveConf.set("hive.metastore.warehouse.dir", "s3a://test/");
...
Map<String, String> properties = new HashMap<>();
CatalogLoader catalogLoader = CatalogLoader.hive("hive", hiveConf, properties);
FlinkCatalog flinkCatalog = new FlinkCatalog("test_catalog_name", "default", Namespace.empty(), catalogLoader, false);
flinkCatalog.open();
tableEnvironment.registerCatalog(flinkCatalog.getName(), flinkCatalog);
tableEnvironment.useCatalog(flinkCatalog.getName());
tableEnvironment.executeSql("select * from test_iceberg_table");

Inside FlinkCatalog, the CatalogLoader dynamically loads a HiveCatalog and calls its initialize method to complete initialization:

public class HiveCatalog extends BaseMetastoreCatalog implements SupportsNamespaces, Configurable {
  public static final String LIST_ALL_TABLES = "list-all-tables";
  public static final String LIST_ALL_TABLES_DEFAULT = "false";

  private static final Logger LOG = LoggerFactory.getLogger(HiveCatalog.class);

  private String name;
  private Configuration conf;
  private FileIO fileIO;
  private ClientPool<IMetaStoreClient, TException> clients;
  private boolean listAllTables = false;

  @Override
  public void initialize(String inputName, Map<String, String> properties) {
    this.name = inputName;
    if (conf == null) {
      LOG.warn("No Hadoop Configuration was set, using the default environment Configuration");
      this.conf = new Configuration();
    }

    if (properties.containsKey(CatalogProperties.URI)) {
      this.conf.set(HiveConf.ConfVars.METASTOREURIS.varname, properties.get(CatalogProperties.URI));
    }

    if (properties.containsKey(CatalogProperties.WAREHOUSE_LOCATION)) {
      this.conf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, properties.get(CatalogProperties.WAREHOUSE_LOCATION));
    }

    this.listAllTables = Boolean.parseBoolean(properties.getOrDefault(LIST_ALL_TABLES, LIST_ALL_TABLES_DEFAULT));

    String fileIOImpl = properties.get(CatalogProperties.FILE_IO_IMPL);
    this.fileIO = fileIOImpl == null ? new HadoopFileIO(conf) : CatalogUtil.loadFileIO(fileIOImpl, properties, conf);

    this.clients = new CachedClientPool(conf, properties);
  }
  // ... remaining catalog methods omitted
}
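For reference, the same initialize contract can be exercised directly as well. The following is a minimal sketch of my own (not from the original post), using the property keys defined in CatalogProperties and a made-up metastore URI; it is roughly what CatalogLoader.hive(...) ends up doing through CatalogUtil when the catalog is opened:

Map<String, String> props = new HashMap<>();
props.put(CatalogProperties.URI, "thrift://hms-host:9083");        // hypothetical HMS address
props.put(CatalogProperties.WAREHOUSE_LOCATION, "s3a://test/");    // warehouse root, as in the first example

HiveCatalog hiveCatalog = new HiveCatalog();
hiveCatalog.setConf(new Configuration());   // optional; otherwise a default Configuration is created in initialize
hiveCatalog.initialize("test_catalog_name", props);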

From then on, FlinkCatalog's operations on Iceberg metadata are essentially delegated to this HiveCatalog, for example:

@Override
public CatalogTable getTable(ObjectPath tablePath) throws TableNotExistException, CatalogException {
  Table table = loadIcebergTable(tablePath);
  return toCatalogTable(table);
}

private Table loadIcebergTable(ObjectPath tablePath) throws TableNotExistException {
  Table table = icebergCatalog.loadTable(toIdentifier(tablePath));
  if (cacheEnabled) {
    table.refresh();
  }
  return table;
}
Here Table is the table object defined by Iceberg, which can be converted into Flink's table representation. Table holds an important member, HiveTableOperations, which carries the following information:

  • the table's full identifier (fullName, i.e. databaseName plus tableName);

  • the current table metadata (currentMetadata);

  • the current metadata file location (currentMetadataLocation);

  • a HadoopFileIO: reads and writes of table data files go through FileIO, whose default implementation in Iceberg is HadoopFileIO;

  • a HiveClient;

  • a Hadoop Configuration object that holds Hadoop-related configuration.

(Figure: the member variables of an example BaseTable object.)

HiveTableOperations provides the basic table-operations interface; with this object in place, it becomes possible to operate on both data and metadata through the Table object.
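As an illustration (again a sketch of my own, with a hypothetical table identifier), once the catalog is initialized a Table can be loaded and its TableOperations inspected:

TableIdentifier id = TableIdentifier.of("default", "test_iceberg_table");
Table table = hiveCatalog.loadTable(id);                    // a BaseTable backed by HiveTableOperations
TableOperations ops = ((BaseTable) table).operations();

System.out.println(ops.current().metadataFileLocation());   // location of the current metadata.json
System.out.println(table.location());                       // table root location
System.out.println(table.currentSnapshot());                 // current snapshot, if any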

FlinkSource is the wrapper class Iceberg provides for reading Iceberg data (its counterpart for writing is FlinkSink). The key method of FlinkSource is build:

//org.apache.iceberg.flink.source.FlinkSource.class
public DataStream<RowData> build() {
    Preconditions.checkNotNull(this.env, "StreamExecutionEnvironment should not be null");
    FlinkInputFormat format = this.buildFormat();
    ScanContext context = this.contextBuilder.build();
    TypeInformation<RowData> typeInfo = FlinkCompatibilityUtil.toTypeInfo(FlinkSchemaUtil.convert(context.project()));
    if (!context.isStreaming()) {
        int parallelism = this.inferParallelism(format, context);
        return this.env.createInput(format, typeInfo).setParallelism(parallelism);
    } else {
        StreamingMonitorFunction function = new StreamingMonitorFunction(this.tableLoader, context);
        String monitorFunctionName = String.format("Iceberg table (%s) monitor", this.table);
        String readerOperatorName = String.format("Iceberg table (%s) reader", this.table);
        return this.env.addSource(function, monitorFunctionName).transform(readerOperatorName, typeInfo, StreamingReaderOperator.factory(format));
    }
}
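Put together, a typical way to drive this builder looks roughly like the following hedged sketch, reusing the catalogLoader from the first example and a made-up table identifier:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
TableLoader tableLoader = TableLoader.fromCatalog(
        catalogLoader, TableIdentifier.of("default", "test_iceberg_table"));

DataStream<RowData> stream = FlinkSource.forRowData()
        .env(env)
        .tableLoader(tableLoader)
        .streaming(false)   // batch read; true takes the StreamingMonitorFunction branch shown above
        .build();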

The important class here is FlinkInputFormat, which provides the way table metadata and data are accessed when integrating with Flink:

//org.apache.iceberg.flink.source.FlinkInputFormat
FlinkInputFormat(TableLoader tableLoader, Schema tableSchema, FileIO io, EncryptionManager encryption, ScanContext context) {
    this.tableLoader = tableLoader;
    this.io = io;
    this.encryption = encryption;
    this.context = context;
    this.rowDataReader = new RowDataFileScanTaskReader(tableSchema, context.project(), context.nameMapping(), context.caseSensitive());
}

public FlinkInputSplit[] createInputSplits(int minNumSplits) throws IOException {
    this.tableLoader.open();
    TableLoader loader = this.tableLoader;
    FlinkInputSplit[] inputSplits;
    Table table = loader.loadTable();
    inputSplits = FlinkSplitPlanner.planInputSplits(table, this.context);
    return inputSplits;
}

public void open(FlinkInputSplit split) {
    this.iterator = new DataIterator(this.rowDataReader, split.getTask(), this.io, this.encryption);
}

public RowData nextRecord(RowData reuse) {
    ++this.currentReadCount;
    return (RowData) this.iterator.next();
}
FlinkInputFormat has two important members, TableLoader tableLoader and FileIO io, through which it operates on metadata and data respectively. When the stream task (StreamTask) starts, it runs InputFormatSourceFunction's open method to initialize, and then reads data records in a loop through FlinkInputFormat:

//org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.class
public void open(Configuration parameters) throws Exception {
    StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();

    if (format instanceof RichInputFormat) {
        ((RichInputFormat) format).setRuntimeContext(context);
    }
    format.configure(parameters);

    provider = context.getInputSplitProvider();
    serializer = typeInfo.createSerializer(getRuntimeContext().getExecutionConfig());
    splitIterator = getInputSplits();
    isRunning = splitIterator.hasNext();
}

@Override
public void run(SourceContext<OUT> ctx) throws Exception {
    ...
    OUT nextElement = serializer.createInstance();
    while (isRunning) {
        format.open(splitIterator.next());

        // for each element we also check if cancel
        // was called by checking the isRunning flag

        while (isRunning && !format.reachedEnd()) {
            nextElement = format.nextRecord(nextElement);
            if (nextElement != null) {
                ctx.collect(nextElement);
            } else {
                break;
            }
        }
        format.close();

        if (isRunning) {
            isRunning = splitIterator.hasNext();
        }
    }
    ...
}

Getting record-level data, however, takes two big steps: first obtain the iterator over FlinkInputSplits, then obtain the iterator over FileScanTasks and read records from it.

The first step corresponds to the getInputSplits method in the code above: it fetches the FlinkInputSplits to read via an RPC call, which ultimately still invokes FlinkInputFormat's createInputSplits; inside createInputSplits, the FlinkInputSplits are produced by FlinkSplitPlanner:

//org.apache.iceberg.flink.source.FlinkSplitPlanner.class
static FlinkInputSplit[] planInputSplits(Table table, ScanContext context) {
    CloseableIterable<CombinedScanTask> tasksIterable = planTasks(table, context);
    List<CombinedScanTask> tasks = Lists.newArrayList(tasksIterable);
    FlinkInputSplit[] splits = new FlinkInputSplit[tasks.size()];

    for (int i = 0; i < tasks.size(); ++i) {
        splits[i] = new FlinkInputSplit(i, (CombinedScanTask) tasks.get(i));
    }
    return splits;
}

static CloseableIterable<CombinedScanTask> planTasks(Table table, ScanContext context) {
    TableScan scan = table.newScan().caseSensitive(context.caseSensitive()).project(context.project());
    // apply scan options
    ...
    return scan.planTasks();
}
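To make the planning step concrete, the following assumed sketch (my own, built on Iceberg's public scan API and a previously loaded Table) shows what planTasks produces: each CombinedScanTask bundles one or more FileScanTasks, i.e. data files plus any associated delete files, into a single split:

try (CloseableIterable<CombinedScanTask> tasks = table.newScan().planTasks()) {
    for (CombinedScanTask combined : tasks) {
        for (FileScanTask fileTask : combined.files()) {
            // each FileScanTask points at one data file (or a slice of it)
            System.out.println(fileTask.file().path() + " : " + fileTask.file().recordCount() + " records");
        }
    }
} catch (IOException e) {
    throw new UncheckedIOException(e);
}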

The second step reads records through format.nextRecord, which internally uses a DataIterator; the DataIterator is created in format.open, and it performs the FileScanTask-level iteration and reading:

//org.apache.iceberg.flink.source.DataIterator.class
public DataIterator(FileScanTaskReader<T> fileScanTaskReader, CombinedScanTask task, FileIO io, EncryptionManager encryption) {
    this.fileScanTaskReader = fileScanTaskReader;
    this.inputFilesDecryptor = new InputFilesDecryptor(task, io, encryption);
    this.tasks = task.files().iterator();
    this.currentIterator = CloseableIterator.empty();
    this.currentDeleteIterator = CloseableIterator.empty();
}

public boolean hasNext() {
    this.updateCurrentIterator();
    return this.currentIterator.hasNext() || this.currentDeleteIterator.hasNext();
}

public T next() {
    this.updateCurrentIterator();
    return this.currentDeleteIterator.hasNext() ? this.currentDeleteIterator.next() : this.currentIterator.next();
}

private void updateCurrentIterator() {
    try {
        while (!this.currentIterator.hasNext() && !this.currentDeleteIterator.hasNext() && this.tasks.hasNext()) {
            FileScanTask next = (FileScanTask) this.tasks.next();
            if (next.file().recordCount() == 0L) {
                this.currentDeleteIterator.close();
                this.currentDeleteIterator = this.openDeleteTaskIterator(next);
            } else {
                this.currentIterator.close();
                this.currentIterator = this.openTaskIterator(next);
            }
        }
    } catch (IOException var2) {
        throw new UncheckedIOException(var2);
    }
}

private CloseableIterator<T> openTaskIterator(FileScanTask scanTask) {
    return this.fileScanTaskReader.open(scanTask, this.inputFilesDecryptor);
}

private CloseableIterator<T> openDeleteTaskIterator(FileScanTask scanTask) {
    return this.fileScanTaskReader.openDelete(scanTask, this.inputFilesDecryptor);
}
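Standing alone, the iterator can be driven the same way FlinkInputFormat drives it. The sketch below is hypothetical and simply mirrors the constructor shown above; it assumes a loaded Table and a combinedScanTask obtained from planning:

RowDataFileScanTaskReader reader = new RowDataFileScanTaskReader(
        table.schema(), table.schema(), null /* nameMapping */, true /* caseSensitive */);

try (DataIterator<RowData> iterator =
         new DataIterator<>(reader, combinedScanTask, table.io(), table.encryption())) {
    while (iterator.hasNext()) {
        RowData row = iterator.next();
        // process each RowData record, crossing file boundaries transparently
    }
} catch (IOException e) {
    throw new UncheckedIOException(e);
}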

In this code, fileScanTaskReader is responsible for reading records from a FileScanTask:

//org.apache.iceberg.flink.source.RowDataFileScanTaskReader.class
public CloseableIterator<RowData> open(FileScanTask task, InputFilesDecryptor inputFilesDecryptor) {
    Schema partitionSchema = TypeUtil.select(this.projectedSchema, task.spec().identitySourceIds());
    Map<Integer, ?> idToConstant = partitionSchema.columns().isEmpty() ? ImmutableMap.of() : PartitionUtil.constantsMap(task, RowDataUtil::convertConstant);
    RowDataFileScanTaskReader.FlinkDeleteFilter deletes = new RowDataFileScanTaskReader.FlinkDeleteFilter(task, this.tableSchema, this.projectedSchema, inputFilesDecryptor);
    CloseableIterable<RowData> iterable = deletes.filter(this.newIterable(task, deletes.requiredSchema(), (Map) idToConstant, inputFilesDecryptor));
    if (!this.projectedSchema.sameSchema(deletes.requiredSchema())) {
        RowDataProjection rowDataProjection = RowDataProjection.create(deletes.requiredRowType(), deletes.requiredSchema().asStruct(), this.projectedSchema.asStruct());
        Objects.requireNonNull(rowDataProjection);
        iterable = CloseableIterable.transform(iterable, rowDataProjection::wrap);
    }

    return iterable.iterator();
}

Internally, it wraps support for the different file formats, such as PARQUET, AVRO, and ORC.
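For the Parquet case, for example, the format-specific branch builds a reader roughly as follows. This is a simplified sketch based on Iceberg's public Parquet read builder and FlinkParquetReaders, not a verbatim copy of the private newIterable method; readSchema and caseSensitive stand in for the schema and flag resolved earlier in the reader:

CloseableIterable<RowData> rows = Parquet.read(inputFilesDecryptor.getInputFile(task))
    .project(readSchema)                                  // schema required after delete filtering
    .split(task.start(), task.length())                   // only this task's byte range of the file
    .createReaderFunc(fileSchema -> FlinkParquetReaders.buildReader(readSchema, fileSchema))
    .filter(task.residual())                              // residual row filter pushed into the reader
    .caseSensitive(caseSensitive)
    .build();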

This covers the main flow of how Flink reads an Iceberg table.
