How Flink Reads Iceberg Tables: A Source Code Walkthrough
Posted by 咬定青松
This article was first published on the WeChat public account 码上观世界.
Any storage system consists of data and metadata, and Hive is no exception. The difference is that its data storage and metadata storage are separated and live in different distributed systems: the data sits on a distributed store such as MinIO or HDFS, while the metadata is kept in the Hive Metastore or a third-party metadata service such as AWS Glue.
An external system therefore accesses such a storage system in two steps: first fetch the metadata of the data to be accessed (for example, its location) from the metadata system, and then use that metadata to reach the storage system and read the actual data.
To query an Iceberg table from Flink, an API like the following can be used:
HiveConf hiveConf = new HiveConf();
hiveConf.set("hive.metastore.uris", hmsUri);
hiveConf.set("hive.metastore.warehouse.dir", "s3a://test/");
...
// create the StreamTableEnvironment used below
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env);

Map<String, String> properties = new HashMap<>();
CatalogLoader catalogLoader = CatalogLoader.hive("hive", hiveConf, properties);
FlinkCatalog flinkCatalog =
    new FlinkCatalog("test_catalog_name", "default", Namespace.empty(), catalogLoader, false);
flinkCatalog.open();

tableEnvironment.registerCatalog(flinkCatalog.getName(), flinkCatalog);
tableEnvironment.useCatalog(flinkCatalog.getName());
tableEnvironment.executeSql("select * from test_iceberg_table");
FlinkCatalog dynamically loads a HiveCatalog through the CatalogLoader and calls its initialize method to complete the initialization:
public class HiveCatalog extends BaseMetastoreCatalog implements SupportsNamespaces, Configurable {
    public static final String LIST_ALL_TABLES = "list-all-tables";
    public static final String LIST_ALL_TABLES_DEFAULT = "false";

    private static final Logger LOG = LoggerFactory.getLogger(HiveCatalog.class);

    private String name;
    private Configuration conf;
    private FileIO fileIO;
    private ClientPool<IMetaStoreClient, TException> clients;
    private boolean listAllTables = false;

    @Override
    public void initialize(String inputName, Map<String, String> properties) {
        this.name = inputName;
        if (conf == null) {
            LOG.warn("No Hadoop Configuration was set, using the default environment Configuration");
            this.conf = new Configuration();
        }

        if (properties.containsKey(CatalogProperties.URI)) {
            this.conf.set(HiveConf.ConfVars.METASTOREURIS.varname, properties.get(CatalogProperties.URI));
        }

        if (properties.containsKey(CatalogProperties.WAREHOUSE_LOCATION)) {
            this.conf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname,
                properties.get(CatalogProperties.WAREHOUSE_LOCATION));
        }

        this.listAllTables = Boolean.parseBoolean(properties.getOrDefault(LIST_ALL_TABLES, LIST_ALL_TABLES_DEFAULT));

        String fileIOImpl = properties.get(CatalogProperties.FILE_IO_IMPL);
        this.fileIO = fileIOImpl == null ? new HadoopFileIO(conf) : CatalogUtil.loadFileIO(fileIOImpl, properties, conf);

        this.clients = new CachedClientPool(conf, properties);
    }
}
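To see what initialize actually needs, the sketch below drives a HiveCatalog directly, without Flink in between. It reuses the hmsUri placeholder and the table identifier from the earlier example; it is only an illustration, not part of the Flink code path.

// Standalone sketch of HiveCatalog initialization (property keys come from CatalogProperties)
HiveCatalog hiveCatalog = new HiveCatalog();
hiveCatalog.setConf(new Configuration());                       // Hadoop Configuration; a default is created if omitted
Map<String, String> catalogProps = new HashMap<>();
catalogProps.put(CatalogProperties.URI, hmsUri);                // maps to hive.metastore.uris
catalogProps.put(CatalogProperties.WAREHOUSE_LOCATION, "s3a://test/");
hiveCatalog.initialize("hive", catalogProps);
Table table = hiveCatalog.loadTable(TableIdentifier.of("default", "test_iceberg_table"));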
From then on, Iceberg metadata operations issued through FlinkCatalog are essentially delegated to the HiveCatalog, for example:
@Override
public CatalogTable getTable(ObjectPath tablePath) throws TableNotExistException, CatalogException {
    Table table = loadIcebergTable(tablePath);
    return toCatalogTable(table);
}

private Table loadIcebergTable(ObjectPath tablePath) throws TableNotExistException {
    Table table = icebergCatalog.loadTable(toIdentifier(tablePath));
    if (cacheEnabled) {
        table.refresh();
    }
    return table;
}
In this code, Table is the table object defined by Iceberg, and it can be converted into the table object defined by Flink (CatalogTable). Table has one important member, its HiveTableOperations, which carries the following information:
the table's full identifier (fullName, databaseName and tableName);
the current metadata (currentMetadata);
the current metadata location (currentMetadataLocation);
a HadoopFileIO: reads and writes of the table's data files go through FileIO, whose default implementation in Iceberg is HadoopFileIO;
the Hive client (HiveClient);
the Hadoop Configuration object, which holds Hadoop-related configuration.
(Figure: the member variables of a sample BaseTable object.)
HiveTableOperations implements the basic table-operations interface (TableOperations); with this object, the Table object can manipulate both data and metadata.
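As an illustration of that chain, the following sketch (reusing the hiveCatalog from the sketch above) pulls the pieces listed here out of a loaded table through its TableOperations:

// Sketch: reaching metadata and FileIO through the table's operations object
BaseTable baseTable = (BaseTable) hiveCatalog.loadTable(TableIdentifier.of("default", "test_iceberg_table"));
TableOperations ops = baseTable.operations();              // a HiveTableOperations for a Hive-backed table
TableMetadata current = ops.current();                     // current table metadata
String metadataLocation = current.metadataFileLocation();  // current metadata file location
FileIO io = ops.io();                                      // HadoopFileIO by default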
FlinkSource is the wrapper class Iceberg provides for reading Iceberg data; its counterpart for writing is FlinkSink. The key method is build, defined on its nested Builder:
//org.apache.iceberg.flink.source.FlinkSource.Builder
public DataStream<RowData> build() {
    Preconditions.checkNotNull(this.env, "StreamExecutionEnvironment should not be null");
    FlinkInputFormat format = this.buildFormat();
    ScanContext context = this.contextBuilder.build();
    TypeInformation<RowData> typeInfo =
        FlinkCompatibilityUtil.toTypeInfo(FlinkSchemaUtil.convert(context.project()));
    if (!context.isStreaming()) {
        int parallelism = this.inferParallelism(format, context);
        return this.env.createInput(format, typeInfo).setParallelism(parallelism);
    } else {
        StreamingMonitorFunction function = new StreamingMonitorFunction(this.tableLoader, context);
        String monitorFunctionName = String.format("Iceberg table (%s) monitor", this.table);
        String readerOperatorName = String.format("Iceberg table (%s) reader", this.table);
        return this.env.addSource(function, monitorFunctionName)
            .transform(readerOperatorName, typeInfo, StreamingReaderOperator.factory(format));
    }
}
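Before diving into FlinkInputFormat, a minimal usage sketch of this builder may help. It assumes the catalogLoader from the first example and a bounded (batch) read; the table identifier is the same placeholder as before.

// Sketch: building a bounded DataStream<RowData> over the Iceberg table via FlinkSource
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
TableLoader tableLoader =
    TableLoader.fromCatalog(catalogLoader, TableIdentifier.of("default", "test_iceberg_table"));
DataStream<RowData> rows = FlinkSource.forRowData()
    .env(env)
    .tableLoader(tableLoader)
    .streaming(false)        // batch read; true would go through StreamingMonitorFunction
    .build();
rows.print();
env.execute("read iceberg table");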
The important class here is FlinkInputFormat, which provides the means to operate on the table's metadata and data when integrating with Flink:
//org.apache.iceberg.flink.source.FlinkInputFormat
FlinkInputFormat(TableLoader tableLoader, Schema tableSchema, FileIO io, EncryptionManager encryption,
                 ScanContext context) {
    this.tableLoader = tableLoader;
    this.io = io;
    this.encryption = encryption;
    this.context = context;
    this.rowDataReader = new RowDataFileScanTaskReader(tableSchema, context.project(),
        context.nameMapping(), context.caseSensitive());
}

public FlinkInputSplit[] createInputSplits(int minNumSplits) throws IOException {
    this.tableLoader.open();
    TableLoader loader = this.tableLoader;
    FlinkInputSplit[] inputSplits;
    Table table = loader.loadTable();
    inputSplits = FlinkSplitPlanner.planInputSplits(table, this.context);
    return inputSplits;
}

public void open(FlinkInputSplit split) {
    this.iterator = new DataIterator(this.rowDataReader, split.getTask(), this.io, this.encryption);
}

public RowData nextRecord(RowData reuse) {
    ++this.currentReadCount;
    return (RowData) this.iterator.next();
}
FlinkInputFormat has two important member variables, TableLoader tableLoader and FileIO io, through which it operates on metadata and data respectively. When the StreamTask starts, it runs InputFormatSourceFunction.open to finish initialization, and then reads data records in a loop through the FlinkInputFormat:
//org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.class
public void open(Configuration parameters) throws Exception {
    StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();

    if (format instanceof RichInputFormat) {
        ((RichInputFormat) format).setRuntimeContext(context);
    }
    format.configure(parameters);

    provider = context.getInputSplitProvider();
    serializer = typeInfo.createSerializer(getRuntimeContext().getExecutionConfig());
    splitIterator = getInputSplits();
    isRunning = splitIterator.hasNext();
}

@Override
public void run(SourceContext<OUT> ctx) throws Exception {
    ...
    OUT nextElement = serializer.createInstance();
    while (isRunning) {
        format.open(splitIterator.next());

        // for each element we also check if cancel
        // was called by checking the isRunning flag

        while (isRunning && !format.reachedEnd()) {
            nextElement = format.nextRecord(nextElement);
            if (nextElement != null) {
                ctx.collect(nextElement);
            } else {
                break;
            }
        }
        format.close();

        if (isRunning) {
            isRunning = splitIterator.hasNext();
        }
    }
    ...
}
Getting down to record-level data, however, takes two major steps: first obtain the iterator over FlinkInputSplits, then obtain the iterator over FileScanTasks and read records from it.
The former corresponds to the getInputSplits method above: it obtains the FlinkInputSplits through an RPC call, which ultimately still invokes FlinkInputFormat.createInputSplits, where FlinkSplitPlanner produces the FlinkInputSplit array:
//org.apache.iceberg.flink.source.FlinkSplitPlanner.class
static FlinkInputSplit[] planInputSplits(Table table, ScanContext context) {
    CloseableIterable<CombinedScanTask> tasksIterable = planTasks(table, context);
    List<CombinedScanTask> tasks = Lists.newArrayList(tasksIterable);
    FlinkInputSplit[] splits = new FlinkInputSplit[tasks.size()];
    for (int i = 0; i < tasks.size(); ++i) {
        splits[i] = new FlinkInputSplit(i, tasks.get(i));
    }
    return splits;
}

static CloseableIterable<CombinedScanTask> planTasks(Table table, ScanContext context) {
    TableScan scan = table.newScan()
        .caseSensitive(context.caseSensitive())
        .project(context.project());
    // apply scan options
    ...
    return scan.planTasks();
}
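FlinkSplitPlanner ultimately delegates to Iceberg's core scan planning. The sketch below performs the same planning directly with the core table API, without Flink, reusing the hiveCatalog from the earlier sketch; it only prints the data files behind each planned task.

// Sketch: planning combined scan tasks with the core table scan API
Table table = hiveCatalog.loadTable(TableIdentifier.of("default", "test_iceberg_table"));
try (CloseableIterable<CombinedScanTask> tasks = table.newScan().planTasks()) {
    for (CombinedScanTask combinedTask : tasks) {
        for (FileScanTask fileTask : combinedTask.files()) {
            System.out.println(fileTask.file().path());   // data file behind this scan task
        }
    }
} catch (IOException e) {
    throw new UncheckedIOException(e);
}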
The latter is format.nextRecord, which reads records through a DataIterator; the DataIterator is created in format.open and performs the iteration at the FileScanTask level:
//org.apache.iceberg.flink.source.DataIterator.class
public DataIterator(FileScanTaskReader<T> fileScanTaskReader, CombinedScanTask task, FileIO io,
                    EncryptionManager encryption) {
    this.fileScanTaskReader = fileScanTaskReader;
    this.inputFilesDecryptor = new InputFilesDecryptor(task, io, encryption);
    this.tasks = task.files().iterator();
    this.currentIterator = CloseableIterator.empty();
    this.currentDeleteIterator = CloseableIterator.empty();
}

public boolean hasNext() {
    this.updateCurrentIterator();
    return this.currentIterator.hasNext() || this.currentDeleteIterator.hasNext();
}

public T next() {
    this.updateCurrentIterator();
    return this.currentDeleteIterator.hasNext() ? this.currentDeleteIterator.next() : this.currentIterator.next();
}

private void updateCurrentIterator() {
    try {
        while (!this.currentIterator.hasNext() && !this.currentDeleteIterator.hasNext() && this.tasks.hasNext()) {
            FileScanTask next = this.tasks.next();
            if (next.file().recordCount() == 0L) {
                this.currentDeleteIterator.close();
                this.currentDeleteIterator = this.openDeleteTaskIterator(next);
            } else {
                this.currentIterator.close();
                this.currentIterator = this.openTaskIterator(next);
            }
        }
    } catch (IOException var2) {
        throw new UncheckedIOException(var2);
    }
}

private CloseableIterator<T> openTaskIterator(FileScanTask scanTask) {
    return this.fileScanTaskReader.open(scanTask, this.inputFilesDecryptor);
}

private CloseableIterator<T> openDeleteTaskIterator(FileScanTask scanTask) {
    return this.fileScanTaskReader.openDelete(scanTask, this.inputFilesDecryptor);
}
In this code, fileScanTaskReader is responsible for reading records from a FileScanTask:
//org.apache.iceberg.flink.source.RowDataFileScanTaskReader.class
public CloseableIterator<RowData> open(FileScanTask task, InputFilesDecryptor inputFilesDecryptor) {
    Schema partitionSchema = TypeUtil.select(this.projectedSchema, task.spec().identitySourceIds());
    Map<Integer, ?> idToConstant = partitionSchema.columns().isEmpty()
        ? ImmutableMap.of()
        : PartitionUtil.constantsMap(task, RowDataUtil::convertConstant);
    RowDataFileScanTaskReader.FlinkDeleteFilter deletes =
        new RowDataFileScanTaskReader.FlinkDeleteFilter(task, this.tableSchema, this.projectedSchema, inputFilesDecryptor);
    CloseableIterable<RowData> iterable =
        deletes.filter(this.newIterable(task, deletes.requiredSchema(), (Map) idToConstant, inputFilesDecryptor));

    if (!this.projectedSchema.sameSchema(deletes.requiredSchema())) {
        RowDataProjection rowDataProjection = RowDataProjection.create(
            deletes.requiredRowType(), deletes.requiredSchema().asStruct(), this.projectedSchema.asStruct());
        Objects.requireNonNull(rowDataProjection);
        iterable = CloseableIterable.transform(iterable, rowDataProjection::wrap);
    }

    return iterable.iterator();
}
Internally it encapsulates support for the different file formats such as PARQUET, AVRO and ORC.
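The dispatch on file format happens in newIterable; the following is only a simplified sketch of its shape (the per-format helper methods are paraphrased, not verbatim source):

// Simplified sketch of the per-format dispatch inside RowDataFileScanTaskReader.newIterable
private CloseableIterable<RowData> newIterable(FileScanTask task, Schema schema, Map<Integer, ?> idToConstant,
                                               InputFilesDecryptor inputFilesDecryptor) {
    switch (task.file().format()) {
        case PARQUET:
            return newParquetIterable(task, schema, idToConstant, inputFilesDecryptor);
        case AVRO:
            return newAvroIterable(task, schema, idToConstant, inputFilesDecryptor);
        case ORC:
            return newOrcIterable(task, schema, idToConstant, inputFilesDecryptor);
        default:
            throw new UnsupportedOperationException("Cannot read unknown format: " + task.file().format());
    }
}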
This is the main flow of how Flink reads an Iceberg table.