Flink StreamingFileSink Processing Flow
Posted springmoon-venn
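A couple of days ago I tried writing to Hive with Flink SQL and was puzzled by how the sink writes data to HDFS, especially how files are committed based on checkpoints. So I read through the StreamingFileSink source code (Flink SQL's Hive writer reuses this code).
StreamingFileSink was introduced in Flink 1.6 as the community's improved replacement for BucketingSink, which has been marked deprecated since Flink 1.9.
The StreamingFileSink processing flow has four levels, each calling into the next:
* StreamingFileSink is the sink itself and defines the operator
* StreamingFileSinkHelper is the helper class the sink uses to process data
* Buckets represents all buckets
* Bucket represents a single bucket
A simple demo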
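    import java.time.ZoneId
    import java.util.concurrent.TimeUnit

    import org.apache.flink.api.common.serialization.SimpleStringEncoder
    import org.apache.flink.core.fs.Path
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
    import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner
    import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
    import org.apache.flink.streaming.api.scala._

    // SimpleStringSource is the author's own source function (not shown in this post)
    object StreamingFileDemo {
      def main(args: Array[String]): Unit = {
        // environment
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
        // env.enableCheckpointing(TimeUnit.MINUTES.toMillis(10))
        env.setParallelism(2)

        val source = env.addSource(new SimpleStringSource)

        val sink: StreamingFileSink[String] = StreamingFileSink
          // row-encoding
          .forRowFormat(new Path("/out/tmp"), new SimpleStringEncoder[String]("UTF-8"))
          // bucket assigner
          .withBucketAssigner(new DateTimeBucketAssigner[String]("yyyyMMdd", ZoneId.of("Asia/Shanghai")))
          // lets a Hive table load the directory directly as a Hive partition
          //.withBucketAssigner(new DateTimeBucketAssigner[String]("'dt='yyyyMMdd", ZoneId.of("Asia/Shanghai")))
          // rolling policy
          .withRollingPolicy(
            DefaultRollingPolicy.builder()
              // roll every 2 minutes
              .withRolloverInterval(TimeUnit.MINUTES.toMillis(2))
              // roll after 2 minutes of inactivity
              .withInactivityInterval(TimeUnit.MINUTES.toMillis(2))
              // 10 MB
              .withMaxPartSize(10 * 1024 * 1024)
              .build())
          .build()

        source.addSink(sink)
        env.execute("StreamingFileDemo")
      }
    }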
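To make the two bucket-assigner patterns in the demo concrete, here is a minimal plain-Java sketch of how a date pattern and a time zone turn a processing-time timestamp into a bucket directory name. It is a simplified model rather than Flink code: the class `BucketIdSketch` and its `bucketId` method are names made up for this illustration, and it assumes the bucket id is simply the formatted timestamp.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Hypothetical helper, not part of Flink: models a date-time based bucket id.
class BucketIdSketch {

    // Formats the given epoch-millisecond timestamp with the pattern in the given zone.
    static String bucketId(String pattern, String zone, long epochMillis) {
        DateTimeFormatter formatter =
                DateTimeFormatter.ofPattern(pattern).withZone(ZoneId.of(zone));
        return formatter.format(Instant.ofEpochMilli(epochMillis));
    }

    public static void main(String[] args) {
        long ts = 1577822400000L; // 2019-12-31T20:00:00Z
        // The same instant lands in different buckets depending on the zone.
        System.out.println(bucketId("yyyyMMdd", "UTC", ts));
        System.out.println(bucketId("yyyyMMdd", "Asia/Shanghai", ts));
        // Quoted literal text yields a Hive-style partition directory name.
        System.out.println(bucketId("'dt='yyyyMMdd", "Asia/Shanghai", ts));
    }
}
```

Note that the zone id is case-sensitive in `java.time`, so it has to be written `Asia/Shanghai`.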
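Writing data
Skipping the generic logic by which an operator handles input records, we go straight into StreamingFileSink.
When StreamingFileSink is initialized, StreamingFileSink.initializeState creates a Buckets object and passes it to the StreamingFileSinkHelper constructor to create the StreamingFileSinkHelper object.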
    public void initializeState(FunctionInitializationContext context) throws Exception {
        this.helper = new StreamingFileSinkHelper<>(
                bucketsBuilder.createBuckets(getRuntimeContext().getIndexOfThisSubtask()),
                context.isRestored(),
                context.getOperatorStateStore(),
                ((StreamingRuntimeContext) getRuntimeContext()).getProcessingTimeService(),
                bucketCheckInterval);
    }

StreamingFileSink processes input records in invoke:
StreamingFileSink.invoke
    @Override
    public void invoke(IN value, SinkFunction.Context context) throws Exception {
        this.helper.onElement(
                value,
                context.currentProcessingTime(),
                context.timestamp(),
                context.currentWatermark());
    }

As the code shows, invoke calls StreamingFileSinkHelper.onElement directly, passing the input record, the current processing time, the timestamp (the record's own time, or null if the record carries none), and the current watermark.
StreamingFileSinkHelper
    // triggered by the processing-time timer
    @Override
    public void onProcessingTime(long timestamp) throws Exception {
        final long currentTime = procTimeService.getCurrentProcessingTime();
        buckets.onProcessingTime(currentTime);
        procTimeService.registerTimer(currentTime + bucketCheckInterval, this);
    }

    // triggered by an input record
    public void onElement(
            IN value,
            long currentProcessingTime,
            @Nullable Long elementTimestamp,
            long currentWatermark) throws Exception {
        buckets.onElement(value, currentProcessingTime, elementTimestamp, currentWatermark);
    }

As the code shows, StreamingFileSinkHelper.onElement in turn calls buckets.onElement directly. The helper also implements the ProcessingTimeCallback interface and uses a timer to call buckets.onProcessingTime every bucketCheckInterval. This is what rolls part files over time, and it shows that rolling is driven by processing time only; rolling by event time is not supported.
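The self-re-registering timer is easier to see in isolation. Below is a small plain-Java sketch under stated assumptions: `ManualTimerService` is a hypothetical stand-in for Flink's processing-time service with a hand-driven clock, and `Helper` records the check times where the real code would call buckets.onProcessingTime. The one-minute interval is chosen only for illustration.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

class TimerLoopSketch {

    interface Callback {
        void onProcessingTime(long timestamp);
    }

    // Hypothetical timer service with a manual clock (not Flink's implementation).
    static class ManualTimerService {
        private static final class Timer {
            final long time;
            final Callback callback;

            Timer(long time, Callback callback) {
                this.time = time;
                this.callback = callback;
            }
        }

        private final PriorityQueue<Timer> timers =
                new PriorityQueue<>(Comparator.comparingLong((Timer t) -> t.time));
        private long now = 0L;

        long getCurrentProcessingTime() {
            return now;
        }

        void registerTimer(long time, Callback callback) {
            timers.add(new Timer(time, callback));
        }

        // Advances the clock, firing every timer that is due on the way.
        void advanceTo(long target) {
            while (!timers.isEmpty() && timers.peek().time <= target) {
                Timer timer = timers.poll();
                now = timer.time;
                timer.callback.onProcessingTime(timer.time);
            }
            now = target;
        }
    }

    // Models the helper: each firing does its periodic check and schedules the next one.
    static class Helper implements Callback {
        final List<Long> checks = new ArrayList<>();
        private final ManualTimerService service;
        private final long interval;

        Helper(ManualTimerService service, long interval) {
            this.service = service;
            this.interval = interval;
            service.registerTimer(service.getCurrentProcessingTime() + interval, this);
        }

        @Override
        public void onProcessingTime(long timestamp) {
            long currentTime = service.getCurrentProcessingTime();
            checks.add(currentTime); // stands in for buckets.onProcessingTime(currentTime)
            service.registerTimer(currentTime + interval, this);
        }
    }

    // Runs the loop with the given interval until the given time and returns the check times.
    static List<Long> run(long interval, long until) {
        ManualTimerService service = new ManualTimerService();
        Helper helper = new Helper(service, interval);
        service.advanceTo(until);
        return helper.checks;
    }

    public static void main(String[] args) {
        System.out.println(run(60_000L, 180_000L));
    }
}
```

Because the check only runs when a timer fires, a part file can outlive its rollover interval by up to one check interval.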
Buckets
    @VisibleForTesting
    public Bucket<IN, BucketID> onElement(
            final IN value,
            final SinkFunction.Context context) throws Exception {
        return onElement(
                value,
                context.currentProcessingTime(),
                context.timestamp(),
                context.currentWatermark());
    }

    public Bucket<IN, BucketID> onElement(
            final IN value,
            final long currentProcessingTime,
            @Nullable final Long elementTimestamp,
            final long currentWatermark) throws Exception {
        // setting the values in the bucketer context
        bucketerContext.update(
                elementTimestamp,
                currentWatermark,
                currentProcessingTime);

        // get the bucket id
        final BucketID bucketId = bucketAssigner.getBucketId(value, bucketerContext);
        // get the bucket for this id, creating it if necessary
        final Bucket<IN, BucketID> bucket = getOrCreateBucketForBucketId(bucketId);
        // write the record to the bucket
        bucket.write(value, currentProcessingTime);

        // we update the global max counter here because as buckets become inactive and
        // get removed from the list of active buckets, at the time when we want to create
        // another part file for the bucket, if we start from 0 we may overwrite previous parts.
        // part file counter
        this.maxPartCounter = Math.max(maxPartCounter, bucket.getPartCounter());
        return bucket;
    }

    private Bucket<IN, BucketID> getOrCreateBucketForBucketId(final BucketID bucketId) throws IOException {
        // look the bucket up among the active buckets; create it if it is null
        Bucket<IN, BucketID> bucket = activeBuckets.get(bucketId);
        if (bucket == null) {
            final Path bucketPath = assembleBucketPath(bucketId);
            bucket = bucketFactory.getNewBucket(
                    subtaskIndex,
                    bucketId,
                    bucketPath,
                    maxPartCounter,
                    bucketWriter,
                    rollingPolicy,
                    outputFileConfig);
            activeBuckets.put(bucketId, bucket);
            notifyBucketCreate(bucket);
        }
        return bucket;
    }

    // called by the processing-time timer
    public void onProcessingTime(long timestamp) throws Exception {
        // call onProcessingTime on every active bucket
        for (Bucket<IN, BucketID> bucket : activeBuckets.values()) {
            bucket.onProcessingTime(timestamp);
        }
    }

Here we can see that when a record arrives, Buckets obtains the bucketId, obtains the Bucket object, and calls bucket.write to write the record (finally, an actual write).
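The comment about the global max counter is worth a small experiment. The sketch below is a simplified plain-Java model, not the Flink classes: `PartCounterSketch`, its nested `Bucket` and `Buckets`, and the `deactivate` method are hypothetical names, and every write is assumed to open a new part file so that the counter moves on each call.

```java
import java.util.HashMap;
import java.util.Map;

class PartCounterSketch {

    // Simplified bucket: only tracks the counter used in part file names.
    static class Bucket {
        private long partCounter;

        Bucket(long initialPartCounter) {
            this.partCounter = initialPartCounter;
        }

        // Opens a new part file and returns its name (default-style "part-<subtask>-<counter>").
        String rollPartFile(int subtaskIndex) {
            String name = "part-" + subtaskIndex + "-" + partCounter;
            partCounter++;
            return name;
        }

        long getPartCounter() {
            return partCounter;
        }
    }

    static class Buckets {
        private final Map<String, Bucket> activeBuckets = new HashMap<>();
        private long maxPartCounter = 0L;

        // Assumption for this model: each write rolls a new part file.
        String write(String bucketId) {
            Bucket bucket = activeBuckets.get(bucketId);
            if (bucket == null) {
                // A new bucket starts from the global max counter, not from 0.
                bucket = new Bucket(maxPartCounter);
                activeBuckets.put(bucketId, bucket);
            }
            String file = bucket.rollPartFile(0);
            maxPartCounter = Math.max(maxPartCounter, bucket.getPartCounter());
            return file;
        }

        // Models a bucket becoming inactive and being dropped from the active map.
        void deactivate(String bucketId) {
            activeBuckets.remove(bucketId);
        }
    }

    public static void main(String[] args) {
        Buckets buckets = new Buckets();
        System.out.println(buckets.write("20200101"));
        System.out.println(buckets.write("20200101"));
        buckets.deactivate("20200101");
        // The re-created bucket continues numbering instead of reusing counter 0.
        System.out.println(buckets.write("20200101"));
    }
}
```

If the re-created bucket started from 0 instead, its first file would have the same name as a file written earlier to the same directory.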
    // write a record
    void write(IN element, long currentTime) throws IOException {
        // decide from the part file's size whether it must roll (the 10 MB set in the demo)
        if (inProgressPart == null || rollingPolicy.shouldRollOnEvent(inProgressPart, element)) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Subtask {} closing in-progress part file for bucket id={} due to element {}.",
                        subtaskIndex, bucketId, element);
            }
            // roll the part file
            rollPartFile(currentTime);
        }
        // write the record to the in-progress part
        inProgressPart.write(element, currentTime);
    }

    // roll the part file
    private void rollPartFile(final long currentTime) throws IOException {
        // close the current part file
        closePartFile();
        // build the path of the new part file
        final Path partFilePath = assembleNewPartPath();
        // open the writer for the new part
        inProgressPart = bucketWriter.openNewInProgressFile(bucketId, partFilePath, currentTime);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Subtask {} opening new part file \"{}\" for bucket id={}.",
                    subtaskIndex, partFilePath.getName(), bucketId);
        }
        // increment the part counter
        partCounter++;
    }

    private Path assembleNewPartPath() {
        return new Path(bucketPath,
                outputFileConfig.getPartPrefix() + '-' + subtaskIndex + '-' + partCounter
                        + outputFileConfig.getPartSuffix());
    }

    // close the part file
    private InProgressFileWriter.PendingFileRecoverable closePartFile() throws IOException {
        InProgressFileWriter.PendingFileRecoverable pendingFileRecoverable = null;
        if (inProgressPart != null) {
            // close the file
            pendingFileRecoverable = inProgressPart.closeForCommit();
            // add it to the list of files waiting to be committed
            pendingFileRecoverablesForCurrentCheckpoint.add(pendingFileRecoverable);
            inProgressPart = null;
        }
        return pendingFileRecoverable;
    }

    // called by the processing-time timer; rolls the part file based on processing time
    void onProcessingTime(long timestamp) throws IOException {
        if (inProgressPart != null && rollingPolicy.shouldRollOnProcessingTime(inProgressPart, timestamp)) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Subtask {} closing in-progress part file for bucket id={} due to processing time rolling policy "
                        + "(in-progress file created @ {}, last updated @ {} and current time is {}).",
                        subtaskIndex, bucketId,
                        inProgressPart.getCreationTime(), inProgressPart.getLastUpdateTime(), timestamp);
            }
            closePartFile();
        }
    }
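This is the code that rolls part files and writes records. The actual write to the file system goes through a different Writer depending on the execution environment:
Local: LocalRecoverableWriter
Hadoop file system: HadoopRecoverableWriter
Checkpoint snapshotState
To quote the official documentation: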
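The three thresholds configured in the demo (part size, rollover interval, inactivity interval) feed the two rolling checks seen above. The following plain-Java sketch models those decisions. It is an assumption-laden simplification: `RollingPolicySketch` is a made-up class, and the exact comparison operators (strictly greater for size, greater-or-equal for time) are my reading of the idea, not a guarantee about Flink's DefaultRollingPolicy.

```java
// Hypothetical model of a size- and time-based rolling policy (not the Flink class).
class RollingPolicySketch {
    private final long maxPartSize;
    private final long rolloverInterval;
    private final long inactivityInterval;

    RollingPolicySketch(long maxPartSize, long rolloverInterval, long inactivityInterval) {
        this.maxPartSize = maxPartSize;
        this.rolloverInterval = rolloverInterval;
        this.inactivityInterval = inactivityInterval;
    }

    // Checked on every record: roll once the in-progress file exceeds the size limit.
    boolean shouldRollOnEvent(long currentSize) {
        return currentSize > maxPartSize;
    }

    // Checked by the timer: roll if the file is too old or has been idle too long.
    boolean shouldRollOnProcessingTime(long creationTime, long lastUpdateTime, long now) {
        return now - creationTime >= rolloverInterval
                || now - lastUpdateTime >= inactivityInterval;
    }

    public static void main(String[] args) {
        // Same numbers as the demo: 10 MB, 2 minutes, 2 minutes.
        RollingPolicySketch policy =
                new RollingPolicySketch(10L * 1024 * 1024, 120_000L, 120_000L);
        System.out.println(policy.shouldRollOnEvent(10L * 1024 * 1024 + 1));
        System.out.println(policy.shouldRollOnProcessingTime(0L, 100_000L, 119_999L));
        System.out.println(policy.shouldRollOnProcessingTime(0L, 100_000L, 120_000L));
    }
}
```

With both intervals set to the same value, as in the demo, the inactivity check can never fire before the rollover check, since a file's last update is never earlier than its creation.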
```
IMPORTANT: Checkpointing needs to be enabled when using the StreamingFileSink. Part files can only be finalized on successful checkpoints.
If checkpointing is disabled part files will forever stay in `in-progress` or `pending` state and cannot be safely read by downstream systems.
```
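Put simply, checkpointing must be enabled; otherwise part files never reach the finished state (the data is all there, but the file is never finalized). So checkpointing is mandatory.
The checkpoint process starts when the snapshot message from the CheckpointCoordinator arrives. Skipping StreamingFileSink/StreamingFileSinkHelper, we look directly at Buckets.snapshotState: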
    public void snapshotState(
            final long checkpointId,
            final ListState<byte[]> bucketStatesContainer,
            final ListState<Long> partCounterStateContainer) throws Exception {
        Preconditions.checkState(
                bucketWriter != null && bucketStateSerializer != null,
                "sink has not been initialized");

        LOG.info("Subtask {} checkpointing for checkpoint with id={} (max part counter={}).",
                subtaskIndex, checkpointId, maxPartCounter);

        bucketStatesContainer.clear();
        partCounterStateContainer.clear();

        snapshotActiveBuckets(checkpointId, bucketStatesContainer);
        partCounterStateContainer.add(maxPartCounter);
    }

    private void snapshotActiveBuckets(
            final long checkpointId,
            final ListState<byte[]> bucketStatesContainer) throws Exception {
        // collect the state of every active bucket and put it into bucketStatesContainer
        for (Bucket<IN, BucketID> bucket : activeBuckets.values()) {
            final BucketState<BucketID> bucketState = bucket.onReceptionOfCheckpoint(checkpointId);
            final byte[] serializedBucketState = SimpleVersionedSerialization
                    .writeVersionAndSerialize(bucketStateSerializer, bucketState);
            bucketStatesContainer.add(serializedBucketState);

            if (LOG.isDebugEnabled()) {
                LOG.debug("Subtask {} checkpointing: {}", subtaskIndex, bucketState);
            }
        }
    }
Bucket
    BucketState<BucketID> onReceptionOfCheckpoint(long checkpointId) throws IOException {
        prepareBucketForCheckpointing(checkpointId);

        InProgressFileWriter.InProgressFileRecoverable inProgressFileRecoverable = null;
        long inProgressFileCreationTime = Long.MAX_VALUE;

        if (inProgressPart != null) {
            // persist what has been written so far and get a handle to resume from this position
            inProgressFileRecoverable = inProgressPart.persist();
            inProgressFileCreationTime = inProgressPart.getCreationTime();
            this.inProgressFileRecoverablesPerCheckpoint.put(checkpointId, inProgressFileRecoverable);
        }

        return new BucketState<>(
                bucketId, bucketPath, inProgressFileCreationTime,
                inProgressFileRecoverable, pendingFileRecoverablesPerCheckpoint);
    }

    private void prepareBucketForCheckpointing(long checkpointId) throws IOException {
        // roll the part file on checkpoint if the rolling policy asks for it
        if (inProgressPart != null && rollingPolicy.shouldRollOnCheckpoint(inProgressPart)) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Subtask {} closing in-progress part file for bucket id={} on checkpoint.",
                        subtaskIndex, bucketId);
            }
            closePartFile();
        }

        if (!pendingFileRecoverablesForCurrentCheckpoint.isEmpty()) {
            // move the pending files into pendingFileRecoverablesPerCheckpoint under this checkpoint id
            pendingFileRecoverablesPerCheckpoint.put(checkpointId, pendingFileRecoverablesForCurrentCheckpoint);
            pendingFileRecoverablesForCurrentCheckpoint = new ArrayList<>();
        }
    }

The state of every active bucket is thus stored in bucketStatesContainer (a ListState).
Checkpoint onCheckpointComplete
When the complete message from the CheckpointCoordinator arrives, we again skip StreamingFileSink/StreamingFileSinkHelper and look directly at Buckets.commitUpToCheckpoint:
    public void commitUpToCheckpoint(final long checkpointId) throws IOException {
        final Iterator<Map.Entry<BucketID, Bucket<IN, BucketID>>> activeBucketIt =
                activeBuckets.entrySet().iterator();

        LOG.info("Subtask {} received completion notification for checkpoint with id={}.",
                subtaskIndex, checkpointId);

        // call bucket.onSuccessfulCompletionOfCheckpoint on every active bucket
        while (activeBucketIt.hasNext()) {
            final Bucket<IN, BucketID> bucket = activeBucketIt.next().getValue();
            bucket.onSuccessfulCompletionOfCheckpoint(checkpointId);

            if (!bucket.isActive()) {
                // We've dealt with all the pending files and the writer for this bucket is not currently open.
                // Therefore this bucket is currently inactive and we can remove it from our state.
                activeBucketIt.remove();
                notifyBucketInactive(bucket);
            }
        }
    }
Bucket
    void onSuccessfulCompletionOfCheckpoint(long checkpointId) throws IOException {
        checkNotNull(bucketWriter);

        Iterator<Map.Entry<Long, List<InProgressFileWriter.PendingFileRecoverable>>> it =
                pendingFileRecoverablesPerCheckpoint.headMap(checkpointId, true)
                        .entrySet().iterator();

        while (it.hasNext()) {
            Map.Entry<Long, List<InProgressFileWriter.PendingFileRecoverable>> entry = it.next();

            // iterate over the pending files recorded under each checkpoint up to this one
            for (InProgressFileWriter.PendingFileRecoverable pendingFileRecoverable : entry.getValue()) {
                // recover the pending file and commit it; the file is renamed to its
                // finished form (the pending/in-progress marker is dropped)
                bucketWriter.recoverPendingFile(pendingFileRecoverable).commit();
            }
            it.remove();
        }

        // clean up in-progress recoverables left over from checkpoints older than this one
        cleanupInProgressFileRecoverables(checkpointId);
    }

    private void cleanupInProgressFileRecoverables(long checkpointId) throws IOException {
        Iterator<Map.Entry<Long, InProgressFileWriter.InProgressFileRecoverable>> it =
                inProgressFileRecoverablesPerCheckpoint.headMap(checkpointId, false)
                        .entrySet().iterator();

        while (it.hasNext()) {
            final InProgressFileWriter.InProgressFileRecoverable inProgressFileRecoverable =
                    it.next().getValue();

            // this check is redundant, as we only put entries in the inProgressFileRecoverablesPerCheckpoint map
            // list when the requiresCleanupOfInProgressFileRecoverableState() returns true, but having it makes
            // the code more readable.
            final boolean successfullyDeleted =
                    bucketWriter.cleanupInProgressFileRecoverable(inProgressFileRecoverable);
            if (LOG.isDebugEnabled() && successfullyDeleted) {
                LOG.debug("Subtask {} successfully deleted incomplete part for bucket id={}.",
                        subtaskIndex, bucketId);
            }
            it.remove();
        }
    }

bucketWriter.recoverPendingFile(pendingFileRecoverable).commit() behaves differently depending on the environment:
Local: LocalRecoverableWriter --> LocalCommitter --> LocalCommitter.commit renames the file directly on the local file system
Hadoop file system: HadoopRecoverableWriter --> HadoopFsCommitter --> HadoopFsCommitter.commit renames the file with fs.rename
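To tie the snapshot and commit steps together, here is a compact plain-Java model of the bookkeeping: closed part files wait in a list, a checkpoint moves that list into a sorted map keyed by checkpoint id, and a completion notification commits every entry up to and including that id. The class `CheckpointCommitSketch` is hypothetical, file names stand in for the pending-file recoverables, and "commit" just appends to a list instead of renaming anything.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical model of pending-file bookkeeping per checkpoint (not the Flink Bucket class).
class CheckpointCommitSketch {
    private List<String> pendingForCurrentCheckpoint = new ArrayList<>();
    private final NavigableMap<Long, List<String>> pendingPerCheckpoint = new TreeMap<>();
    private final List<String> committed = new ArrayList<>();

    // A rolled part file becomes pending; it is not yet tied to any checkpoint.
    void closePartFile(String name) {
        pendingForCurrentCheckpoint.add(name);
    }

    // Snapshot: everything closed since the last checkpoint is recorded under this id.
    void onReceptionOfCheckpoint(long checkpointId) {
        if (!pendingForCurrentCheckpoint.isEmpty()) {
            pendingPerCheckpoint.put(checkpointId, pendingForCurrentCheckpoint);
            pendingForCurrentCheckpoint = new ArrayList<>();
        }
    }

    // Completion: commit all files recorded under ids <= checkpointId.
    void onSuccessfulCompletionOfCheckpoint(long checkpointId) {
        Iterator<Map.Entry<Long, List<String>>> it =
                pendingPerCheckpoint.headMap(checkpointId, true).entrySet().iterator();
        while (it.hasNext()) {
            committed.addAll(it.next().getValue()); // stands in for the rename
            it.remove();
        }
    }

    List<String> committedFiles() {
        return committed;
    }

    int uncommittedCount() {
        int count = pendingForCurrentCheckpoint.size();
        for (List<String> files : pendingPerCheckpoint.values()) {
            count += files.size();
        }
        return count;
    }

    public static void main(String[] args) {
        CheckpointCommitSketch bucket = new CheckpointCommitSketch();
        bucket.closePartFile("part-0-0");
        bucket.onReceptionOfCheckpoint(1L);
        bucket.closePartFile("part-0-1");
        bucket.onReceptionOfCheckpoint(2L);
        bucket.closePartFile("part-0-2"); // closed after checkpoint 2 was taken
        // Only the notification for checkpoint 2 arrives; it also covers checkpoint 1.
        bucket.onSuccessfulCompletionOfCheckpoint(2L);
        System.out.println(bucket.committedFiles());
        System.out.println(bucket.uncommittedCount());
    }
}
```

The inclusive head map is what makes a single notification cover earlier checkpoints whose own notifications never arrived.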
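For the sake of consistency, StreamingFileSink was designed to require checkpointing. Some jobs, however, do not care much about consistency and can tolerate losing some data; what matters most to them is not having to enable checkpointing at all. One example is real-time detection, which only cares about the most recent short window of data: if the job fails, it is simply restarted from scratch rather than rolled back to the last checkpoint.
If the file is committed directly in Bucket.closePartFile and not added to the checkpoint, it gets committed as soon as the part file rolls:
    bucketWriter.recoverPendingFile(pendingFileRecoverable).commit();
    // not add to checkpoint
    // pendingFileRecoverablesForCurrentCheckpoint.add(pendingFileRecoverable);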
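On a local file system, committing at roll time boils down to renaming the in-progress file to its final name when it is closed. The sketch below shows that idea with `java.nio.file` only. It is not the Flink implementation: the class `CommitOnRollSketch`, the hidden `.<name>.inprogress` naming, and the temporary directory are all assumptions made for the illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical local-file model of "commit when the part file rolls".
class CommitOnRollSketch {

    // Writes the content to a hidden in-progress file, then renames it to the final name.
    static Path writeAndCommit(Path dir, String finalName, String content) throws IOException {
        Path inProgress = dir.resolve("." + finalName + ".inprogress"); // assumed naming
        Files.write(inProgress, content.getBytes(StandardCharsets.UTF_8));
        Path target = dir.resolve(finalName);
        // The rename is the commit: readers see either no file or the complete file.
        return Files.move(inProgress, target, StandardCopyOption.ATOMIC_MOVE);
    }

    // Runs the sketch in a temp directory and reports whether only the final file remains.
    static boolean demo() {
        try {
            Path dir = Files.createTempDirectory("commit-on-roll");
            Path done = writeAndCommit(dir, "part-0-0", "hello\n");
            boolean finalExists = Files.exists(done);
            boolean inProgressGone = !Files.exists(dir.resolve(".part-0-0.inprogress"));
            return finalExists && inProgressGone;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The trade-off is the one described above: a file that is still in progress when the job fails is never renamed, so its records are lost to downstream readers.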