Flink Source Code: Flink's StreamingHive Implementation Flow and the Small-File Compaction Flow
Posted by 黑猫~
The Implementation Flow of Flink's HiveStreamingSink
Preface
To improve data freshness, we recently added a pipeline that writes data into Hive in real time with Flink. I had essentially no prior experience with writing to Hive from Flink, so I started from the official documentation. Our target is data visibility within 1-10 minutes, but the incoming data is nowhere near 128 MB per minute, so the job produces all kinds of small files of a dozen or so MB each. To make those small files merge automatically, I needed to understand the write path well enough to adapt it, and this post records the Flink streaming-to-Hive flow along the way.
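For context before diving into the source, here is a hedged sketch of the kind of table definition that drives everything below: the rolling-policy and auto-compaction options are exactly what the sink code reads. The catalog/database/table names and the concrete values are placeholders; the option keys follow the Flink filesystem/Hive connector documentation, so double-check them against your Flink version.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class HiveStreamingSinkOptionsSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Partition commit and file compaction are both driven by checkpoints.
        env.enableCheckpointing(60_000L);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Assumes a HiveCatalog named "myhive" has already been registered (see section 1.1).
        tEnv.useCatalog("myhive");
        tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        tEnv.executeSql(
                "CREATE TABLE ods_log (uid STRING, msg STRING) "
                        + "PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES ("
                        + " 'sink.rolling-policy.file-size'='128MB',"          // roll when a part file reaches this size
                        + " 'sink.rolling-policy.rollover-interval'='10 min'," // or when it has been open this long
                        + " 'sink.rolling-policy.check-interval'='1 min',"
                        + " 'auto-compaction'='true',"                         // enables the compaction operators discussed below
                        + " 'compaction.file-size'='128MB',"                   // defaults to the rolling file size if unset
                        + " 'sink.partition-commit.policy.kind'='metastore,success-file'"
                        + ")");
    }
}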
1. The HiveTableSink initialization and validation flow
1.1 Creating the TableSink object
public HiveTableSink(
        ReadableConfig flinkConf,
        JobConf jobConf,
        ObjectIdentifier identifier,
        CatalogTable table,
        @Nullable Integer configuredParallelism) {
    // Constructor parameters
    this.flinkConf = flinkConf;
    this.jobConf = jobConf;
    // The identifier is the (catalog, database, table object name) triple
    this.identifier = identifier;
    // The table metadata from the catalog
    this.catalogTable = table;
    // The Hive version matters here: it is used to pick the implementation that matches your
    // Hive release. If it is not set explicitly, Flink falls back to the version it detects
    // for your Hive environment.
    hiveVersion =
            Preconditions.checkNotNull(
                    jobConf.get(HiveCatalogFactoryOptions.HIVE_VERSION.key()),
                    "Hive version is not defined");
    // hiveShim is the utility class that adapts Flink to the different Hive versions
    hiveShim = HiveShimLoader.loadHiveShim(hiveVersion);
    // Get the table schema
    tableSchema = TableSchemaUtils.getPhysicalSchema(table.getSchema());
    // The configured parallelism, passed in by the factory class
    this.configuredParallelism = configuredParallelism;
}
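Where does the hiveVersion checked above come from in practice? It is usually set (or auto-detected) when the HiveCatalog is registered. A minimal sketch, assuming Flink's bundled HiveCatalog; the catalog name, conf dir, and version string are placeholders:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class RegisterHiveCatalogSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        // Passing the version explicitly pins which HiveShim implementation gets loaded for this catalog;
        // leaving it out lets Flink detect the version from the Hive dependencies on the classpath.
        HiveCatalog hive = new HiveCatalog("myhive", "default", "/etc/hive/conf", "2.3.6");
        tEnv.registerCatalog("myhive", hive);
        tEnv.useCatalog("myhive");
    }
}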
1.2 Returning the SinkRuntimeProvider
@Override
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
    // Data structure converter, e.g. for mapping POJOs to RowData
    DataStructureConverter converter =
            context.createDataStructureConverter(tableSchema.toRowDataType());
    // Return a DataStreamSinkProvider, which supplies the sink when the job is launched
    return (DataStreamSinkProvider)
            dataStream -> consume(dataStream, context.isBounded(), converter);
}
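The lambda above is the whole runtime contract: the planner later hands it the actual DataStream of RowData and takes back whatever DataStreamSink it builds. A minimal standalone sketch of that contract (assuming the DataStreamSinkProvider interface as documented; the print sink is only a stand-in):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
import org.apache.flink.table.data.RowData;

public class ProviderSketch {
    static DataStreamSinkProvider sinkProvider() {
        // In HiveTableSink this lambda delegates to consume(...); here it just attaches a print sink.
        return (DataStream<RowData> dataStream) -> dataStream.addSink(new PrintSinkFunction<>());
    }
}

With that contract in mind, consume() is the method the lambda delegates to: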
private DataStreamSink<?> consume(
        DataStream<RowData> dataStream, boolean isBounded, DataStructureConverter converter) {
    // Check whether this is a Hive ACID table. Newer Hive releases introduced ACID tables
    // (with better support in 2.x and 3.x), but Flink currently cannot write to them.
    checkAcidTable(catalogTable, identifier.toObjectPath());
    // try-with-resources: the connection is closed whether or not an exception is thrown.
    // Flink wraps the metastore client so that it is created for the right Hive version,
    // and the wrapper implements AutoCloseable for automatic release.
    try (HiveMetastoreClientWrapper client =
            HiveMetastoreClientFactory.create(HiveConfUtils.create(jobConf), hiveVersion)) {
        // Look up the Table object via the identifier
        Table table = client.getTable(identifier.getDatabaseName(), identifier.getObjectName());
        // The storage descriptor, which holds part of the table's metadata
        StorageDescriptor sd = table.getSd();
        // The Hive output format class
        Class hiveOutputFormatClz =
                hiveShim.getHiveOutputFormatClass(Class.forName(sd.getOutputFormat()));
        // Whether the output files are compressed: false for text, true when stored as orc or parquet
        boolean isCompressed = jobConf.getBoolean(HiveConf.ConfVars.COMPRESSRESULT.varname, false);
        // A factory for Hive operations, used to create the writers that write records into Hive
        HiveWriterFactory writerFactory =
                new HiveWriterFactory(
                        jobConf,
                        hiveOutputFormatClz,
                        sd.getSerdeInfo(),
                        tableSchema,
                        getPartitionKeyArray(),
                        HiveReflectionUtils.getTableMetadata(hiveShim, table),
                        hiveShim,
                        isCompressed);
        // The file extension; for tables stored as orc or parquet there is no extension
        String extension =
                Utilities.getFileExtension(
                        jobConf,
                        isCompressed,
                        (HiveOutputFormat<?, ?>) hiveOutputFormatClz.newInstance());
        // A small piece of Flink output configuration holding the file prefix and suffix.
        // The suffix is the extension obtained above; the prefix is "part-" plus a random string,
        // e.g. part-e9ebbc0c-ae29-4ac7-8c84-f80daf385915-0-413.
        // This prefix is not the final file name: with compaction enabled, more is prepended to it.
        OutputFileConfig.OutputFileConfigBuilder fileNamingBuilder =
                OutputFileConfig.builder()
                        .withPartPrefix("part-" + UUID.randomUUID().toString())
                        .withPartSuffix(extension == null ? "" : extension);
        // The parallelism, used further below
        final int parallelism =
                Optional.ofNullable(configuredParallelism).orElse(dataStream.getParallelism());
        // Sink initialization is essentially done; now decide whether this run is streaming or batch
        if (isBounded) {
            // Note that batch mode uses a different output file naming scheme
            OutputFileConfig fileNaming = fileNamingBuilder.build();
            return createBatchSink(
                    dataStream, converter, sd, writerFactory, fileNaming, parallelism);
        } else {
            // Streaming mode does not support overwrite; if overwrite is requested, fail right away
            if (overwrite) {
                throw new IllegalStateException("Streaming mode not support overwrite.");
            }
            // Fetch the Hive table properties
            Properties tableProps = HiveReflectionUtils.getTableMetadata(hiveShim, table);
            // Create the streaming Hive sink
            return createStreamSink(
                    dataStream, sd, tableProps, writerFactory, fileNamingBuilder, parallelism);
        }
    } catch (TException e) {
        // Pay attention to these exceptions as well; they make troubleshooting easier later
        throw new CatalogException("Failed to query Hive metaStore", e);
    } catch (IOException e) {
        throw new FlinkRuntimeException("Failed to create staging dir", e);
    } catch (ClassNotFoundException e) {
        throw new FlinkHiveException("Failed to get output format class", e);
    } catch (IllegalAccessException | InstantiationException e) {
        throw new FlinkHiveException("Failed to instantiate output format instance", e);
    }
}
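Before moving on, it helps to see how the naming pieces above combine into the file names you actually find on HDFS. The sketch below only illustrates the prefix-subtaskIndex-count-suffix pattern; the exact in-progress markers and the hidden prefix used before compaction depend on the Flink version, so treat it as an approximation rather than the sink's real naming code.

import java.util.UUID;

public class PartFileNameSketch {
    public static void main(String[] args) {
        String prefix = "part-" + UUID.randomUUID();   // what fileNamingBuilder.withPartPrefix sets
        String suffix = "";                            // empty for the native parquet/orc writers
        int subtaskIndex = 0;
        int count = 413;

        // Finished part file, roughly: part-<uuid>-<subtaskIndex>-<count>
        System.out.println(prefix + "-" + subtaskIndex + "-" + count + suffix);

        // With auto-compaction enabled, a hidden "uncompacted" marker is prepended
        // until the file has been merged (see convertToUncompacted in section 2.1).
        System.out.println(".uncompacted-" + prefix + "-" + subtaskIndex + "-" + count + suffix);
    }
}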
At this point we have a reasonably complete picture of how Flink initializes the HiveTableSink. Honestly, the coding conventions and exception handling of a top-tier framework are impressive: when studying a framework, the point is first to understand its ideas, and second to see how other people write code. There is a lot to learn here.
2. Creating the HiveTableStreamSink
2.1 Creating the StreamSink
private DataStreamSink<?> createStreamSink( // honestly, parameter lists formatted like this hurt to read
        DataStream<RowData> dataStream, // the data stream, self-explanatory
        StorageDescriptor sd, // the table storage descriptor, holding part of the table metadata
        Properties tableProps, // the table properties you declared when creating the table
        HiveWriterFactory recordWriterFactory, // factory for record writers
        OutputFileConfig.OutputFileConfigBuilder fileNamingBuilder, // output file naming configuration
        final int parallelism // the number of subtasks
        ) {
    // Create a Flink Configuration object
    org.apache.flink.configuration.Configuration conf =
            new org.apache.flink.configuration.Configuration();
    // ...and copy the table options into it
    catalogTable.getOptions().forEach(conf::setString);
    // The partition computer, i.e. how records are mapped to Hive storage partitions
    HiveRowDataPartitionComputer partComputer =
            new HiveRowDataPartitionComputer(
                    hiveShim,
                    defaultPartName(),
                    tableSchema.getFieldNames(),
                    tableSchema.getFieldDataTypes(),
                    getPartitionKeyArray());
    // The bucket assigner, which assigns records to buckets based on partComputer
    TableBucketAssigner assigner = new TableBucketAssigner(partComputer);
    // The file rolling policy. This is the original file-handling mechanism; because writing to
    // many partitions still produces small files, the compaction mechanism was added on top of it.
    HiveRollingPolicy rollingPolicy =
            new HiveRollingPolicy(
                    conf.get(SINK_ROLLING_POLICY_FILE_SIZE).getBytes(),
                    conf.get(SINK_ROLLING_POLICY_ROLLOVER_INTERVAL).toMillis());
    // Whether automatic compaction of partition files is enabled. Why is compaction needed on top
    // of the rolling policy? With a parallelism of 5, for example, each roll still produces 5 small files.
    boolean autoCompaction = conf.getBoolean(FileSystemOptions.AUTO_COMPACTION);
    // When enabled, every file written (before merging) carries an "uncompacted" prefix
    if (autoCompaction) {
        fileNamingBuilder.withPartPrefix(
                convertToUncompacted(fileNamingBuilder.build().getPartPrefix()));
    }
    // Build the file naming configuration
    OutputFileConfig outputFileConfig = fileNamingBuilder.build();
    // The Path object here is the table's storage location on HDFS, not the path of a specific file
    org.apache.flink.core.fs.Path path = new org.apache.flink.core.fs.Path(sd.getLocation());
    // The BucketsBuilder
    BucketsBuilder<RowData, String, ? extends BucketsBuilder<RowData, ?, ?>> builder;
    // Decide between the MapReduce writer and Flink's native writer
    if (flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER)) {
        builder =
                bucketsBuilderForMRWriter(
                        recordWriterFactory, sd, assigner, rollingPolicy, outputFileConfig);
        LOG.info("Hive streaming sink: Use MapReduce RecordWriter writer.");
    } else {
        Optional<BulkWriter.Factory<RowData>> bulkFactory =
                createBulkWriterFactory(getPartitionKeyArray(), sd);
        // Create a bulk factory for the storage format; if none is available, fall back to the Hadoop MR writer
        if (bulkFactory.isPresent()) {
            builder =
                    StreamingFileSink.forBulkFormat(
                                    path,
                                    new FileSystemTableSink.ProjectionBulkFactory(
                                            bulkFactory.get(), partComputer))
                            .withBucketAssigner(assigner)
                            .withRollingPolicy(rollingPolicy)
                            .withOutputFileConfig(outputFileConfig);
            LOG.info("Hive streaming sink: Use native parquet&orc writer.");
        } else {
            builder =
                    bucketsBuilderForMRWriter(
                            recordWriterFactory, sd, assigner, rollingPolicy, outputFileConfig);
            LOG.info(
                    "Hive streaming sink: Use MapReduce RecordWriter writer because BulkWriter Factory not available.");
        }
    }
    // The bucket check interval, a value set in the table properties at creation time;
    // see the FileSystem sink documentation for details
    long bucketCheckInterval = conf.get(SINK_ROLLING_POLICY_CHECK_INTERVAL).toMillis();
    // The writer stream; this sink only writes records to files and is not the final operator
    DataStream<PartitionCommitInfo> writerStream;
    // Check whether compaction is enabled, again a value from the table properties
    if (autoCompaction) {
        // The target compaction file size. Note that if you do not configure it,
        // it defaults to SINK_ROLLING_POLICY_FILE_SIZE.
        long compactionSize =
                conf.getOptional(FileSystemOptions.COMPACTION_FILE_SIZE)
                        .orElse(conf.get(SINK_ROLLING_POLICY_FILE_SIZE))
                        .getBytes();
        // Create the writer stream through StreamingSink
        writerStream =
                StreamingSink.compactionWriter(
                        dataStream,
                        bucketCheckInterval,
                        builder,
                        fsFactory(),
                        path,
                        createCompactReaderFactory(sd, tableProps),
                        compactionSize,
                        parallelism);
    } else {
        writerStream =
                StreamingSink.writer(dataStream, bucketCheckInterval, builder, parallelism);
    }
    // Finally, attach the sink itself; more on this later
    return StreamingSink.sink(
            writerStream, path, identifier, getPartitionKeys(), msFactory(), fsFactory(), conf);
}
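The HiveRollingPolicy used above is what turns the two rolling options into concrete "close this file now" decisions. The standalone sketch below mirrors the assumed behavior (roll when the file is big enough, old enough, or when a checkpoint arrives); it is not the actual Flink class.

public class RollingDecisionSketch {
    private final long rollingFileSize;      // 'sink.rolling-policy.file-size'
    private final long rolloverIntervalMs;   // 'sink.rolling-policy.rollover-interval'

    public RollingDecisionSketch(long rollingFileSize, long rolloverIntervalMs) {
        this.rollingFileSize = rollingFileSize;
        this.rolloverIntervalMs = rolloverIntervalMs;
    }

    /** Checked on incoming records, on the periodic check interval, and on checkpoints. */
    boolean shouldRoll(long currentFileSize, long fileCreationTime, long now, boolean onCheckpoint) {
        if (onCheckpoint) {
            // Rolling on checkpoints keeps pending files aligned with what the partition committer sees.
            return true;
        }
        return currentFileSize >= rollingFileSize
                || now - fileCreationTime >= rolloverIntervalMs;
    }
}

This also explains why small files appear in the first place: every checkpoint, for every partition of every subtask, can close a file long before it reaches 128 MB, which is exactly what the compaction path below addresses.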
At this point the StreamSink has been wired up, but we have not yet seen the real implementation; that actually lives in compactionWriter, so let's take a look at it.
public static <T> DataStream<PartitionCommitInfo> compactionWriter( // this formatting bothers my inner perfectionist too
        DataStream<T> inputStream, // the input stream, e.g. RowData, String, Struct, ...
        long bucketCheckInterval, // check interval
        StreamingFileSink.BucketsBuilder<
                        T, String, ? extends StreamingFileSink.BucketsBuilder<T, String, ?>>
                bucketsBuilder, // buckets builder
        FileSystemFactory fsFactory, // file system factory
        Path path, // path
        CompactReader.Factory<T> readFactory, // reader factory used for merging, fairly self-explanatory
        long targetFileSize, // target file size
        int parallelism // parallelism
        ) {
    /**
     * This method builds three operators; let's look at what each one is for.
     *
     * writer writes data into buckets and sends "file opened" and "checkpoint completed"
     * messages downstream.
     *
     * coordinator coordinates the written files, i.e. it is responsible for computing
     * which files can be merged.
     *
     * compacter is the operator that performs the compaction.
     */
    CompactFileWriter<T> writer = new CompactFileWriter<>(bucketCheckInterval, bucketsBuilder);
    ..........
    CompactCoordinator coordinator = new CompactCoordinator(fsSupplier, targetFileSize);
    ..........
    CompactOperator<T> compacter = new CompactOperator<>(fsSupplier, readFactory, writerFactory);
}
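The coordinator's job ("compute which files can be merged") essentially boils down to this: when an EndCheckpoint arrives, group the InputFile messages collected for each partition into merge units of roughly targetFileSize. The following is a standalone sketch of that assumed packing logic, not the actual CompactCoordinator code.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CompactionPlanSketch {

    /** Packs files (with their sizes in bytes) into merge units of at least targetFileSize. */
    static List<List<String>> plan(List<String> files, List<Long> sizes, long targetFileSize) {
        List<List<String>> units = new ArrayList<>();
        List<String> current = new ArrayList<>();
        long currentSize = 0L;
        for (int i = 0; i < files.size(); i++) {
            current.add(files.get(i));
            currentSize += sizes.get(i);
            if (currentSize >= targetFileSize) { // this unit is big enough, start a new one
                units.add(current);
                current = new ArrayList<>();
                currentSize = 0L;
            }
        }
        if (!current.isEmpty()) {
            units.add(current); // leftover small files still form a (smaller) unit
        }
        return units;
    }

    public static void main(String[] args) {
        // Four uncompacted files of 40/50/60/10 MB with a 128 MB target -> two units.
        System.out.println(plan(
                Arrays.asList("f1", "f2", "f3", "f4"),
                Arrays.asList(40L << 20, 50L << 20, 60L << 20, 10L << 20),
                128L << 20));
    }
}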
3. The HiveTableStreamSink compaction flow
3.1 CompactFileWriter
This class writes data into files, but it does not implement the writing itself; the actual writing happens in its parent class. However, once the parent class has completed a checkpoint, this operator sends a "write finished" record downstream.
package org.apache.flink.table.filesystem.stream.compact;

import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.filesystem.stream.AbstractStreamingWriter;
import org.apache.flink.table.filesystem.stream.compact.CompactMessages.EndCheckpoint;
import org.apache.flink.table.filesystem.stream.compact.CompactMessages.InputFile;

/**
 * This operator mainly extends an abstract class and overrides a few methods.
 * AbstractStreamingWriter also contains the methods that actually write the data;
 * by this point the records have already been written into buckets.
 */
public class CompactFileWriter<T>
        extends AbstractStreamingWriter<T, CompactMessages.CoordinatorInput> {

    private static final long serialVersionUID = 1L;

    public CompactFileWriter(
            long bucketCheckInterval,
            StreamingFileSink.BucketsBuilder<
                            T, String, ? extends StreamingFileSink.BucketsBuilder<T, String, ?>>
                    bucketsBuilder) {
        super(bucketCheckInterval, bucketsBuilder);
    }

    @Override
    protected void partitionCreated(String partition) {}

    @Override
    protected void partitionInactive(String partition) {}

    @Override
    protected void onPartFileOpened(String partition, Path newPath) {
        // Notify downstream that a new file has been opened
        output.collect(new StreamRecord<>(new InputFile(partition, newPath)));
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        super.notifyCheckpointComplete(checkpointId);
        // When the checkpoint completes, send a checkpoint-completed message downstream
        output.collect(
                new StreamRecord<>(
                        new EndCheckpoint(
                                checkpointId,
                                getRuntimeContext().getIndexOfThisSubtask(),
                                getRuntimeContext().getNumberOfParallelSubtasks())));
    }
}
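Downstream of these messages, the CompactOperator finally merges each unit: it reads every small file of the unit through the CompactReader and writes the records into a single target file, after which the merged file can be committed and the small sources dropped. A heavily simplified, format-agnostic sketch of that idea (the real operator works with format-aware readers/writers and the bucket's file system, not plain text lines):

import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

public class CompactUnitSketch {

    /** Rewrites the records of one compaction unit into a single target file, then drops the sources. */
    static void compact(List<Path> uncompactedFiles, Path target) throws IOException {
        // The real CompactOperator uses a format-aware CompactReader and writer (parquet, orc, ...);
        // plain text lines stand in for records in this sketch.
        try (BufferedWriter writer = Files.newBufferedWriter(target, StandardCharsets.UTF_8)) {
            for (Path source : uncompactedFiles) {
                for (String record : Files.readAllLines(source, StandardCharsets.UTF_8)) {
                    writer.write(record);
                    writer.newLine();
                }
            }
        }
        for (Path source : uncompactedFiles) {
            // Only after the merged file is safely in place are the small files removed.
            Files.delete(source);
        }
    }
}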