Hudi Source Code | A Summary of the bootstrap Source Code (Writing to Hudi)
Posted 董可伦
Preface
A brief walkthrough of the Apache Hudi bootstrap source code. If you are not familiar with Hudi bootstrap, see: 利用Hudi Bootstrap转化现有Hive表的parquet/orc文件为Hudi表
Versions
Hudi 0.12.0
Spark 2.4.4
Entry point
val bootstrapDF = spark.emptyDataFrame
bootstrapDF.write.
format("hudi").
options(extraOpts).
option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL).
......
save(basePath)
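For context, here is a minimal sketch of what the elided extraOpts might contain for a bootstrap write. The option keys are standard Hudi configuration properties; the table name, paths and column names are placeholders, not values from the original article:

import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.{HoodieBootstrapConfig, HoodieWriteConfig}

val extraOpts = Map(
  HoodieWriteConfig.TBL_NAME.key -> "hudi_bootstrap_test",                  // placeholder table name
  HoodieBootstrapConfig.BASE_PATH.key -> "/warehouse/test.db/source_table", // existing parquet/orc table to bootstrap from (placeholder)
  DataSourceWriteOptions.RECORDKEY_FIELD.key -> "id",                       // placeholder record key column
  DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "dt"                    // placeholder partition column
)

BASE_PATH here is HoodieBootstrapConfig.BASE_PATH (hoodie.bootstrap.base.path), i.e. the location of the existing table, which the bootstrap code below reads as bootstrapBasePath.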
As explained in the article Hudi Spark源码学习总结-df.write.format("hudi").save, the save method ends up in DefaultSource.createRelation:
override def createRelation(sqlContext: SQLContext,
                            mode: SaveMode,
                            optParams: Map[String, String],
                            df: DataFrame): BaseRelation = {
  val dfWithoutMetaCols = df.drop(HoodieRecord.HOODIE_META_COLUMNS.asScala: _*)

  if (optParams.get(OPERATION.key).contains(BOOTSTRAP_OPERATION_OPT_VAL)) {
    HoodieSparkSqlWriter.bootstrap(sqlContext, mode, optParams, dfWithoutMetaCols)
  } else {
    HoodieSparkSqlWriter.write(sqlContext, mode, optParams, dfWithoutMetaCols)
  }

  new HoodieEmptyRelation(sqlContext, dfWithoutMetaCols.schema)
}
createRelation checks whether OPERATION equals BOOTSTRAP_OPERATION_OPT_VAL. In our case it does, so HoodieSparkSqlWriter.bootstrap is called.
HoodieSparkSqlWriter.bootstrap
This method first reads a number of parameters and checks whether the table already exists. If it does not, this is the first write, so the write configuration is assembled and the table is initialized via HoodieTableMetaClient.initTable; after that, writeClient.bootstrap is called:
def bootstrap(sqlContext: SQLContext,
              mode: SaveMode,
              optParams: Map[String, String],
              df: DataFrame,
              hoodieTableConfigOpt: Option[HoodieTableConfig] = Option.empty,
              hoodieWriteClient: Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]] = Option.empty): Boolean = {

  assert(optParams.get("path").exists(!StringUtils.isNullOrEmpty(_)), "'path' must be set")
  val path = optParams("path")
  val basePath = new Path(path)
  val sparkContext = sqlContext.sparkContext
  val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration)
  tableExists = fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))
  val tableConfig = getHoodieTableConfig(sparkContext, path, hoodieTableConfigOpt)
  validateTableConfig(sqlContext.sparkSession, optParams, tableConfig, mode == SaveMode.Overwrite)

  val (parameters, hoodieConfig) = mergeParamsAndGetHoodieConfig(optParams, tableConfig, mode)
  val tableName = hoodieConfig.getStringOrThrow(HoodieWriteConfig.TBL_NAME, s"'${HoodieWriteConfig.TBL_NAME.key}' must be set.")
  val tableType = hoodieConfig.getStringOrDefault(TABLE_TYPE)
  val bootstrapBasePath = hoodieConfig.getStringOrThrow(BASE_PATH,
    s"'${BASE_PATH.key}' is required for '$BOOTSTRAP_OPERATION_OPT_VAL'" +
      " operation'")
  val bootstrapIndexClass = hoodieConfig.getStringOrDefault(INDEX_CLASS_NAME)

  var schema: String = null
  if (df.schema.nonEmpty) {
    val (structName, namespace) = AvroConversionUtils.getAvroRecordNameAndNamespace(tableName)
    schema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, namespace).toString
  } else {
    schema = HoodieAvroUtils.getNullSchema.toString
  }

  if (mode == SaveMode.Ignore && tableExists) {
    log.warn(s"hoodie table at $basePath already exists. Ignoring & not performing actual writes.")
    if (!hoodieWriteClient.isEmpty) {
      hoodieWriteClient.get.close()
    }
    false
  } else {
    // Handle various save modes
    handleSaveModes(sqlContext.sparkSession, mode, basePath, tableConfig, tableName, WriteOperationType.BOOTSTRAP, fs)

    if (!tableExists) { // if the table does not exist yet
      val archiveLogFolder = hoodieConfig.getStringOrDefault(HoodieTableConfig.ARCHIVELOG_FOLDER)
      val partitionColumns = HoodieWriterUtils.getPartitionColumns(parameters)
      val recordKeyFields = hoodieConfig.getString(DataSourceWriteOptions.RECORDKEY_FIELD)
      val keyGenProp = hoodieConfig.getString(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME)
      val populateMetaFields = java.lang.Boolean.parseBoolean(parameters.getOrElse(
        HoodieTableConfig.POPULATE_META_FIELDS.key(),
        String.valueOf(HoodieTableConfig.POPULATE_META_FIELDS.defaultValue())
      ))
      val baseFileFormat = hoodieConfig.getStringOrDefault(HoodieTableConfig.BASE_FILE_FORMAT)
      val useBaseFormatMetaFile = java.lang.Boolean.parseBoolean(parameters.getOrElse(
        HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.key(),
        String.valueOf(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.defaultValue())
      ))

      // after assembling these properties, initialize the Hudi table
      HoodieTableMetaClient.withPropertyBuilder()
        .setTableType(HoodieTableType.valueOf(tableType))
        .setTableName(tableName)
        .setRecordKeyFields(recordKeyFields)
        .setArchiveLogFolder(archiveLogFolder)
        .setPayloadClassName(hoodieConfig.getStringOrDefault(PAYLOAD_CLASS_NAME))
        .setPreCombineField(hoodieConfig.getStringOrDefault(PRECOMBINE_FIELD, null))
        .setBootstrapIndexClass(bootstrapIndexClass)
        .setBaseFileFormat(baseFileFormat)
        .setBootstrapBasePath(bootstrapBasePath)
        .setPartitionFields(partitionColumns)
        .setPopulateMetaFields(populateMetaFields)
        .setKeyGeneratorClassProp(keyGenProp)
        .setHiveStylePartitioningEnable(hoodieConfig.getBoolean(HIVE_STYLE_PARTITIONING))
        .setUrlEncodePartitioning(hoodieConfig.getBoolean(URL_ENCODE_PARTITIONING))
        .setCommitTimezone(HoodieTimelineTimeZone.valueOf(hoodieConfig.getStringOrDefault(HoodieTableConfig.TIMELINE_TIMEZONE)))
        .setPartitionMetafileUseBaseFormat(useBaseFormatMetaFile)
        .initTable(sparkContext.hadoopConfiguration, path)
    }

    val jsc = new JavaSparkContext(sqlContext.sparkContext)
    val writeClient = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc,
      schema, path, tableName, mapAsJavaMap(parameters)))
    try {
      writeClient.bootstrap(org.apache.hudi.common.util.Option.empty())
    } finally {
      writeClient.close()
    }
    val metaSyncSuccess = metaSync(sqlContext.sparkSession, hoodieConfig, basePath, df.schema)
    metaSyncSuccess
  }
}
writeClient.bootstrap
Here writeClient is a SparkRDDWriteClient, whose bootstrap method delegates to bootstrap on the HoodieTable created by initTable. Since our example uses a COPY_ON_WRITE table, that table is a HoodieSparkCopyOnWriteTable:
initTable(WriteOperationType.UPSERT, Option.ofNullable(HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS)).bootstrap(context, extraMetadata);
public static <T extends HoodieRecordPayload> HoodieSparkTable<T> create(HoodieWriteConfig config,
                                                                         HoodieSparkEngineContext context,
                                                                         HoodieTableMetaClient metaClient) {
  HoodieSparkTable<T> hoodieSparkTable;
  switch (metaClient.getTableType()) {
    case COPY_ON_WRITE:
      hoodieSparkTable = new HoodieSparkCopyOnWriteTable<>(config, context, metaClient);
      break;
    case MERGE_ON_READ:
      hoodieSparkTable = new HoodieSparkMergeOnReadTable<>(config, context, metaClient);
      break;
    default:
      throw new HoodieException("Unsupported table type :" + metaClient.getTableType());
  }
  return hoodieSparkTable;
}
HoodieSparkCopyOnWriteTable.bootstrap
public HoodieBootstrapWriteMetadata<HoodieData<WriteStatus>> bootstrap(HoodieEngineContext context, Option<Map<String, String>> extraMetadata) {
  return new SparkBootstrapCommitActionExecutor((HoodieSparkEngineContext) context, config, this, extraMetadata).execute();
}
SparkBootstrapCommitActionExecutor.execute
execute first calls listAndProcessSourcePartitions, which returns each mode together with the partitions assigned to it; there are two modes, METADATA_ONLY and FULL_RECORD. The partition paths assigned to METADATA_ONLY are then handled by metadataBootstrap, and those assigned to FULL_RECORD by fullBootstrap. Two things follow from this: 1. the mode values returned by listAndProcessSourcePartitions decide whether METADATA_ONLY or FULL_RECORD bootstrap is performed; 2. the concrete logic lives in metadataBootstrap and fullBootstrap respectively. Let's look at each in turn, starting with how listAndProcessSourcePartitions assigns the mode.
@Override
public HoodieBootstrapWriteMetadata<HoodieData<WriteStatus>> execute() {
  validate();
  try {
    HoodieTableMetaClient metaClient = table.getMetaClient();
    Option<HoodieInstant> completedInstant =
        metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant();
    ValidationUtils.checkArgument(!completedInstant.isPresent(),
        "Active Timeline is expected to be empty for bootstrap to be performed. "
            + "If you want to re-bootstrap, please rollback bootstrap first !!");
    // returns each mode and its partitions; there are two modes: METADATA_ONLY and FULL_RECORD
    Map<BootstrapMode, List<Pair<String, List<HoodieFileStatus>>>> partitionSelections = listAndProcessSourcePartitions();

    // First run metadata bootstrap which will auto commit
    // if partitionSelections contains METADATA_ONLY, metadataBootstrap runs its logic; otherwise it does nothing and returns directly
    Option<HoodieWriteMetadata<HoodieData<WriteStatus>>> metadataResult = metadataBootstrap(partitionSelections.get(BootstrapMode.METADATA_ONLY));

    // if there are full bootstrap to be performed, perform that too
    // if partitionSelections contains FULL_RECORD, fullBootstrap runs its logic; otherwise it does nothing and returns directly
    Option<HoodieWriteMetadata<HoodieData<WriteStatus>>> fullBootstrapResult = fullBootstrap(partitionSelections.get(BootstrapMode.FULL_RECORD));

    // Delete the marker directory for the instant
    WriteMarkersFactory.get(config.getMarkersType(), table, instantTime)
        .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());

    return new HoodieBootstrapWriteMetadata(metadataResult, fullBootstrapResult);
  } catch (IOException ioe) {
    throw new HoodieIOException(ioe.getMessage(), ioe);
  }
}
listAndProcessSourcePartitions
The core of this method is selector.select. The selector class is configured through MODE_SELECTOR_CLASS_NAME (hoodie.bootstrap.mode.selector), whose default value is MetadataOnlyBootstrapModeSelector; in our example the FULL_RECORD case was configured with FullRecordBootstrapModeSelector (see the configuration sketch after the method below). Let's look at the two implementations.
private Map<BootstrapMode, List<Pair<String, List<HoodieFileStatus>>>> listAndProcessSourcePartitions() throws IOException {
  List<Pair<String, List<HoodieFileStatus>>> folders = BootstrapUtils.getAllLeafFoldersWithFiles(
      table.getMetaClient(), bootstrapSourceFileSystem, config.getBootstrapSourceBasePath(), context);

  LOG.info("Fetching Bootstrap Schema !!");
  HoodieBootstrapSchemaProvider sourceSchemaProvider = new HoodieSparkBootstrapSchemaProvider(config);
  bootstrapSchema = sourceSchemaProvider.getBootstrapSchema(context, folders).toString();
  LOG.info("Bootstrap Schema :" + bootstrapSchema);

  BootstrapModeSelector selector =
      (BootstrapModeSelector) ReflectionUtils.loadClass(config.getBootstrapModeSelectorClass(), config);

  Map<BootstrapMode, List<String>> result = selector.select(folders);
  Map<String, List<HoodieFileStatus>> partitionToFiles = folders.stream().collect(
      Collectors.toMap(Pair::getKey, Pair::getValue));

  // Ensure all partitions are accounted for
  ValidationUtils.checkArgument(partitionToFiles.keySet().equals(
      result.values().stream().flatMap(Collection::stream).collect(Collectors.toSet())));

  return result.entrySet().stream().map(e -> Pair.of(e.getKey(), e.getValue().stream()
      .map(p -> Pair.of(p, partitionToFiles.get(p))).collect(Collectors.toList())))
      .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
}
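As referenced above, here is a minimal sketch of switching the selector for the FULL_RECORD case. Adding this option on top of the entry-point snippet is an assumption about how such an example would be configured, but the config key and class name are the real Hudi ones:

import org.apache.hudi.client.bootstrap.selector.FullRecordBootstrapModeSelector
import org.apache.hudi.config.HoodieBootstrapConfig

// MODE_SELECTOR_CLASS_NAME is "hoodie.bootstrap.mode.selector"
val fullRecordOpt = Map(
  HoodieBootstrapConfig.MODE_SELECTOR_CLASS_NAME.key -> classOf[FullRecordBootstrapModeSelector].getName
)
// merge fullRecordOpt into the options(...) of the bootstrap write shown in the entry section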
selector.select
MetadataOnlyBootstrapModeSelector and FullRecordBootstrapModeSelector are both subclasses of UniformBootstrapModeSelector; the only difference between them is the bootstrapMode they pass to the parent constructor. Their select method is implemented in the parent class UniformBootstrapModeSelector.
public class MetadataOnlyBootstrapModeSelector extends UniformBootstrapModeSelector {
  public MetadataOnlyBootstrapModeSelector(HoodieWriteConfig bootstrapConfig) {
    super(bootstrapConfig, BootstrapMode.METADATA_ONLY);
  }
}

public class FullRecordBootstrapModeSelector extends UniformBootstrapModeSelector {
  public FullRecordBootstrapModeSelector(HoodieWriteConfig bootstrapConfig) {
    super(bootstrapConfig, BootstrapMode.FULL_RECORD);
  }
}
UniformBootstrapModeSelector.select
Clearly the mode keys in the returned map correspond to the selector's bootstrapMode, so when MODE_SELECTOR_CLASS_NAME is set to MetadataOnlyBootstrapModeSelector or FullRecordBootstrapModeSelector the mode value is always a single one: either only the metadata-only logic or only the full-record logic runs (a small usage sketch follows the code below). Is there a case in which both modes run in the same bootstrap? Yes, there is.
public Map<BootstrapMode, List<String>> select(List<Pair<String, List<HoodieFileStatus>>> partitions) {
  return partitions.stream().map(p -> Pair.of(bootstrapMode, p))
      .collect(Collectors.groupingBy(Pair::getKey,
          Collectors.mapping(x -> x.getValue().getKey(), Collectors.toList())));
}
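To make that concrete, here is a minimal, hypothetical sketch (spark-shell style; the target path and partition names are placeholders, and the empty file lists are only there to satisfy the types) of what a uniform selector returns:

import java.util.Collections
import scala.collection.JavaConverters._
import org.apache.hudi.avro.model.HoodieFileStatus
import org.apache.hudi.client.bootstrap.selector.FullRecordBootstrapModeSelector
import org.apache.hudi.common.util.collection.Pair
import org.apache.hudi.config.HoodieWriteConfig

// a throwaway write config; only the selector's constructor needs it (placeholder path)
val cfg = HoodieWriteConfig.newBuilder().withPath("/tmp/hudi_bootstrap_target").build()
// two fake source partitions with empty file lists
val folders = List(
  Pair.of("dt=2022-01-01", Collections.emptyList[HoodieFileStatus]()),
  Pair.of("dt=2022-01-02", Collections.emptyList[HoodieFileStatus]())
).asJava
// every partition is grouped under the selector's single mode:
// {FULL_RECORD=[dt=2022-01-01, dt=2022-01-02]}
println(new FullRecordBootstrapModeSelector(cfg).select(folders))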