使用 flink runner 时如何在 apache Beam 中执行检查点?

Posted

技术标签:

【中文标题】使用 flink runner 时如何在 apache Beam 中执行检查点?【英文标题】:How to perform checkpointing in apache beam while using flink runner? 【发布时间】:2020-10-26 07:47:25 【问题描述】:

我正在从一个未绑定的源(Kafka)阅读并将其字数写入其他 Kafka 主题。现在我想在光束管道中执行检查点。我已按照 apache beam 文档中的所有说明进行操作,但即使在此之后也没有创建检查点目录。

以下是我用于管道的参数:-

--runner=FlinkRunner
--streaming=true
--parallelism=2
--checkpointingInterval=1000
--checkpointTimeoutMillis=5000
--minPauseBetweenCheckpoints=500
--externalizedCheckpointsEnabled=true
--retainExternalizedCheckpointsOnCancellation=true

谁能帮我检查点?

【问题讨论】:

【参考方案1】:

我已经研究过解决方案,一个是你可以更改链接集群的 flink-conf.yaml 中的 checkpoint.state.dir 路径,另一个是使用 flinkPipelineOptions-

        @Description(
                "Sets the state backend factory to use in streaming mode. "
                        + "Defaults to the flink cluster's state.backend configuration.")
        Class<? extends FlinkStateBackendFactory> getStateBackendFactory();
        void setStateBackendFactory(Class<? extends FlinkStateBackendFactory> stateBackendFactory);

并通过设置 setStateBackendFactory(我已经使用自定义类完成了)

  static class  bakend implements FlinkStateBackendFactory

        @Override
        public StateBackend createStateBackend(FlinkPipelineOptions options) 
            return new FsStateBackend("file:///Users/myPc/word-count-beam/src/checkpoint/");

        
    

这将创建一个 checkpointDir,您还需要设置一个 checkpointinginterval 值才能启用检查点。

【讨论】:

【参考方案2】:

我知道它很旧,但想同意你的回答。 我们在 2019 年构建了一个 dockerized flink,并使用这些选项进行梁和运行

--runner=FlinkRunner --streaming=true --checkpointingInterval=30000 --env=dev

我们在conf.yml中配置了rocksdb作为后端。

【讨论】:

以上是关于使用 flink runner 时如何在 apache Beam 中执行检查点?的主要内容,如果未能解决你的问题,请参考以下文章

Flink State TTL 详解

flink_初识01

Apache Flink 欺诈交易检测

无法在Google Cloud Dataproc上启动Apache Flink 1.7

如何使用柯南遥控器配置 gitlab-runner?

如何使用 Runner.app 创建用于测试的 .ipa 文件?