FlinkFlink Streaming File Sink

Posted 九师兄

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了FlinkFlink Streaming File Sink相关的知识,希望对你有一定的参考价值。

1.概述

类似文章:【Flink】Flink StreamingFileSink

转载:Flink Streaming File Sink

2.背景

Flink 支持将流数据以文件形式存储到外部系统,典型使用场景是将数据写入Hive表所在 HDFS存储路径,通过Hive 做查询分析。随着Flink文件写入被业务广泛使用,暴露出很多问题,因此需要了解 Flink Streaming File sink 的实现逻辑。

3.案例

从Kafka消费JSON数据,转换为 UserInfo 实体类数据流,最终以Parquet 格式写入Hive表对应的HDFS路径。使用 Flink 1.12.1,Hadoop 2.8.0, hive 2.3.8。

-----------------------------
- hive 建表语句
-----------------------------

create table userinfo(
    userid int,
    username string
) stored as parquet;

-----------------------------
- java 实体类
-----------------------------
public class UserInfo 
    private int userId;
    private String userName;

    public int getUserId() 
        return userId;
    

    public void setUserId(int userId) 
        this.userId = userId;
    

    public String getUserName() 
        return userName;
    

    public void setUserName(String userName) 
        this.userName = userName;
    


    @Override
    public String toString() 
        return "UserInfo" +
                "userId=" + userId +
                ", userName='" + userName + '\\'' +
                '';
    


-----------------------------
- Flink 文件写入程序
-----------------------------
public class Kafka2Parquet 

    public static void main(String[] args) throws Exception 
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "test_001");

        env.setParallelism(1);
        env.enableCheckpointing(30000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,2000));
        env.setStateBackend(new FsStateBackend("hdfs://localhost:9000/user/todd/checkpoint"));

        FlinkKafkaConsumer<String> dataStream = new FlinkKafkaConsumer("mqTest02", new SimpleStringSchema(), props);
        dataStream.setStartFromLatest();

        DataStream<UserInfo> userInfoDataStream = env.addSource(dataStream)
                .map(value -> JsonUtils.parseJson(value, UserInfo.class));
        
        // 1. 设置BulkFormat Builder  2.使用 CheckpointRollingPolicy
        StreamingFileSink<UserInfo> parquetSink = StreamingFileSink
                .forBulkFormat(new Path("hdfs://localhost:9000/user/hive/warehouse/userinfo"),  ParquetAvroWriters.forReflectRecord(UserInfo.class))
                .withRollingPolicy(OnCheckpointRollingPolicy.build())
                .build();

        userInfoDataStream.addSink(parquetSink);

        env.execute();
    


-----------------------------
- 生成的parquet 文件名称及路径
-----------------------------
inprogress 临时文件
hdfs://localhost:9000/user/hive/warehouse/userinfo/2021-08-09--19/.part-0-0.inprogress.18296793-9fde-4376-b6fc-7c47512bd108

part 最终文件
hdfs://localhost:9000/user/hive/warehouse/userinfo/2021-08-09--19/part-0-0


4. 核心类

  1. BulkWriter:用于不同格式的数据文件批量写入,主要实现类ParquetBulkWriter、AvroBulkWriter、OrcBulkWriter、SequenceFileWriter代表了数据写入的压缩格式。

  2. RecoverableWriter: 具有失败恢复能力的外部文件系统写入器,主要实现类HadoopRecoverableWriter、S3RecoverableWriter、LocalRecoverableWriter,代表对不同类型文件系统的操作。

  3. BulkPartWriter:InProgressFileWriter 实现类用来向inprogress文件写数据,持有BulkWriter。

  4. OutputStreamBasedPartFileWriter:Part Writer 基类,使用 RecoverableFsDataOutputStream 写出数据。

  5. RecoverableFsDataOutputStream:文件系统输出流,能够从文件系统的指定偏移量进行数据写入。

  6. BucketAssigner:负责将数据划分到不同的Bucket,可以根据数据格式自定义Assigner。内部集成 SimpleVersionedSerializer,用来对BucketID 做序列化/反序列化操作。
    a. BucketAssigner 子类 DateTimeBucketAssigner 根据数据的ProcessTime 生成 yyyy-MM-dd–HH 格式的 Bucket 名称。同时使用 SimpleVersionedStringSerializer 对Bucket 名称序列化。

  7. Bucket:StreamingFileSink数据输出的目录,每一条处理的数据根据BucketAssigner被分配到某个Bucket。主要功能:

1. 维护一份 InProgressFile 文件,负责该文件的创建、数据写入、
   提交写入。
2.StreamingFileSink 执行Checkpoint时,负责构建 BucketState 
   进而进行该状态序列化 。
3.StreamingFileSink Checkpoint 完成后,重命名 InProgressFile 文件,

4.StreamingFileSink 从savepoint 启动时,从 BucketState 
    恢复 InProgressFile 相关信息。
  1. BucketState: Bucket的状态信息。通过BucketState能够恢复Bucket inprogress 文件及当前写入偏移量,从而继续向该 inprogress文件中追加内容,同时能够恢复 Pending状态文件信息,从而继续执行后续重名逻辑。

  2. Buckets:负责管理 StreamingFileSink 中所有活跃状态的 Bucket。包括数据所在Bucket 分配,Active Bucket 快照状态存储。

  3. RollingPolicy:定义了buckt 生成新的in-progress文件、及将in-progress 文件变更为最终part文件的策略。 最常用的策略是CheckpointRollingPolicy,在每次Checkpoint完成时,根据in-progress文件生成part文件。

  4. StreamingFileSinkHelper:StreamingFileSink 调用 StreamingFileSinkHelper 方法完成对Buckets数据的写入及状态存储。

  5. StreamingFileSink:根据BucketsBuilder构造器创建Buckets,初始化时创建StreamingFileSinkHelper,在Sink、checkpoint 方法中调用 StreamingFileSinkHelper 接口。

5.数据写入

Flink写文件流程为,先将数据写入inprogress临时文件,在满足RollingPolicy时,将inprogress临时文件重命名为最终的part文件。
参考Flink1.12.1版本的代码,学习下 Flink 将数据写入文件的具体流程。


StreamingFileSink 执行 invoke() 方法处理数据,是通过调用 StreamingFileSinkHelper onElement()方法对 Buckets 进行操作。

functions.sink.filesystem.Buckets#onElement

public Bucket<IN, BucketID> onElement(
        final IN value,
        final long currentProcessingTime,
        @Nullable final Long elementTimestamp,
        final long currentWatermark)
        throws Exception 
    // note: 获取当前数据所在的 BucketID, 即被被分桶后的子文件夹名称
    final BucketID bucketId = bucketAssigner.getBucketId(value, bucketerContext);
    // note: 从已缓存的集合中获取Bucket 或者 新建Bucket并缓存
    final Bucket<IN, BucketID> bucket = getOrCreateBucketForBucketId(bucketId);
    // note: 将数据写入具体 bucket
    bucket.write(value, currentProcessingTime);
    return bucket;

Buckets 处理数据时,需要根据定义的 BucketAssigner 获取数据所在的 Bucket 标识。上述案例使用了 DateTimeBucketAssigner 了解下它如何根据ProcessTime 获取 BucketID。

bucketassigners.DateTimeBucketAssigner#getBucketId

public String getBucketId(IN element, BucketAssigner.Context context) 
    if (dateTimeFormatter == null) 
        // note: 将Processing Time 转换为 yyyy-MM-dd--HH 格式
        dateTimeFormatter = DateTimeFormatter.ofPattern(formatString).withZone(zoneId);
    
    return dateTimeFormatter.format(Instant.ofEpochMilli(context.currentProcessingTime()));

根据 BucketID 从Buckets 中拿到有效的Bucket。

filesystem.Buckets#getOrCreateBucketForBucketId  

private Bucket<IN, BucketID> getOrCreateBucketForBucketId(final BucketID bucketId)
        throws IOException 
    Bucket<IN, BucketID> bucket = activeBuckets.get(bucketId);
    if (bucket == null) 
        // note: 构建buckt所在完整路径,例如 hdfs://localhost:9000/user/hive/warehouse/userinfo/2020-08-08--14
        final Path bucketPath = assembleBucketPath(bucketId);
        // note: 创建 Bucket 并由activeBuckets缓存
        bucket =
                bucketFactory.getNewBucket(
                        subtaskIndex,
                        bucketId,
                        bucketPath,
                        maxPartCounter,
                        bucketWriter,
                        rollingPolicy,
                        fileLifeCycleListener,
                        outputFileConfig);
        activeBuckets.put(bucketId, bucket);

        notifyBucketCreate(bucket);
    
    return bucket;

初次向Bucket写入数据,需要创建part的临时文件及用来向文件写数据的InProgressFileWriter对象,同时创建BulkWriter,用来进行数据写入。

  1. 创建临时文件:根据inprocess文件生成规则,在HadoopRecoverableFsDataOutputStream初始化时创建并返回针对该文件的DataOutputStream。
  2. 创建BulkWriter:当前案例中由ParquetWriterFactory工厂类创建ParquetBulkWriter,并传递临时文件对应的DataOutputStream。
sinkfilesystem.Bucket#write 
void write(IN element, long currentTime) throws IOException 
    if (inProgressPart == null || rollingPolicy.shouldRollOnEvent(inProgressPart, element)) 
        inProgressPart = rollPartFile(currentTime);
    
    //note: BulkPartWriter使用 BulkWriter 写入数据。
    inProgressPart.write(element, currentTime);


private InProgressFileWriter<IN, BucketID> rollPartFile(final long currentTime)
        throws IOException 
    // note: 关闭part文件。
    closePartFile();
    final Path partFilePath = assembleNewPartPath();
    // note: 创建InProgressFileWriter
    return bucketWriter.openNewInProgressFile(bucketId, partFilePath, currentTime);


// 创建BulkPartWriter 使用BulkWriter进行数据写入,
OutputStreamBasedBucketWriter#openNewInProgressFile
public InProgressFileWriter<IN, BucketID> openNewInProgressFile(
        final BucketID bucketID, final Path path, final long creationTime)
        throws IOException 
    // note: 根据的inprocess文件路径,由recoverableWriter创建。
    return openNew(bucketID, recoverableWriter.open(path), path, creationTime);

public InProgressFileWriter<IN, BucketID> openNew(
         final BucketID bucketId,
         final RecoverableFsDataOutputStream stream,
         final Path path,
         final long creationTime)
         throws IOException 

     final BulkWriter<IN> writer = writerFactory.create(stream);
     return new BulkPartWriter<>(bucketId, stream, writer, creationTime);
 


// 创建hdfs文件系统临时文件,针对该文件创建 RecoverableFsDataOutputStream
HadoopRecoverableWriter#open
public RecoverableFsDataOutputStream open(Path filePath) throws IOException 
   final org.apache.hadoop.fs.Path targetFile = HadoopFileSystem.toHadoopPath(filePath);
   final org.apache.hadoop.fs.Path tempFile = generateStagingTempFilePath(fs, targetFile);
   return new HadoopRecoverableFsDataOutputStream(fs, targetFile, tempFile);


HadoopRecoverableFsDataOutputStream(FileSystem fs, Path targetFile, Path tempFile)
        throws IOException 
    // note: 确保hadoop 支持 truncate方法
    ensureTruncateInitialized();

    this.fs = checkNotNull(fs);
    this.targetFile = checkNotNull(targetFile);
    this.tempFile = checkNotNull(tempFile);
    // 创建临时文件
    this.out = fs.create(tempFile);

将数据以parquet格式写入临时文件,调用链路比较长中间涉及不少工厂类及代理类,最终调用的还是parquet框架本身的API。

filesystem.BulkPartWriter#write
public void write(IN element, long currentTime) throws IOException 
   writer.addElement(element);
   markWrite(currentTime);


parquet.ParquetBulkWriter#addElement
public void addElement(T datum) throws IOException 
   // note: org.apache.parquet.hadoop.ParquetWriter
   parquetWriter.write(datum); 

6.checkpoint 过程

在生产环境中大多使用 OnCheckpointRollingPolicy 策略,即在执行Checkpoint时存储BucketState,提交已写入的数据记录已写入数据的偏移量,在CK完成后将 inprogress 文件重命名为最终 part 文件。
根据Checkpoint生命周期方法,了解执行过程。

  1. initializeState 创建StreamingFileSinkHelper,做一些初始化工作。如果从已有的状态快照启动,会对BucketStates进行恢复,稍后详细介绍快照恢复的逻辑,先看状态快照中存储了什么信息,及后续逻辑。
  2. snapshotState 状态快照存储。Buckets 的 snapshotState() 会保存序列化后的 BucketState 及当前子任务处理的最大part文件个数。
public void snapshotState(
    final long checkpointId,
    final ListState<byte[]> bucketStatesContainer,
    final ListState<Long> partCounterStateContainer)
    throws Exception 
// note: 清理历史状态信息
bucketStatesContainer.clear();
partCounterStateContainer.clear();
    
// note: 将 BucketState 以二进制格式存储到 bucketStatesContainer
snapshotActiveBuckets(checkpointId, bucketStatesContainer);
// note: 存储当前任务处理的最大文件数    
partCounterStateContainer.add(maxPartCounter);


private void snapshotActiveBuckets(
        final long checkpointId, final ListState<byte[]> bucketStatesContainer)
        throws Exception 
    for (Bucket<IN, BucketID> bucket : activeBuckets.values()) 
        //note: 每个正在使用的Bucket会生成BucketState
        final BucketState<BucketID> bucketState = bucket.onReceptionOfCheckpoint(checkpointId);
        // note: 将BucketState 序列化后存储到 ListState
        final byte[] serializedBucketState =
                SimpleVersionedSerialization.writeVersionAndSerialize(
                        bucketStateSerializer, bucketState);

        bucketStatesContainer.add(serializedBucketState);
    


BucketState<BucketID> onReceptionOfCheckpoint(long checkpointId) throws IOException 
    //note: 关闭 inProgressPart, 填充BucketState 使用的属性信息
    prepareBucketForCheckpointing(checkpointId);
    
    //note: ck 期间有数据写入。
    if (inProgressPart != null) 
        inProgressFileRecoverable = inProgressPart.persist();
        inProgressFileCreationTime = inProgressPart.getCreationTime();
        this.inProgressFileRecoverablesPerCheckpoint.put(
                checkpointId, inProgressFileRecoverable);
    
    // note: 构建出BucketState
    return new BucketState<>(
            bucketId,
            bucketPath,
            inProgressFileCreationTime,
            inProgressFileRecoverable,
            pendingFileRecoverablesPerCheckpoint);


private void prepareBucketForCheckpointing(long checkpointId) throws IOException 
    if (inProgressPart != null && rollingPolicy.shouldRollOnCheckpoint(inProgressPart)) 
        closePartFile();
    
    //  note: closePartFile()会将生成的 pendingFileRecoverable 写入pendingFileRecoverablesForCurrentCheckpoint
    if (!pendingFileRecoverablesForCurrentCheckpoint.isEmpty()) 
        pendingFileRecoverablesPerCheckpoint.put(
                checkpointId, pendingFileRecoverablesForCurrentCheckpoint);
        pendingFileRecoverablesForCurrentCheckpoint = new ArrayList<>();
    

closePartFile() 主要处理工作包含:

  1. 使用 BulkWriter 将数据 flush 到外部文件。
  2. 基于当前 inprogressPart 创建出PendingFileRecoverable对象,其中封装了 HadoopFsRecoverable 对包含了targetFile(part文件)、tempFile(inprogress文件)、offset(数据当前写入的偏移量)属性,是最重要的状态信息。在checkpoint完成后,会将tempFile命名为targetFile。
  3. 关闭inprogressPart,填充 pendingFileRecoverablesForCurrentCheckpoint信息,代表当前CK正在处理的 inprogressPart文件。
private InProgressFileWriter.PendingFileRecoverable closePartFile() throws IOException 
    InProgressFileWriter.PendingFileRecoverable pendingFileRecoverable = null;
    if (inProgressPart != null) 
        //note: 和inProgressPart文件一一对应
        pendingFileRecoverable = inProgressPart.closeForCommit();
        //note: 存储到LIST
        pendingFileRecoverablesForCurrentCheckpoint.add(pendingFileRecoverable);
        inProgressPart = null;
    
    return pendingFileRecoverable;


FlinkFlink SQL 开源UI平台 flink-streaming-platform-web

FlinkFlink AscendingTimestampExtractor - Timestamp monotony violated

FlinkFlink TaskManager 一直 User file cache uses directory

FlinkFlink 写入 kafka 报错 The server disconnected before a response was received

FlinkFlink SQL Cannot instantiate user function cannot assign instance LinkedMap FlinkKafkaConsum

flinkFlink 1.12.2 源码浅析 : yarn-per-job模式解析 yarn 提交过程解析