为啥重启flink程序后有很多in-progress文件?

Posted

技术标签:

【中文标题】为啥重启flink程序后有很多in-progress文件?【英文标题】:why are there many in-progress files after restarting flink program?为什么重启flink程序后有很多in-progress文件? 【发布时间】:2019-08-05 09:21:51 【问题描述】:

我使用 flink 来消费 kafka 并将它们以 parquet 格式保存到 hdfs。现在我发现我的目标目录中有很多正在进行的文件,当我重新启动我的 flink 程序时,它们不会作为目标目录中的文件关闭。

我的环境:

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.enableCheckpointing(60000)
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    env.getConfig.registerTypeWithKryoSerializer(classOf[MyMessage],classOf[ProtobufSerializer])


//sinks

    val bucketAssigner = new DateTimeBucketAssigner[myCounter]("yyyy-MM-dd", ZoneId.of("Asia/Shanghai"))
    val streamingFileSink = StreamingFileSink.
      forBulkFormat(path, ParquetAvroWriters.forSpecificRecord(classOf[myCounter]))
      .withBucketCheckInterval(60000)
      .withBucketAssigner(bucketAssigner).build

-rw-r--r--   3 Administrator hdfs       1629 2019-08-05 17:06 /user/data/2019-08-05/.part-2-0.inprogress.722265d7-1082-4c84-b70d-da2a08092f5d
-rw-r--r--   3 Administrator hdfs       1891 2019-08-05 17:07 /user/data/2019-08-05/.part-2-1.inprogress.ac0d8b56-b8f0-4893-9e55-5374b69f16cc
-rw-r--r--   3 Administrator hdfs       1891 2019-08-05 17:08 /user/data/2019-08-05/.part-2-2.inprogress.a427c2e2-d689-42b8-aa3d-77873c5654f2
-rw-r--r--   3 Administrator hdfs       1891 2019-08-05 17:09 /user/data/2019-08-05/.part-2-3.inprogress.b5c746e3-354d-4ab3-b1a4-8c6bd88ae430
-rw-r--r--   3 Administrator hdfs       1891 2019-08-05 16:59 /user/data/2019-08-05/.part-2-3.inprogress.e286d995-3fa7-4696-b51a-27378412a35c
-rw-r--r--   3 Administrator hdfs       1891 2019-08-05 17:00 /user/data/2019-08-05/.part-2-4.inprogress.bcde4f30-2f78-4f54-92ad-9bc54ac57c5c
-rw-r--r--   3 Administrator hdfs       1891 2019-08-05 17:10 /user/data/2019-08-05/.part-2-4.inprogress.dbce8a00-6514-43dc-8b31-36c5a8665d37
-rw-r--r--   3 Administrator hdfs          0 2019-08-05 17:10 /user/data/2019-08-05/.part-2-5.inprogress.34e53418-f5af-4279-87ef-6a27549d90fe
-rw-r--r--   3 Administrator hdfs       1891 2019-08-05 17:01 /user/data/2019-08-05/.part-2-5.inprogress.936cdb63-4fe2-41bf-b839-2861030c5516
-rw-r--r--   3 Administrator hdfs          0 2019-08-05 16:55 /user/data/2019-08-05/.part-2-6.inprogress.7a7099a6-9dcd-450b-af2c-8a676276ef0a
-rw-r--r--   3 Administrator hdfs          0 2019-08-05 17:01 /user/data/2019-08-05/.part-2-6.inprogress.b57f548f-45fc-497c-9807-ef18dba3d11d
-rw-r--r--   3 Administrator hdfs       1574 2019-08-05 16:56 /user/data/2019-08-05/part-2-0
-rw-r--r--   3 Administrator hdfs       1868 2019-08-05 16:57 /user/data/2019-08-05/part-2-1
-rw-r--r--   3 Administrator hdfs       1891 2019-08-05 16:58 /user/data/2019-08-05/part-2-2
-rw-r--r--   3 Administrator hdfs       1661 2019-08-05 16:53 /user/data/2019-08-05/part-2-3
-rw-r--r--   3 Administrator hdfs       1891 2019-08-05 16:54 /user/data/2019-08-05/part-2-4

我认为原因是我重新启动程序时正在进行的文件没有关闭,我很困惑为什么重新启动后文件不会关闭,甚至新文件也变成了正在进行中。有人能解释一下吗?

【问题讨论】:

【参考方案1】:

简称为 Exactly-Once 语义。

请先阅读 Flink 官方博客中的this post。

那么让我试着解释清楚。

    BucketingSink 将所有记录写入临时文件,默认情况下带有后缀 in-progress。

    当在这个 sink 上检查点的时间到来时,Flink 会将正在进行的文件的名称保存到检查点;

    到了提交时间时,Flink 会将进行中的文件重命名为最终名称,在您的示例中,它们是 part-x-x 文件。

并且当你重启 Flink 应用程序时,Flink 作业将从上一个保存点重新启动(如果你设置了参数),并且许多未准备好提交的正在进行的文件将被丢弃,并且永远不会被读取(以点开头HDFS不会被用户列出)。

当然,我忽略了很多细节,例如,当文件的体积超过配置时,文件将被重命名为 .pending 文件等。

【讨论】:

就我而言,我没有以这种方式设置检查点路径 env.setStateBackend(new FsStateBackend("hdfs:///user//data/flink/checkpoints"))。取消导致正在进行中的flink不会关闭检查点吗? 此外,我注意到重新启动后计数器从 0 开始。是不是也是缺少checkpoint造成的?如果我设置检查点,它会从进行中恢复吗? 根据 BucketingSink 的源码,Flink 永远不会删除 .in-progress 文件,即使你从 checkpoint 开始。 发生错误时,Flink HA 会根据你的重启策略自动重启作业。如果您只是从 UI 中取消作业,则没有检查点。使用 -s 参数取消 Flink 作业,你会得到保存点,然后从保存点开始。 如果获得保存点,是否会继续写入正在进行的文件并将其转为完成状态?【参考方案2】:

您需要使用 flink shell 提交您的应用程序以使应用程序从保存点恢复,如下所示:./bin/flink run -s <savepointPath> ...,查看this 了解更多详细信息。StreamingFileSink 将处理正在进行的文件。

【讨论】:

我就是这么做的。我在 windows + IDEA 上运行 flink 来使用 linux 服务器上的数据进行测试。我通过在 IDEA 上重新运行来重新启动我的应用程序 所以你 flink stop 你的应用程序并再次从 IDEA 运行它,对吗?我认为如果正确停止它应该重命名 inprogress 文件。 没有。我没有flink stop,我只是在IDEA上重新运行我的flink程序。 看来你需要用flink shell提交你的应用程序,因为你不能从代码中恢复应用程序(所以你不能恢复一个用IDEA启动的应用程序)。 知道了。我会试一试。谢谢!

以上是关于为啥重启flink程序后有很多in-progress文件?的主要内容,如果未能解决你的问题,请参考以下文章

TensorFlow,为啥保存模型后有3个文件?

为啥尽管观察者的状态发生变化,但应用程序重启后 Flutter BloC UI 没有更新?

SQL启动srever时报,启动后有自动停止,一些组件无法启动,为啥?

RabbitMQ系列二 为啥选择RabbitMQ

为啥我的 XAMPP Apache 服务不断重启?

Flink重启策略 flink出现异常重新拉起任务