Hudi Source Code | A Summary of the bootstrap Source Code (Writing to Hudi)

Posted by 董可伦


Preface

A brief walkthrough of the Apache Hudi bootstrap source code. If you are not familiar with Hudi bootstrap, see the earlier post 利用Hudi Bootstrap转化现有Hive表的parquet/orc文件为Hudi表 (using Hudi Bootstrap to convert an existing Hive table's parquet/orc files into a Hudi table).

Versions

Hudi 0.12.0
Spark 2.4.4

Entry point

val bootstrapDF = spark.emptyDataFrame
bootstrapDF.write.
  format("hudi").
  options(extraOpts).
  option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL).
  ......
  save(basePath)
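
For reference, here is a minimal sketch of what the elided options might look like. This is an assumption-laden example rather than the post's original code: the table name, record key, and both paths are made up, while BASE_PATH (hoodie.bootstrap.base.path) and MODE_SELECTOR_CLASS_NAME (hoodie.bootstrap.mode.selector) come from HoodieBootstrapConfig.

import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.client.bootstrap.selector.FullRecordBootstrapModeSelector
import org.apache.hudi.config.{HoodieBootstrapConfig, HoodieWriteConfig}
import org.apache.spark.sql.SaveMode

// Hypothetical names and paths throughout.
spark.emptyDataFrame.write.
  format("hudi").
  option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL).
  option(HoodieWriteConfig.TBL_NAME.key, "bootstrap_table").
  option(DataSourceWriteOptions.RECORDKEY_FIELD.key, "id").
  // existing parquet table to bootstrap from (hoodie.bootstrap.base.path)
  option(HoodieBootstrapConfig.BASE_PATH.key, "/data/hive/source_table").
  // use FULL_RECORD mode for every partition (hoodie.bootstrap.mode.selector)
  option(HoodieBootstrapConfig.MODE_SELECTOR_CLASS_NAME.key, classOf[FullRecordBootstrapModeSelector].getName).
  mode(SaveMode.Overwrite).
  save("/data/hudi/bootstrap_table")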

As shown in the earlier post Hudi Spark源码学习总结-df.write.format("hudi").save, the save method eventually reaches DefaultSource.createRelation:

  override def createRelation(sqlContext: SQLContext,
                              mode: SaveMode,
                              optParams: Map[String, String],
                              df: DataFrame): BaseRelation = {
    val dfWithoutMetaCols = df.drop(HoodieRecord.HOODIE_META_COLUMNS.asScala: _*)

    if (optParams.get(OPERATION.key).contains(BOOTSTRAP_OPERATION_OPT_VAL)) {
      HoodieSparkSqlWriter.bootstrap(sqlContext, mode, optParams, dfWithoutMetaCols)
    } else {
      HoodieSparkSqlWriter.write(sqlContext, mode, optParams, dfWithoutMetaCols)
    }

    new HoodieEmptyRelation(sqlContext, dfWithoutMetaCols.schema)
  }

It checks whether OPERATION equals BOOTSTRAP_OPERATION_OPT_VAL. In our case it does, so HoodieSparkSqlWriter.bootstrap is invoked.

HoodieSparkSqlWriter.bootstrap

This method first gathers a few parameters and then checks whether the table already exists. If it does not, this is the first write, so several write-related configs are assembled and the table is initialized via HoodieTableMetaClient.initTable; afterwards writeClient.bootstrap is called.

  def bootstrap(sqlContext: SQLContext,
                mode: SaveMode,
                optParams: Map[String, String],
                df: DataFrame,
                hoodieTableConfigOpt: Option[HoodieTableConfig] = Option.empty,
                hoodieWriteClient: Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]] = Option.empty): Boolean = {

    assert(optParams.get("path").exists(!StringUtils.isNullOrEmpty(_)), "'path' must be set")
    val path = optParams("path")
    val basePath = new Path(path)
    val sparkContext = sqlContext.sparkContext
    val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration)
    tableExists = fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))
    val tableConfig = getHoodieTableConfig(sparkContext, path, hoodieTableConfigOpt)
    validateTableConfig(sqlContext.sparkSession, optParams, tableConfig, mode == SaveMode.Overwrite)

    val (parameters, hoodieConfig) = mergeParamsAndGetHoodieConfig(optParams, tableConfig, mode)
    val tableName = hoodieConfig.getStringOrThrow(HoodieWriteConfig.TBL_NAME, s"'${HoodieWriteConfig.TBL_NAME.key}' must be set.")
    val tableType = hoodieConfig.getStringOrDefault(TABLE_TYPE)
    val bootstrapBasePath = hoodieConfig.getStringOrThrow(BASE_PATH,
      s"'${BASE_PATH.key}' is required for '$BOOTSTRAP_OPERATION_OPT_VAL'" +
        " operation'")
    val bootstrapIndexClass = hoodieConfig.getStringOrDefault(INDEX_CLASS_NAME)

    var schema: String = null
    if (df.schema.nonEmpty) {
      val (structName, namespace) = AvroConversionUtils.getAvroRecordNameAndNamespace(tableName)
      schema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, namespace).toString
    } else {
      schema = HoodieAvroUtils.getNullSchema.toString
    }

    if (mode == SaveMode.Ignore && tableExists) {
      log.warn(s"hoodie table at $basePath already exists. Ignoring & not performing actual writes.")
      if (!hoodieWriteClient.isEmpty) {
        hoodieWriteClient.get.close()
      }
      false
    } else {
      // Handle various save modes
      handleSaveModes(sqlContext.sparkSession, mode, basePath, tableConfig, tableName, WriteOperationType.BOOTSTRAP, fs)

      if (!tableExists) { // if the table does not exist yet
        val archiveLogFolder = hoodieConfig.getStringOrDefault(HoodieTableConfig.ARCHIVELOG_FOLDER)
        val partitionColumns = HoodieWriterUtils.getPartitionColumns(parameters)
        val recordKeyFields = hoodieConfig.getString(DataSourceWriteOptions.RECORDKEY_FIELD)
        val keyGenProp = hoodieConfig.getString(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME)
        val populateMetaFields = java.lang.Boolean.parseBoolean(parameters.getOrElse(
          HoodieTableConfig.POPULATE_META_FIELDS.key(),
          String.valueOf(HoodieTableConfig.POPULATE_META_FIELDS.defaultValue())
        ))
        val baseFileFormat = hoodieConfig.getStringOrDefault(HoodieTableConfig.BASE_FILE_FORMAT)
        val useBaseFormatMetaFile = java.lang.Boolean.parseBoolean(parameters.getOrElse(
          HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.key(),
          String.valueOf(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.defaultValue())
        ))

        // after assembling the configs above, initialize the Hudi table
        HoodieTableMetaClient.withPropertyBuilder()
          .setTableType(HoodieTableType.valueOf(tableType))
          .setTableName(tableName)
          .setRecordKeyFields(recordKeyFields)
          .setArchiveLogFolder(archiveLogFolder)
          .setPayloadClassName(hoodieConfig.getStringOrDefault(PAYLOAD_CLASS_NAME))
          .setPreCombineField(hoodieConfig.getStringOrDefault(PRECOMBINE_FIELD, null))
          .setBootstrapIndexClass(bootstrapIndexClass)
          .setBaseFileFormat(baseFileFormat)
          .setBootstrapBasePath(bootstrapBasePath)
          .setPartitionFields(partitionColumns)
          .setPopulateMetaFields(populateMetaFields)
          .setKeyGeneratorClassProp(keyGenProp)
          .setHiveStylePartitioningEnable(hoodieConfig.getBoolean(HIVE_STYLE_PARTITIONING))
          .setUrlEncodePartitioning(hoodieConfig.getBoolean(URL_ENCODE_PARTITIONING))
          .setCommitTimezone(HoodieTimelineTimeZone.valueOf(hoodieConfig.getStringOrDefault(HoodieTableConfig.TIMELINE_TIMEZONE)))
          .setPartitionMetafileUseBaseFormat(useBaseFormatMetaFile)
          .initTable(sparkContext.hadoopConfiguration, path)
      }

      val jsc = new JavaSparkContext(sqlContext.sparkContext)
      val writeClient = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc,
        schema, path, tableName, mapAsJavaMap(parameters)))
      try {
        writeClient.bootstrap(org.apache.hudi.common.util.Option.empty())
      } finally {
        writeClient.close()
      }
      val metaSyncSuccess = metaSync(sqlContext.sparkSession, hoodieConfig, basePath, df.schema)
      metaSyncSuccess
    }
  }
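As a quick sanity check, here is a small sketch (the base path is hypothetical) that reloads the freshly initialized metafolder to confirm the bootstrap config persisted by initTable:

import org.apache.hudi.common.table.HoodieTableMetaClient

// Rebuild a meta client against the new table (hypothetical path)
val metaClient = HoodieTableMetaClient.builder()
  .setConf(spark.sparkContext.hadoopConfiguration)
  .setBasePath("/data/hudi/bootstrap_table")
  .build()
// Prints an Option wrapping the persisted hoodie.bootstrap.base.path
println(metaClient.getTableConfig.getBootstrapBasePath)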

writeClient.bootstrap

The writeClient here is a SparkRDDWriteClient. Its bootstrap method initializes the table with the fixed bootstrap instant HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS and then calls bootstrap on the resulting HoodieTable; since the table type in our example is COW, the concrete class is HoodieSparkCopyOnWriteTable:

  initTable(WriteOperationType.UPSERT, Option.ofNullable(HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS)).bootstrap(context, extraMetadata);

  public static <T extends HoodieRecordPayload> HoodieSparkTable<T> create(HoodieWriteConfig config,
                                                                           HoodieSparkEngineContext context,
                                                                           HoodieTableMetaClient metaClient) {
    HoodieSparkTable<T> hoodieSparkTable;
    switch (metaClient.getTableType()) {
      case COPY_ON_WRITE:
        hoodieSparkTable = new HoodieSparkCopyOnWriteTable<>(config, context, metaClient);
        break;
      case MERGE_ON_READ:
        hoodieSparkTable = new HoodieSparkMergeOnReadTable<>(config, context, metaClient);
        break;
      default:
        throw new HoodieException("Unsupported table type :" + metaClient.getTableType());
    }
    return hoodieSparkTable;
  }

HoodieSparkCopyOnWriteTable.bootstrap

  public HoodieBootstrapWriteMetadata<HoodieData<WriteStatus>> bootstrap(HoodieEngineContext context, Option<Map<String, String>> extraMetadata) {
    return new SparkBootstrapCommitActionExecutor((HoodieSparkEngineContext) context, config, this, extraMetadata).execute();
  }

SparkBootstrapCommitActionExecutor.execute

execute first calls listAndProcessSourcePartitions, which returns each mode together with its partitions; there are two modes, METADATA_ONLY and FULL_RECORD. metadataBootstrap is then executed for the partition paths mapped to METADATA_ONLY, and fullBootstrap for the paths mapped to FULL_RECORD. Two points follow from this: 1. the mode values returned by listAndProcessSourcePartitions decide whether METADATA_ONLY or FULL_RECORD processing is performed; 2. the concrete logic lives in metadataBootstrap and fullBootstrap respectively. Let us look at each in turn, starting with how listAndProcessSourcePartitions determines the mode.

  @Override
  public HoodieBootstrapWriteMetadata<HoodieData<WriteStatus>> execute() {
    validate();
    try {
      HoodieTableMetaClient metaClient = table.getMetaClient();
      Option<HoodieInstant> completedInstant =
          metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant();
      ValidationUtils.checkArgument(!completedInstant.isPresent(),
          "Active Timeline is expected to be empty for bootstrap to be performed. "
              + "If you want to re-bootstrap, please rollback bootstrap first !!");
      // returns each mode and its partitions; the mode is either METADATA_ONLY or FULL_RECORD
      Map<BootstrapMode, List<Pair<String, List<HoodieFileStatus>>>> partitionSelections = listAndProcessSourcePartitions();

      // First run metadata bootstrap which will auto commit
      // if partitionSelections contains METADATA_ONLY, the metadata-bootstrap logic runs; otherwise this does nothing and returns immediately
      Option<HoodieWriteMetadata<HoodieData<WriteStatus>>> metadataResult = metadataBootstrap(partitionSelections.get(BootstrapMode.METADATA_ONLY));
      // if there are full bootstrap to be performed, perform that too
      // if partitionSelections contains FULL_RECORD, the full-bootstrap logic runs; otherwise this does nothing and returns immediately
      Option<HoodieWriteMetadata<HoodieData<WriteStatus>>> fullBootstrapResult = fullBootstrap(partitionSelections.get(BootstrapMode.FULL_RECORD));
      // Delete the marker directory for the instant
      WriteMarkersFactory.get(config.getMarkersType(), table, instantTime)
          .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
      return new HoodieBootstrapWriteMetadata(metadataResult, fullBootstrapResult);
    } catch (IOException ioe) {
      throw new HoodieIOException(ioe.getMessage(), ioe);
    }
  }

listAndProcessSourcePartitions

The core of this method is selector.select. The selector is loaded from MODE_SELECTOR_CLASS_NAME (hoodie.bootstrap.mode.selector), which defaults to MetadataOnlyBootstrapModeSelector; in our FULL_RECORD example it was set to FullRecordBootstrapModeSelector. Let us look at their concrete implementations.

  private Map<BootstrapMode, List<Pair<String, List<HoodieFileStatus>>>> listAndProcessSourcePartitions() throws IOException {
    List<Pair<String, List<HoodieFileStatus>>> folders = BootstrapUtils.getAllLeafFoldersWithFiles(
            table.getMetaClient(), bootstrapSourceFileSystem, config.getBootstrapSourceBasePath(), context);

    LOG.info("Fetching Bootstrap Schema !!");
    HoodieBootstrapSchemaProvider sourceSchemaProvider = new HoodieSparkBootstrapSchemaProvider(config);
    bootstrapSchema = sourceSchemaProvider.getBootstrapSchema(context, folders).toString();
    LOG.info("Bootstrap Schema :" + bootstrapSchema);

    BootstrapModeSelector selector =
        (BootstrapModeSelector) ReflectionUtils.loadClass(config.getBootstrapModeSelectorClass(), config);

    Map<BootstrapMode, List<String>> result = selector.select(folders);
    Map<String, List<HoodieFileStatus>> partitionToFiles = folders.stream().collect(
        Collectors.toMap(Pair::getKey, Pair::getValue));

    // Ensure all partitions are accounted for
    ValidationUtils.checkArgument(partitionToFiles.keySet().equals(
        result.values().stream().flatMap(Collection::stream).collect(Collectors.toSet())));

    return result.entrySet().stream().map(e -> Pair.of(e.getKey(), e.getValue().stream()
        .map(p -> Pair.of(p, partitionToFiles.get(p))).collect(Collectors.toList())))
        .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
  }
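To make the return shape concrete, here is an illustration in plain Scala types; the partition paths and file names are invented, and strings stand in for BootstrapMode and HoodieFileStatus:

// Models Map<BootstrapMode, List<Pair<String, List<HoodieFileStatus>>>>
val partitionSelections: Map[String, List[(String, List[String])]] = Map(
  "METADATA_ONLY" -> List(
    "2022/01/01" -> List("part-00000.parquet", "part-00001.parquet"),
    "2022/01/02" -> List("part-00000.parquet")
  )
)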

selector.select

MetadataOnlyBootstrapModeSelector and FullRecordBootstrapModeSelector are both subclasses of UniformBootstrapModeSelector and differ only in the bootstrapMode they pass to it; their select method is implemented in the parent class UniformBootstrapModeSelector.

public class MetadataOnlyBootstrapModeSelector extends UniformBootstrapModeSelector {

  public MetadataOnlyBootstrapModeSelector(HoodieWriteConfig bootstrapConfig) {
    super(bootstrapConfig, BootstrapMode.METADATA_ONLY);
  }
}

public class FullRecordBootstrapModeSelector extends UniformBootstrapModeSelector {

  public FullRecordBootstrapModeSelector(HoodieWriteConfig bootstrapConfig) {
    super(bootstrapConfig, BootstrapMode.FULL_RECORD);
  }
}

UniformBootstrapModeSelector.select

Clearly the mode keys in the returned map correspond directly to the selector's bootstrapMode, so when MODE_SELECTOR_CLASS_NAME is MetadataOnlyBootstrapModeSelector or FullRecordBootstrapModeSelector the mode value is uniform: either only the metadata logic or only the full-record logic runs. Can both modes run within a single bootstrap? Yes: BootstrapRegexModeSelector assigns partitions whose path matches hoodie.bootstrap.mode.selector.regex to the mode given by hoodie.bootstrap.mode.selector.regex.mode, and all remaining partitions to the opposite mode (a configuration sketch follows the select method below).

  public Map<BootstrapMode, List<String>> select(List<Pair<String, List<HoodieFileStatus>>> partitions) {
    return partitions.stream().map(p -> Pair.of(bootstrapMode, p))
        .collect(Collectors.groupingBy(Pair::getKey, Collectors.mapping(x -> x.getValue().getKey(), Collectors.toList())));
  }
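
As mentioned above, BootstrapRegexModeSelector can split one bootstrap across both modes. Here is a hedged configuration sketch: the regex pattern is made up, and the config keys are assumed to be the PARTITION_SELECTOR_REGEX_* constants from HoodieBootstrapConfig.

import org.apache.hudi.client.bootstrap.BootstrapMode
import org.apache.hudi.client.bootstrap.selector.BootstrapRegexModeSelector
import org.apache.hudi.config.HoodieBootstrapConfig

// Partitions matching the regex bootstrap as METADATA_ONLY,
// every other partition as FULL_RECORD.
val regexSelectorOpts = Map(
  HoodieBootstrapConfig.MODE_SELECTOR_CLASS_NAME.key -> classOf[BootstrapRegexModeSelector].getName,
  HoodieBootstrapConfig.PARTITION_SELECTOR_REGEX_PATTERN.key -> "2022/0[1-6].*",
  HoodieBootstrapConfig.PARTITION_SELECTOR_REGEX_MODE.key -> BootstrapMode.METADATA_ONLY.name
)
// Merge these into the options of the bootstrap write from the entry-point section.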