Flink StreamingFileSink Processing Flow

Posted springmoon-venn


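A couple of days ago I tried writing to Hive from Flink SQL and was puzzled by how the sink writes data to HDFS, in particular how files are committed based on checkpoints. So I read the StreamingFileSink source code (Flink SQL's Hive writer reuses this code).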

StreamingFileSink was introduced by the community in Flink 1.6 as the replacement for BucketingSink; BucketingSink was marked deprecated in Flink 1.9.

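StreamingFileSink's processing flow has four layers, each calling the next:

* StreamingFileSink is the sink itself and defines the operator
* StreamingFileSinkHelper is the helper class the sink uses to process data
* Buckets represents all buckets
* Bucket represents a single bucket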

A simple demo

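import java.time.ZoneId
import java.util.concurrent.TimeUnit

import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
import org.apache.flink.streaming.api.scala._

object StreamingFileDemo {

  def main(args: Array[String]): Unit = {
    // environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // checkpointing (needed for part files to reach the finished state, see below)
    //    env.enableCheckpointing(TimeUnit.MINUTES.toMillis(10))
    env.setParallelism(2)

    // SimpleStringSource is the author's own source function (not shown here)
    val source = env.addSource(new SimpleStringSource)

    val sink: StreamingFileSink[String] = StreamingFileSink
      // row-encoding
      .forRowFormat(new Path("/out/tmp"), new SimpleStringEncoder[String]("UTF-8"))
      // bucket assigner
      .withBucketAssigner(new DateTimeBucketAssigner[String]("yyyyMMdd", ZoneId.of("Asia/Shanghai")))
      // alternative: Hive-style directory names, so a Hive table can load each directory as a partition
      //.withBucketAssigner(new DateTimeBucketAssigner[String]("'dt='yyyyMMdd", ZoneId.of("Asia/Shanghai")))
      // rolling policy
      .withRollingPolicy(
      DefaultRollingPolicy.builder()
        // roll every 2 minutes
        .withRolloverInterval(TimeUnit.MINUTES.toMillis(2))
        // roll after 2 minutes of inactivity
        .withInactivityInterval(TimeUnit.MINUTES.toMillis(2))
        // roll once the part file exceeds 10 MB
        .withMaxPartSize(10 * 1024 * 1024)
        .build())
      .build()

    source.addSink(sink)
    env.execute("StreamingFileDemo")

  }
}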
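
To make the bucket assigner in the demo more concrete, here is a minimal sketch in plain Java (no Flink dependency) of how a date-based bucket ID can be derived from a timestamp. The class and method names (BucketIdSketch, bucketId) are hypothetical and only illustrate the role of the pattern and the time zone; the real DateTimeBucketAssigner may differ in its details.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Hypothetical helper, not part of Flink: formats a timestamp into a bucket ID.
final class BucketIdSketch {

    // Formats epoch milliseconds with the given pattern in the given time zone.
    static String bucketId(String pattern, ZoneId zone, long epochMillis) {
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern(pattern).withZone(zone);
        return formatter.format(Instant.ofEpochMilli(epochMillis));
    }

    public static void main(String[] args) {
        ZoneId shanghai = ZoneId.of("Asia/Shanghai");
        // 16:00 UTC is already midnight of the next day in Shanghai (UTC+8).
        long ts = Instant.parse("2020-09-13T16:00:00Z").toEpochMilli();

        System.out.println(bucketId("yyyyMMdd", shanghai, ts));          // 20200914
        System.out.println(bucketId("'dt='yyyyMMdd", shanghai, ts));     // dt=20200914
        System.out.println(bucketId("yyyyMMdd", ZoneId.of("UTC"), ts));  // 20200913
    }
}
```

The quoted 'dt=' prefix is a literal in the pattern, which is what produces Hive-style partition directory names. Zone IDs are case-sensitive, so the spelling "Asia/Shanghai" matters.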

Writing data

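Skipping the generic logic by which an operator handles its input, we go straight into StreamingFileSink.

When StreamingFileSink is initialized, StreamingFileSink.initializeState creates the Buckets object and passes it to the StreamingFileSinkHelper constructor to create the StreamingFileSinkHelper: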

public void initializeState(FunctionInitializationContext context) throws Exception {
    this.helper = new StreamingFileSinkHelper<>(
        bucketsBuilder.createBuckets(getRuntimeContext().getIndexOfThisSubtask()),
        context.isRestored(),
        context.getOperatorStateStore(),
        ((StreamingRuntimeContext) getRuntimeContext()).getProcessingTimeService(),
        bucketCheckInterval);
  }

StreamingFileSink processes incoming records in invoke.

StreamingFileSink.invoke

@Override
  public void invoke(IN value, SinkFunction.Context context) throws Exception {
    this.helper.onElement(
      value,
      context.currentProcessingTime(),
      context.timestamp(),
      context.currentWatermark());
  }

As the code shows, invoke directly calls StreamingFileSinkHelper.onElement, passing the input record, the current processing time, the record's timestamp (null if the record has none), and the current watermark.

StreamingFileSinkHelper

// triggered by the processing-time timer
@Override
public void onProcessingTime(long timestamp) throws Exception {
  final long currentTime = procTimeService.getCurrentProcessingTime();
  buckets.onProcessingTime(currentTime);
  procTimeService.registerTimer(currentTime + bucketCheckInterval, this);
}
// triggered by each incoming record
public void onElement(
    IN value,
    long currentProcessingTime,
    @Nullable Long elementTimestamp,
    long currentWatermark) throws Exception {
  buckets.onElement(value, currentProcessingTime, elementTimestamp, currentWatermark);
}

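As the code shows, StreamingFileSinkHelper.onElement likewise just delegates to buckets.onElement. The helper also implements the ProcessingTimeCallback interface and uses a timer to call buckets.onProcessingTime every bucketCheckInterval. This is what rolls the buckets' part files by time; note that it is driven by processing time only, so rolling by event time is not supported.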
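
The self-re-registering timer can be illustrated with a small plain-Java model. This is only a sketch: ManualTimerService, Callback and Helper are hypothetical stand-ins driven by a manual clock instead of Flink's processing-time service, and the initial registration in the constructor is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Plain-Java model of a self-re-registering processing-time timer (hypothetical names).
final class TimerSketch {

    interface Callback {
        void onProcessingTime(long timestamp);
    }

    // Manually driven timer service used instead of a real clock.
    static final class ManualTimerService {
        private static final class Timer {
            final long time;
            final Callback callback;

            Timer(long time, Callback callback) {
                this.time = time;
                this.callback = callback;
            }
        }

        private final PriorityQueue<Timer> timers =
                new PriorityQueue<>((a, b) -> Long.compare(a.time, b.time));
        private long now;

        long currentTime() {
            return now;
        }

        void registerTimer(long time, Callback callback) {
            timers.add(new Timer(time, callback));
        }

        // Advances the clock and fires all timers that are due, in time order.
        void advanceTo(long newTime) {
            while (!timers.isEmpty() && timers.peek().time <= newTime) {
                Timer timer = timers.poll();
                now = timer.time;
                timer.callback.onProcessingTime(timer.time);
            }
            now = newTime;
        }
    }

    // Mirrors the helper: do the periodic work, then schedule the next check.
    static final class Helper implements Callback {
        final List<Long> checks = new ArrayList<>();
        private final ManualTimerService service;
        private final long bucketCheckInterval;

        Helper(ManualTimerService service, long bucketCheckInterval) {
            this.service = service;
            this.bucketCheckInterval = bucketCheckInterval;
            service.registerTimer(service.currentTime() + bucketCheckInterval, this);
        }

        @Override
        public void onProcessingTime(long timestamp) {
            long currentTime = service.currentTime();
            checks.add(currentTime); // stands in for buckets.onProcessingTime(currentTime)
            service.registerTimer(currentTime + bucketCheckInterval, this);
        }
    }

    public static void main(String[] args) {
        ManualTimerService service = new ManualTimerService();
        Helper helper = new Helper(service, 60_000L);
        service.advanceTo(200_000L);
        System.out.println(helper.checks); // [60000, 120000, 180000]
    }
}
```

Because each callback registers the next one, the check keeps firing at a fixed processing-time interval whether or not any records arrive.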

Buckets

@VisibleForTesting
  public Bucket<IN, BucketID> onElement(
      final IN value,
      final SinkFunction.Context context) throws Exception {
    return onElement(
        value,
        context.currentProcessingTime(),
        context.timestamp(),
        context.currentWatermark());
  }

  public Bucket<IN, BucketID> onElement(
      final IN value,
      final long currentProcessingTime,
      @Nullable final Long elementTimestamp,
      final long currentWatermark) throws Exception {
    // setting the values in the bucketer context
    bucketerContext.update(
        elementTimestamp,
        currentWatermark,
        currentProcessingTime);
    // determine the bucket id
    final BucketID bucketId = bucketAssigner.getBucketId(value, bucketerContext);
    // get the bucket for this bucketId, creating it if necessary
    final Bucket<IN, BucketID> bucket = getOrCreateBucketForBucketId(bucketId);
    // write the record to the bucket
    bucket.write(value, currentProcessingTime);

    // we update the global max counter here because as buckets become inactive and
    // get removed from the list of active buckets, at the time when we want to create
    // another part file for the bucket, if we start from 0 we may overwrite previous parts.
    // keep track of the highest part-file counter
    this.maxPartCounter = Math.max(maxPartCounter, bucket.getPartCounter());
    return bucket;
  }

  private Bucket<IN, BucketID> getOrCreateBucketForBucketId(final BucketID bucketId) throws IOException {
    // look up the bucket among the active buckets; create one if it is missing
    Bucket<IN, BucketID> bucket = activeBuckets.get(bucketId);
    if (bucket == null) {
      final Path bucketPath = assembleBucketPath(bucketId);
      bucket = bucketFactory.getNewBucket(
          subtaskIndex,
          bucketId,
          bucketPath,
          maxPartCounter,
          bucketWriter,
          rollingPolicy,
          outputFileConfig);
      activeBuckets.put(bucketId, bucket);
      notifyBucketCreate(bucket);
    }
    return bucket;
  }
  // called by the processing-time timer
  public void onProcessingTime(long timestamp) throws Exception {
    // call onProcessingTime on every active bucket
    for (Bucket<IN, BucketID> bucket : activeBuckets.values()) {
      bucket.onProcessingTime(timestamp);
    }
  }
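
The comment in onElement about the global max counter can be illustrated with a small plain-Java model. It is a sketch under assumptions: the class names are hypothetical, and the "part" file prefix is assumed for illustration. It shows why a bucket that is re-created after being removed continues numbering from the shared counter rather than from 0.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model (hypothetical names) of the shared part counter kept by Buckets.
final class PartCounterSketch {

    static final class Bucket {
        final String bucketId;
        long partCounter; // index of the next part file in this bucket

        Bucket(String bucketId, long initialPartCounter) {
            this.bucketId = bucketId;
            this.partCounter = initialPartCounter;
        }

        // Returns the name of the newly opened part file and advances the counter.
        String rollPartFile(int subtaskIndex) {
            String name = "part-" + subtaskIndex + "-" + partCounter;
            partCounter++;
            return name;
        }
    }

    private final Map<String, Bucket> activeBuckets = new HashMap<>();
    private long maxPartCounter;

    // New buckets start at the highest counter seen so far, not at 0.
    String writeToNewPart(String bucketId, int subtaskIndex) {
        Bucket bucket = activeBuckets.computeIfAbsent(bucketId, id -> new Bucket(id, maxPartCounter));
        String name = bucket.rollPartFile(subtaskIndex);
        maxPartCounter = Math.max(maxPartCounter, bucket.partCounter);
        return name;
    }

    // Stands in for an inactive bucket being dropped from the active set.
    void removeBucket(String bucketId) {
        activeBuckets.remove(bucketId);
    }

    public static void main(String[] args) {
        PartCounterSketch buckets = new PartCounterSketch();
        System.out.println(buckets.writeToNewPart("20200913", 0)); // part-0-0
        System.out.println(buckets.writeToNewPart("20200913", 0)); // part-0-1
        buckets.removeBucket("20200913");
        // The re-created bucket continues at 2 instead of overwriting part-0-0.
        System.out.println(buckets.writeToNewPart("20200913", 0)); // part-0-2
    }
}
```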

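Here we can see that when a record comes in, Buckets determines the bucketId, obtains the Bucket object, and calls bucket.write to write the data (finally, an actual write). The relevant methods in Bucket: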

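// write a record
void write(IN element, long currentTime) throws IOException {
    // ask the rolling policy whether the part file must roll before this record (this is where the 10 MB limit set in the demo applies)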
    if (inProgressPart == null || rollingPolicy.shouldRollOnEvent(inProgressPart, element)) {

      if (LOG.isDebugEnabled()) {
        LOG.debug("Subtask {} closing in-progress part file for bucket id={} due to element {}.",
            subtaskIndex, bucketId, element);
      }
      // roll the part file
      rollPartFile(currentTime);
    }
    // write the record to the in-progress part file
    inProgressPart.write(element, currentTime);
  }
  // roll the part file
  private void rollPartFile(final long currentTime) throws IOException {
    // close the current part file
    closePartFile();
    // build the Path of the new part file
    final Path partFilePath = assembleNewPartPath();
    // open a writer for the new part file
    inProgressPart = bucketWriter.openNewInProgressFile(bucketId, partFilePath, currentTime);

    if (LOG.isDebugEnabled()) {
      LOG.debug("Subtask {} opening new part file \"{}\" for bucket id={}.",
          subtaskIndex, partFilePath.getName(), bucketId);
    }
    // increment the part counter
    partCounter++;
  }

  private Path assembleNewPartPath() {
    return new Path(bucketPath, outputFileConfig.getPartPrefix() + '-' + subtaskIndex + '-' + partCounter + outputFileConfig.getPartSuffix());
  }
  // close the part file
  private InProgressFileWriter.PendingFileRecoverable closePartFile() throws IOException {
    InProgressFileWriter.PendingFileRecoverable pendingFileRecoverable = null;
    if (inProgressPart != null) {
      // close the file for commit
      pendingFileRecoverable = inProgressPart.closeForCommit();
      // add it to the list of files pending commit
      pendingFileRecoverablesForCurrentCheckpoint.add(pendingFileRecoverable);
      inProgressPart = null;
    }
    return pendingFileRecoverable;
  }

  // called by the processing-time timer; rolls the part file based on processing time
  void onProcessingTime(long timestamp) throws IOException {
    if (inProgressPart != null && rollingPolicy.shouldRollOnProcessingTime(inProgressPart, timestamp)) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Subtask {} closing in-progress part file for bucket id={} due to processing time rolling policy " +
            "(in-progress file created @ {}, last updated @ {} and current time is {}).",
            subtaskIndex, bucketId, inProgressPart.getCreationTime(), inProgressPart.getLastUpdateTime(), timestamp);
      }
      closePartFile();
    }
  }

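That is the implementation of part-file rolling and writing. The actual write to the file system goes through a different writer depending on the execution environment:

Local: LocalRecoverableWriter
Hadoop file system: HadoopRecoverableWriter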
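
The rolling checks consulted by write and onProcessingTime can be modelled in a few lines of plain Java. This is a sketch, not Flink's code: the class names are hypothetical, and the comparison operators reflect my reading of the default rolling policy, so treat them as an assumption.

```java
// Plain-Java model of the rolling checks; all names here are hypothetical.
final class RollingPolicySketch {

    // Minimal stand-in for what a part file exposes to the policy.
    static final class PartInfo {
        final long creationTime;
        final long lastUpdateTime;
        final long size;

        PartInfo(long creationTime, long lastUpdateTime, long size) {
            this.creationTime = creationTime;
            this.lastUpdateTime = lastUpdateTime;
            this.size = size;
        }
    }

    private final long maxPartSize;
    private final long rolloverInterval;
    private final long inactivityInterval;

    RollingPolicySketch(long maxPartSize, long rolloverInterval, long inactivityInterval) {
        this.maxPartSize = maxPartSize;
        this.rolloverInterval = rolloverInterval;
        this.inactivityInterval = inactivityInterval;
    }

    // Checked before each record is written.
    boolean shouldRollOnEvent(PartInfo part) {
        return part.size > maxPartSize;
    }

    // Checked each time the processing-time timer fires.
    boolean shouldRollOnProcessingTime(PartInfo part, long currentTime) {
        return currentTime - part.creationTime >= rolloverInterval
                || currentTime - part.lastUpdateTime >= inactivityInterval;
    }

    public static void main(String[] args) {
        // Same settings as the demo: 10 MB, 2 minutes, 2 minutes.
        RollingPolicySketch policy = new RollingPolicySketch(10L * 1024 * 1024, 120_000L, 120_000L);

        PartInfo small = new PartInfo(0L, 100_000L, 1024L);
        System.out.println(policy.shouldRollOnEvent(small));                    // false
        System.out.println(policy.shouldRollOnProcessingTime(small, 110_000L)); // false
        System.out.println(policy.shouldRollOnProcessingTime(small, 120_000L)); // true, the file is 2 minutes old

        PartInfo large = new PartInfo(0L, 100_000L, 11L * 1024 * 1024);
        System.out.println(policy.shouldRollOnEvent(large));                    // true
    }
}
```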

Checkpoint: snapshotState

A quote from the official documentation:
```
IMPORTANT: Checkpointing needs to be enabled when using the StreamingFileSink. Part files can only be finalized on successful checkpoints.

If checkpointing is disabled part files will forever stay in `in-progress` or `pending` state and cannot be safely read by downstream systems.

```

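In short, checkpointing must be enabled; otherwise files never move to the finished state (the data is complete, but the file state is not). So checkpointing is mandatory.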

The checkpoint process starts when the snapshot message from the CheckpointCoordinator arrives. Skipping StreamingFileSink and StreamingFileSinkHelper, we go straight to Buckets.snapshotState:

public void snapshotState(
      final long checkpointId,
      final ListState<byte[]> bucketStatesContainer,
      final ListState<Long> partCounterStateContainer) throws Exception {

    Preconditions.checkState(
      bucketWriter != null && bucketStateSerializer != null,
        "sink has not been initialized");

    LOG.info("Subtask {} checkpointing for checkpoint with id={} (max part counter={}).",
        subtaskIndex, checkpointId, maxPartCounter);

    bucketStatesContainer.clear();
    partCounterStateContainer.clear();

    snapshotActiveBuckets(checkpointId, bucketStatesContainer);
    partCounterStateContainer.add(maxPartCounter);
  }

  private void snapshotActiveBuckets(
      final long checkpointId,
      final ListState<byte[]> bucketStatesContainer) throws Exception {
    // collect the state of every active bucket and add it to bucketStatesContainer
    for (Bucket<IN, BucketID> bucket : activeBuckets.values()) {
      final BucketState<BucketID> bucketState = bucket.onReceptionOfCheckpoint(checkpointId);

      final byte[] serializedBucketState = SimpleVersionedSerialization
          .writeVersionAndSerialize(bucketStateSerializer, bucketState);

      bucketStatesContainer.add(serializedBucketState);

      if (LOG.isDebugEnabled()) {
        LOG.debug("Subtask {} checkpointing: {}", subtaskIndex, bucketState);
      }
    }
  }

Bucket

BucketState<BucketID> onReceptionOfCheckpoint(long checkpointId) throws IOException {
    prepareBucketForCheckpointing(checkpointId);

    InProgressFileWriter.InProgressFileRecoverable inProgressFileRecoverable = null;
    long inProgressFileCreationTime = Long.MAX_VALUE;

    if (inProgressPart != null) {
      // persist the data written so far and get a recoverable handle for resuming the in-progress file
      inProgressFileRecoverable = inProgressPart.persist();
      inProgressFileCreationTime = inProgressPart.getCreationTime();
      this.inProgressFileRecoverablesPerCheckpoint.put(checkpointId, inProgressFileRecoverable);
    }

    return new BucketState<>(bucketId, bucketPath, inProgressFileCreationTime, inProgressFileRecoverable, pendingFileRecoverablesPerCheckpoint);
  }

  private void prepareBucketForCheckpointing(long checkpointId) throws IOException {
    // ask the rolling policy whether the part file should roll on checkpoint
    if (inProgressPart != null && rollingPolicy.shouldRollOnCheckpoint(inProgressPart)) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Subtask {} closing in-progress part file for bucket id={} on checkpoint.", subtaskIndex, bucketId);
      }
      closePartFile();
    }

    if (!pendingFileRecoverablesForCurrentCheckpoint.isEmpty()) {
      // move the files closed since the last checkpoint into pendingFileRecoverablesPerCheckpoint
      pendingFileRecoverablesPerCheckpoint.put(checkpointId, pendingFileRecoverablesForCurrentCheckpoint);
      pendingFileRecoverablesForCurrentCheckpoint = new ArrayList<>();
    }
  }

The state of each active bucket is thus stored in bucketStatesContainer (a ListState).
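
The bookkeeping done in prepareBucketForCheckpointing can be modelled in plain Java. In this sketch (hypothetical class, file names as plain strings), part files closed since the last checkpoint are collected in a list and, when a checkpoint arrives, moved into a map keyed by checkpoint ID.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Plain-Java model (hypothetical names) of how closed part files are tied to a checkpoint.
final class SnapshotSketch {

    private String inProgressPart; // name of the open part file, or null
    private int partCounter;
    private List<String> pendingForCurrentCheckpoint = new ArrayList<>();
    final NavigableMap<Long, List<String>> pendingPerCheckpoint = new TreeMap<>();

    // Opens a new part file if none is open; the record itself is ignored in this model.
    void write() {
        if (inProgressPart == null) {
            inProgressPart = "part-0-" + partCounter++;
        }
    }

    // Closing a part file makes it pending; it is not yet committed.
    void closePartFile() {
        if (inProgressPart != null) {
            pendingForCurrentCheckpoint.add(inProgressPart);
            inProgressPart = null;
        }
    }

    // Moves the files closed since the last checkpoint under this checkpoint id.
    void onReceptionOfCheckpoint(long checkpointId) {
        if (!pendingForCurrentCheckpoint.isEmpty()) {
            pendingPerCheckpoint.put(checkpointId, pendingForCurrentCheckpoint);
            pendingForCurrentCheckpoint = new ArrayList<>();
        }
    }

    public static void main(String[] args) {
        SnapshotSketch bucket = new SnapshotSketch();
        bucket.write();
        bucket.closePartFile();             // part-0-0 becomes pending
        bucket.write();
        bucket.closePartFile();             // part-0-1 becomes pending
        bucket.onReceptionOfCheckpoint(1L);
        bucket.write();                     // part-0-2 stays in progress
        bucket.onReceptionOfCheckpoint(2L); // nothing was closed, so no entry for 2
        System.out.println(bucket.pendingPerCheckpoint); // {1=[part-0-0, part-0-1]}
    }
}
```

Keying the pending files by checkpoint ID is what later allows everything up to a completed checkpoint to be committed in one pass.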

Checkpoint: notifyCheckpointComplete

When the completion message from the CheckpointCoordinator arrives, again skipping StreamingFileSink and StreamingFileSinkHelper, we go straight to Buckets.commitUpToCheckpoint:

  public void commitUpToCheckpoint(final long checkpointId) throws IOException {
    final Iterator<Map.Entry<BucketID, Bucket<IN, BucketID>>> activeBucketIt =
        activeBuckets.entrySet().iterator();

    LOG.info("Subtask {} received completion notification for checkpoint with id={}.", subtaskIndex, checkpointId);
    // call bucket.onSuccessfulCompletionOfCheckpoint on every active bucket
    while (activeBucketIt.hasNext()) {
      final Bucket<IN, BucketID> bucket = activeBucketIt.next().getValue();
      bucket.onSuccessfulCompletionOfCheckpoint(checkpointId);

      if (!bucket.isActive()) {
        // We've dealt with all the pending files and the writer for this bucket is not currently open.
        // Therefore this bucket is currently inactive and we can remove it from our state.
        activeBucketIt.remove();
        notifyBucketInactive(bucket);
      }
    }
  }

Bucket

void onSuccessfulCompletionOfCheckpoint(long checkpointId) throws IOException {
    checkNotNull(bucketWriter);

    Iterator<Map.Entry<Long, List<InProgressFileWriter.PendingFileRecoverable>>> it =
        pendingFileRecoverablesPerCheckpoint.headMap(checkpointId, true)
            .entrySet().iterator();

    while (it.hasNext()) {
      Map.Entry<Long, List<InProgressFileWriter.PendingFileRecoverable>> entry = it.next();
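      // iterate over the pending files recorded for each checkpoint up to and including checkpointId
      for (InProgressFileWriter.PendingFileRecoverable pendingFileRecoverable : entry.getValue()) {
        // recover the pending file and commit it; the file is renamed to its final name (the in-progress/pending marker is dropped)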
        bucketWriter.recoverPendingFile(pendingFileRecoverable).commit();
      }
      it.remove();
    }
    // clean up the in-progress recoverables kept for earlier checkpoints
    cleanupInProgressFileRecoverables(checkpointId);
  }

  private void cleanupInProgressFileRecoverables(long checkpointId) throws IOException {
    Iterator<Map.Entry<Long, InProgressFileWriter.InProgressFileRecoverable>> it =
        inProgressFileRecoverablesPerCheckpoint.headMap(checkpointId, false)
            .entrySet().iterator();

    while (it.hasNext()) {
      final InProgressFileWriter.InProgressFileRecoverable inProgressFileRecoverable = it.next().getValue();

      // this check is redundant, as we only put entries in the inProgressFileRecoverablesPerCheckpoint map
      // list when the requiresCleanupOfInProgressFileRecoverableState() returns true, but having it makes
      // the code more readable.

      final boolean successfullyDeleted = bucketWriter.cleanupInProgressFileRecoverable(inProgressFileRecoverable);
      if (LOG.isDebugEnabled() && successfullyDeleted) {
        LOG.debug("Subtask {} successfully deleted incomplete part for bucket id={}.", subtaskIndex, bucketId);
      }
      it.remove();
    }
  }

 

bucketWriter.recoverPendingFile(pendingFileRecoverable).commit() behaves differently in different environments:

Local: LocalRecoverableWriter  --> LocalCommitter  --> LocalCommitter.commit   renames the file directly on the local file system

Hadoop file system: HadoopRecoverableWriter  -->  HadoopFsCommitter  --> HadoopFsCommitter.commit   renames the file with fs.rename
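
The commit step can be sketched in plain Java against the local file system. This is a model under assumptions, not Flink's committer: the class is hypothetical, and the temporary naming scheme (".name.inprogress" renamed to "name") is simplified for illustration. It shows the two ideas from the code above: headMap(checkpointId, true) selects everything pending up to the completed checkpoint, and the commit itself is a rename.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Plain-Java model (hypothetical names) of committing pending files by renaming them.
final class CommitSketch {

    private final NavigableMap<Long, List<Path>> pendingPerCheckpoint = new TreeMap<>();

    // Registers a closed temporary file as pending under the given checkpoint id.
    void addPending(long checkpointId, Path tempFile) {
        pendingPerCheckpoint.computeIfAbsent(checkpointId, id -> new ArrayList<>()).add(tempFile);
    }

    // Commits every pending file of checkpoints <= checkpointId by renaming it.
    List<Path> commitUpToCheckpoint(long checkpointId) throws IOException {
        List<Path> committed = new ArrayList<>();
        Iterator<Map.Entry<Long, List<Path>>> it =
                pendingPerCheckpoint.headMap(checkpointId, true).entrySet().iterator();
        while (it.hasNext()) {
            for (Path temp : it.next().getValue()) {
                committed.add(Files.move(temp, finalPathOf(temp)));
            }
            it.remove();
        }
        return committed;
    }

    // Naming scheme assumed for this sketch only: ".<name>.inprogress" -> "<name>".
    static Path finalPathOf(Path temp) {
        String name = temp.getFileName().toString();
        String finalName = name.substring(1, name.length() - ".inprogress".length());
        return temp.resolveSibling(finalName);
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("commit-sketch");
        CommitSketch bucket = new CommitSketch();
        bucket.addPending(1L, Files.createFile(dir.resolve(".part-0-0.inprogress")));
        bucket.addPending(2L, Files.createFile(dir.resolve(".part-0-1.inprogress")));
        bucket.addPending(3L, Files.createFile(dir.resolve(".part-0-2.inprogress")));

        // Completion of checkpoint 2 also commits what was pending for checkpoint 1.
        for (Path p : bucket.commitUpToCheckpoint(2L)) {
            System.out.println(p.getFileName()); // part-0-0, then part-0-1
        }
    }
}
```

The file registered under checkpoint 3 keeps its temporary name until a later checkpoint completes, which is why files stay unreadable for downstream systems when checkpointing is off.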

 

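For consistency reasons, StreamingFileSink was designed to require checkpointing. Some jobs, however, do not care much about consistency and can tolerate losing some data; what matters most to them is not having to enable checkpointing. One example is real-time monitoring, which only cares about the most recent data: if the job fails, it is simply restarted from scratch rather than rewound to the last checkpoint.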

 

By committing the file directly in Bucket.closePartFile and not adding it to the checkpoint state, the file is committed as soon as the part file rolls:

bucketWriter.recoverPendingFile(pendingFileRecoverable).commit();
// do not add it to the checkpoint state
// pendingFileRecoverablesForCurrentCheckpoint.add(pendingFileRecoverable);
