使用 flink runner 时如何在 apache Beam 中执行检查点?
Posted
技术标签:
【中文标题】使用 flink runner 时如何在 apache Beam 中执行检查点?【英文标题】:How to perform checkpointing in apache beam while using flink runner? 【发布时间】:2020-10-26 07:47:25 【问题描述】:我正在从一个未绑定的源(Kafka)阅读并将其字数写入其他 Kafka 主题。现在我想在光束管道中执行检查点。我已按照 apache beam 文档中的所有说明进行操作,但即使在此之后也没有创建检查点目录。
以下是我用于管道的参数:-
--runner=FlinkRunner
--streaming=true
--parallelism=2
--checkpointingInterval=1000
--checkpointTimeoutMillis=5000
--minPauseBetweenCheckpoints=500
--externalizedCheckpointsEnabled=true
--retainExternalizedCheckpointsOnCancellation=true
谁能帮我检查点?
【问题讨论】:
【参考方案1】:我已经研究过解决方案,一个是你可以更改链接集群的 flink-conf.yaml 中的 checkpoint.state.dir 路径,另一个是使用 flinkPipelineOptions-
@Description(
"Sets the state backend factory to use in streaming mode. "
+ "Defaults to the flink cluster's state.backend configuration.")
Class<? extends FlinkStateBackendFactory> getStateBackendFactory();
void setStateBackendFactory(Class<? extends FlinkStateBackendFactory> stateBackendFactory);
并通过设置 setStateBackendFactory(我已经使用自定义类完成了)
static class bakend implements FlinkStateBackendFactory
@Override
public StateBackend createStateBackend(FlinkPipelineOptions options)
return new FsStateBackend("file:///Users/myPc/word-count-beam/src/checkpoint/");
这将创建一个 checkpointDir,您还需要设置一个 checkpointinginterval 值才能启用检查点。
【讨论】:
【参考方案2】:我知道它很旧,但想同意你的回答。 我们在 2019 年构建了一个 dockerized flink,并使用这些选项进行梁和运行
--runner=FlinkRunner --streaming=true --checkpointingInterval=30000 --env=dev
我们在conf.yml中配置了rocksdb作为后端。
【讨论】:
以上是关于使用 flink runner 时如何在 apache Beam 中执行检查点?的主要内容,如果未能解决你的问题,请参考以下文章