Flink Source Code Series: Flink's Streaming Hive Implementation Flow and the Small-File Compaction Flow

Posted by 黑猫~

The Implementation Flow of Flink's HiveStreamingSink

Preface

To improve data freshness, we recently added a pipeline that writes into Hive from Flink in real time. I had barely touched Flink-to-Hive writes before, so I started from the official documentation. Our goal is that data becomes visible within 1 to 10 minutes, but in that window the data never comes close to 128 MB, so the job keeps producing small files of a dozen or so MB. I therefore needed to understand the write path and build on it so that those small files are merged automatically, and this post records the Flink streaming-to-Hive flow along the way.
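Before diving into the source, here is a minimal sketch of the table-level knobs the rest of this post keeps coming back to: the rolling policy, auto-compaction, and partition commit. The catalog name myhive, the table ods_logs and its columns are made-up placeholders; as far as I can tell the property keys are the documented connector options behind constants such as SINK_ROLLING_POLICY_FILE_SIZE, AUTO_COMPACTION and COMPACTION_FILE_SIZE that appear in the code below.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableEnvironment;

public class HiveSinkTableSetup {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
        // Assumes a HiveCatalog named "myhive" has already been registered (a sketch of that follows in section 1.1).
        tEnv.useCatalog("myhive");
        // Switch to the Hive dialect so the DDL below is parsed as Hive DDL.
        tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        tEnv.executeSql(
                "CREATE TABLE IF NOT EXISTS ods_logs (msg STRING) "
                        + "PARTITIONED BY (dt STRING) STORED AS parquet TBLPROPERTIES ("
                        // roll a new file at 128MB or after 5 minutes, checked every minute
                        + " 'sink.rolling-policy.file-size'='128MB',"
                        + " 'sink.rolling-policy.rollover-interval'='5 min',"
                        + " 'sink.rolling-policy.check-interval'='1 min',"
                        // merge the small files of each checkpoint up to the target size
                        + " 'auto-compaction'='true',"
                        + " 'compaction.file-size'='128MB',"
                        // make finished partitions visible in the metastore
                        + " 'sink.partition-commit.policy.kind'='metastore,success-file'"
                        + ")");
    }
}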



1. HiveTableSink Initialization and Validation Flow

1.1 Creating the TableSink Object

public HiveTableSink(
        ReadableConfig flinkConf,
        JobConf jobConf,
        ObjectIdentifier identifier,
        CatalogTable table,
        @Nullable Integer configuredParallelism) {
    //Constructor parameters
    this.flinkConf = flinkConf;
    this.jobConf = jobConf;
    //The identifier combines the catalog, database and table object name
    this.identifier = identifier;
    //The table's metadata from the catalog
    this.catalogTable = table;
    //The Hive version matters: it decides which implementation the shim adapts to. If it is not set explicitly, the default version of your metastore is used (see the catalog registration sketch after this constructor)
    hiveVersion =
            Preconditions.checkNotNull(
                    jobConf.get(HiveCatalogFactoryOptions.HIVE_VERSION.key()),
                    "Hive version is not defined");
    //hiveShim is the utility class that adapts calls to the different Hive versions
    hiveShim = HiveShimLoader.loadHiveShim(hiveVersion);
    //Get the physical table schema
    tableSchema = TableSchemaUtils.getPhysicalSchema(table.getSchema());
    //Get the configured parallelism, which is resolved in the factory class and passed in
    this.configuredParallelism = configuredParallelism;
}
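Where does the HIVE_VERSION entry in the JobConf come from? It is set when the HiveCatalog is registered. A small sketch, with an illustrative catalog name, conf dir and version; passing the version explicitly avoids relying on auto-detection against the metastore:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class RegisterHiveCatalog {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
        // catalog name, default database, hive-conf dir and Hive version are all illustrative values
        HiveCatalog hive = new HiveCatalog("myhive", "default", "/etc/hive/conf", "2.3.6");
        tEnv.registerCatalog("myhive", hive);
        tEnv.useCatalog("myhive");
    }
}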

1.2 Returning the SinkRuntimeProvider

 @Override
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
    //Data structure converter, e.g. for mapping POJOs to RowData
    DataStructureConverter converter =
            context.createDataStructureConverter(tableSchema.toRowDataType());
    //Return a DataStreamSinkProvider, which supplies the actual sink when the job is executed
    return (DataStreamSinkProvider)
            dataStream -> consume(dataStream, context.isBounded(), converter);
}

private DataStreamSink<?> consume(DataStream<RowData> dataStream, boolean isBounded, DataStructureConverter converter) {
    //Check whether this is a Hive ACID table. ACID tables exist in newer Hive versions (2.x and 3.x support them better); Flink currently cannot write to Hive ACID tables
    checkAcidTable(catalogTable, identifier.toObjectPath());
    //try-with-resources: the client is closed whether or not an exception is thrown
    //Flink wraps the metastore client so it is created for the right Hive version; the wrapper implements AutoCloseable, so it is released automatically
    try (HiveMetastoreClientWrapper client = HiveMetastoreClientFactory.create(HiveConfUtils.create(jobConf), hiveVersion)) {
        //Fetch the Table object via the identifier
        Table table = client.getTable(identifier.getDatabaseName(), identifier.getObjectName());
        //Get the table's storage descriptor, which holds the storage-related metadata
        StorageDescriptor sd = table.getSd();
        //Get the Hive output format class
        Class hiveOutputFormatClz = hiveShim.getHiveOutputFormatClass(Class.forName(sd.getOutputFormat()));
        //Whether output files are compressed: text is not compressed; with ORC or Parquet storage this is true
        boolean isCompressed = jobConf.getBoolean(HiveConf.ConfVars.COMPRESSRESULT.varname, false);
        //A factory for Hive writers, used to create the record writers that write into Hive
        HiveWriterFactory writerFactory =
                new HiveWriterFactory(
                        jobConf,
                        hiveOutputFormatClz,
                        sd.getSerdeInfo(),
                        tableSchema,
                        getPartitionKeyArray(),
                        HiveReflectionUtils.getTableMetadata(hiveShim, table),
                        hiveShim,
                        isCompressed);
        //Get the file extension; if the table is stored as ORC or Parquet there is no extension
        String extension =
                Utilities.getFileExtension(
                        jobConf,
                        isCompressed,
                        (HiveOutputFormat<?, ?>) hiveOutputFormatClz.newInstance());
        //A small output-file config that mainly holds the part-file prefix and suffix
        //The suffix is the extension obtained above; the prefix is "part-" plus a random string
        //For example: part-e9ebbc0c-ae29-4ac7-8c84-f80daf385915-0-413
        //Note that this prefix is not the final file name: once compaction is enabled, more is prepended to it
        OutputFileConfig.OutputFileConfigBuilder fileNamingBuilder =
                OutputFileConfig.builder()
                        .withPartPrefix("part-" + UUID.randomUUID().toString())
                        .withPartSuffix(extension == null ? "" : extension);
        //Resolve the parallelism for later use
        final int parallelism =
                Optional.ofNullable(configuredParallelism).orElse(dataStream.getParallelism());
        //Sink initialization is essentially done here; next we decide whether this execution is streaming or batch
        if (isBounded) {
            //Note that in batch mode the output file naming is different
            OutputFileConfig fileNaming = fileNamingBuilder.build();
            return createBatchSink(
                    dataStream, converter, sd, writerFactory, fileNaming, parallelism);
        } else {
            //Streaming mode does not support overwrite; if overwrite is requested, throw immediately
            if (overwrite) {
                throw new IllegalStateException("Streaming mode not support overwrite.");
            }
            //Get the Hive table properties
            Properties tableProps = HiveReflectionUtils.getTableMetadata(hiveShim, table);
            //Create the streaming Hive sink
            return createStreamSink(
                    dataStream, sd, tableProps, writerFactory, fileNamingBuilder, parallelism);
        }
    } catch (TException e) {
        //These exceptions are worth noting; they make troubleshooting easier later
        throw new CatalogException("Failed to query Hive metaStore", e);
    } catch (IOException e) {
        throw new FlinkRuntimeException("Failed to create staging dir", e);
    } catch (ClassNotFoundException e) {
        throw new FlinkHiveException("Failed to get output format class", e);
    } catch (IllegalAccessException | InstantiationException e) {
        throw new FlinkHiveException("Failed to instantiate output format instance", e);
    }
}

With that, we have basically walked through the initialization of Flink's HiveTableSink. Honestly, the coding conventions and exception handling of a top-tier framework are impressive. Studying a framework is mostly about absorbing its ideas, but it is also well worth looking at how other people write code; there is a lot to learn from it.

2. Creating the HiveTableStreamSink

2.1 Creating the StreamSink

 private DataStreamSink<?> createStreamSink(  //honestly, my OCD flares up reading parameters laid out like this
         DataStream<RowData> dataStream,	//the input data stream, self-explanatory
         StorageDescriptor sd,//table storage descriptor, containing storage metadata
         Properties tableProps,	//table properties, i.e. the properties you set when creating the table
         HiveWriterFactory recordWriterFactory, //factory for the record writers
         OutputFileConfig.OutputFileConfigBuilder fileNamingBuilder,//output file naming config
         final int parallelism   //the number of subtasks
         ) {
     //Create a Flink Configuration object
     org.apache.flink.configuration.Configuration conf = new org.apache.flink.configuration.Configuration();
     //Copy the table options into it
     catalogTable.getOptions().forEach(conf::setString);
     //Partition computer: it computes the Hive storage partition each row belongs to
     HiveRowDataPartitionComputer partComputer =
             new HiveRowDataPartitionComputer(
                     hiveShim,
                     defaultPartName(),
                     tableSchema.getFieldNames(),
                     tableSchema.getFieldDataTypes(),
                     getPartitionKeyArray());
     //Bucket assigner: assigns each row to a bucket according to partComputer
     TableBucketAssigner assigner = new TableBucketAssigner(partComputer);
     //File rolling policy. This is the original file-handling mechanism; since multiple partitions still produce small files, the compaction mechanism was added on top of it
     HiveRollingPolicy rollingPolicy =
             new HiveRollingPolicy(
                     conf.get(SINK_ROLLING_POLICY_FILE_SIZE).getBytes(),
                     conf.get(SINK_ROLLING_POLICY_ROLLOVER_INTERVAL).toMillis());
     //Whether automatic file compaction is enabled. Why is compaction needed when there is already a rolling policy? With a parallelism of 5 you still get 5 small files per partition... think about it
     boolean autoCompaction = conf.getBoolean(FileSystemOptions.AUTO_COMPACTION);
     //Once enabled, every file that is written (before being merged) carries the uncompacted prefix
     if (autoCompaction) {
         fileNamingBuilder.withPartPrefix(
                 convertToUncompacted(fileNamingBuilder.build().getPartPrefix()));
     }
     //Build the output file naming config
     OutputFileConfig outputFileConfig = fileNamingBuilder.build();
     //Get the Path object. This is the table's storage location on HDFS, not the absolute path of any single file
     org.apache.flink.core.fs.Path path = new org.apache.flink.core.fs.Path(sd.getLocation());
     //The BucketsBuilder
     BucketsBuilder<RowData, String, ? extends BucketsBuilder<RowData, ?, ?>> builder;
     //Decide whether to use the MapReduce RecordWriter or Flink's own writers (see the config note after this listing)
     if (flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER)) {
         builder =
                 bucketsBuilderForMRWriter(
                         recordWriterFactory, sd, assigner, rollingPolicy, outputFileConfig);
         LOG.info("Hive streaming sink: Use MapReduce RecordWriter writer.");
     } else {
         Optional<BulkWriter.Factory<RowData>> bulkFactory =
                 createBulkWriterFactory(getPartitionKeyArray(), sd);
         //A bulk writer factory is created according to the file format; if none is available, fall back to the Hadoop MR writer
         if (bulkFactory.isPresent()) {
             builder =
                     StreamingFileSink.forBulkFormat(
                                     path,
                                     new FileSystemTableSink.ProjectionBulkFactory(
                                             bulkFactory.get(), partComputer))
                             .withBucketAssigner(assigner)
                             .withRollingPolicy(rollingPolicy)
                             .withOutputFileConfig(outputFileConfig);
             LOG.info("Hive streaming sink: Use native parquet&orc writer.");
         } else {
             builder =
                     bucketsBuilderForMRWriter(
                             recordWriterFactory, sd, assigner, rollingPolicy, outputFileConfig);
             LOG.info(
                     "Hive streaming sink: Use MapReduce RecordWriter writer because BulkWriter Factory not available.");
         }
     }
     //Bucket check interval, a table property set at table creation; see the FileSystem sink page of the official docs for details
     long bucketCheckInterval = conf.get(SINK_ROLLING_POLICY_CHECK_INTERVAL).toMillis();
     //Writer stream: this part only writes records out to files, it is not the final operator
     DataStream<PartitionCommitInfo> writerStream;
     //Check whether compaction is enabled, also a table property set at table creation
     if (autoCompaction) {
         //Target size of compacted files; note that if it is not configured, it defaults to SINK_ROLLING_POLICY_FILE_SIZE
         long compactionSize =
                 conf.getOptional(FileSystemOptions.COMPACTION_FILE_SIZE)
                         .orElse(conf.get(SINK_ROLLING_POLICY_FILE_SIZE))
                         .getBytes();
         //Create the writer stream via StreamingSink
         writerStream =
                 StreamingSink.compactionWriter(
                         dataStream,
                         bucketCheckInterval,
                         builder,
                         fsFactory(),
                         path,
                         createCompactReaderFactory(sd, tableProps),
                         compactionSize,
                         parallelism);
     } else {
         writerStream =
                 StreamingSink.writer(dataStream, bucketCheckInterval, builder, parallelism);
     }
     //Finally mount the sink operator; no rush, we will come back to this
     return StreamingSink.sink(
             writerStream, path, identifier, getPartitionKeys(), msFactory(), fsFactory(), conf);
 }
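A side note on the MR-vs-native branch above: HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER should correspond to the job-level option table.exec.hive.fallback-mapred-writer. A hedged sketch of forcing the MapReduce RecordWriter path, for example to take the native Parquet/ORC writers out of the picture while debugging:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ForceMapReduceWriter {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
        // Flip the branch seen in createStreamSink: use the MapReduce RecordWriter
        // instead of Flink's native Parquet/ORC bulk writers.
        Configuration conf = tEnv.getConfig().getConfiguration();
        conf.setBoolean("table.exec.hive.fallback-mapred-writer", true);
    }
}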

At this point the stream sink has been wired up, but we have not yet seen the real implementation; it actually lives in compactionWriter, so let's take a look at it.

public static <T> DataStream<PartitionCommitInfo> compactionWriter(//ugh, the OCD again
        DataStream<T> inputStream, //the input stream, e.g. RowData, String, Struct, ...
        long bucketCheckInterval,//bucket check interval
        StreamingFileSink.BucketsBuilder<
                       T, String, ? extends StreamingFileSink.BucketsBuilder<T, String, ?>>
                bucketsBuilder,//buckets builder
        FileSystemFactory fsFactory,//file system factory
        Path path,//table path
        CompactReader.Factory<T> readFactory,//factory for the readers used during compaction, fairly self-explanatory
        long targetFileSize,//target file size
        int parallelism  //parallelism
        ) {
    /**
     * Three operators are created here; let's see what each one does
     * (a toy sketch of this three-operator topology follows after this snippet).
     *
     * writer writes the data into buckets and sends open-file / checkpoint-complete
     * messages downstream.
     *
     * coordinator is the operator that coordinates the written files, i.e. it decides
     * which files can be merged.
     *
     * compacter is the operator that performs the actual compaction.
     */
    CompactFileWriter<T> writer = new CompactFileWriter<>(bucketCheckInterval, bucketsBuilder);
    ..........
    CompactCoordinator coordinator = new CompactCoordinator(fsSupplier, targetFileSize);
	..........
    CompactOperator<T> compacter = new CompactOperator<>(fsSupplier, readFactory, writerFactory);

}
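The elided lines wire the three operators into a chain: the writer runs at the sink parallelism, the coordinator is forced down to a single parallel instance so that it sees every written file, and the resulting compaction plan is broadcast to the compactors. The toy topology below only mimics that shape with plain map functions as stand-ins for the real operators; the names and logic in it are placeholders, not Flink's implementation.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CompactionTopologySketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> input = env.fromElements("a", "b", "c");

        // Stand-in for CompactFileWriter: writes part files and reports them downstream.
        DataStream<String> written = input.map(r -> "file:" + r).setParallelism(4);

        // Stand-in for CompactCoordinator: a single instance sees every reported file
        // and decides which files can be merged together.
        DataStream<String> plan = written.map(f -> "unit[" + f + "]").setParallelism(1);

        // Stand-in for CompactOperator: the plan is broadcast so every compactor instance
        // learns about every compaction unit and executes the ones it is responsible for.
        plan.broadcast().map(u -> "compacted " + u).setParallelism(4).print();

        env.execute("compaction-topology-sketch");
    }
}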

3. The HiveTableStreamSink Compaction Flow

3.1 CompactFileWriter

This class writes the data into files. The actual writing is not implemented here but in its parent class; what this class adds is that, after the parent has completed a checkpoint, it emits an end-of-checkpoint record downstream.

package org.apache.flink.table.filesystem.stream.compact;

import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.filesystem.stream.AbstractStreamingWriter;
import org.apache.flink.table.filesystem.stream.compact.CompactMessages.EndCheckpoint;
import org.apache.flink.table.filesystem.stream.compact.CompactMessages.InputFile;

/**
 * This operator extends an abstract class and overrides a few of its methods.
 * AbstractStreamingWriter also contains the logic that actually writes the records;
 * by the time these callbacks fire, the data has already been written into a bucket.
 */
public class CompactFileWriter<T>
        extends AbstractStreamingWriter<T, CompactMessages.CoordinatorInput> {
	
    private static final long serialVersionUID = 1L;

    public CompactFileWriter(
            long bucketCheckInterval,
            StreamingFileSink.BucketsBuilder<
                            T, String, ? extends StreamingFileSink.BucketsBuilder<T, String, ?>>
                    bucketsBuilder) {
        super(bucketCheckInterval, bucketsBuilder);
    }

    @Override
    protected void partitionCreated(String partition) {}

    @Override
    protected void partitionInactive(String partition) {}

    @Override
    protected void onPartFileOpened(String partition, Path newPath) {
        //Notify downstream that a new part file has been opened
        output.collect(new StreamRecord<>(new InputFile(partition, newPath)));
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        super.notifyCheckpointComplete(checkpointId);
        //When the checkpoint completes, send an end-of-checkpoint message downstream
        output.collect(
                new StreamRecord<>(
                        new EndCheckpoint(
                                checkpointId,
                                getRuntimeContext().getIndexOfThisSubtask(),
                                getRuntimeContext().getNumberOfParallelSubtasks())));
    }
}
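Downstream of this writer, the CompactCoordinator collects the InputFile messages per partition and, when the EndCheckpoint message arrives, packs them into compaction units up to the target file size; the CompactOperator then merges each unit into a single compacted file, after which the temporary uncompacted files can be dropped. The sketch below is only my own illustration of that packing idea, not the Flink implementation, and the file names in it are made up:

import java.util.ArrayList;
import java.util.List;

/** My own illustration of the packing idea: group files into units of at least targetSize bytes. */
public class CompactionPlanSketch {

    static List<List<String>> pack(List<String> files, List<Long> sizes, long targetSize) {
        List<List<String>> units = new ArrayList<>();
        List<String> current = new ArrayList<>();
        long currentSize = 0;
        for (int i = 0; i < files.size(); i++) {
            current.add(files.get(i));
            currentSize += sizes.get(i);
            // cut a unit as soon as the accumulated size reaches the target
            if (currentSize >= targetSize) {
                units.add(current);
                current = new ArrayList<>();
                currentSize = 0;
            }
        }
        if (!current.isEmpty()) {
            units.add(current); // leftover small files still form a unit
        }
        return units;
    }

    public static void main(String[] args) {
        List<String> files = List.of(".uncompacted-part-0", ".uncompacted-part-1", ".uncompacted-part-2");
        List<Long> sizes = List.of(90L << 20, 50L << 20, 30L << 20); // 90MB, 50MB, 30MB
        // with a 128MB target, the first two files form one unit and the last one forms another
        System.out.println(pack(files, sizes, 128L << 20));
    }
}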
