Checkpointing

Posted hackerxiaoyon

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Checkpointing相关的知识,希望对你有一定的参考价值。

上级:https://www.cnblogs.com/hackerxiaoyon/p/12747387.html

每个函数和算子都可以有状态在flink状态的函数存储数据通过独立的元素或事件处理为了让状态具有容错性flink需要把状态进行快照检查点允许flink恢复状态和位置在流中从而是的flink程序能够提供无故障执行机制

 

Prerequisites

flink检查点机制与流和状态的持久化交互总体来说需要如下几点

1、一个持久化数据源具有可以重播机制在某个时间点kafkarabbitMQAmazon KinesisGoogle PubSub pulsar也是 等或者文件系统 hdfss3GFSNFSCeph

2、状态的持久存储通常是分布式系统HDFSS3GFSNFSCeph

 

Enabling and Configuring Checkpointing

默认检查点禁用开启检查点使用enableCheckpointng(n) 通过我们的我们在设置了环境变量后

LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment()

env就可以点出来,n表示多久就触发checkpoint时间单位毫秒

 

其他checkpoint参数包括

1、精准一次 vs 至少一次检查点机制在两种保障水平间进行选择精准一次可能大多程序选择至少一次对于某些超低延迟的程序也许需要

2、检查点超时这个时间是检查点处理中被阻断如果还没有完成时

3、检查点间最小时间为了确保流应用在检查点间在处理过程获取一定数量设置的多久通过检查点间如果设置5000那么下一个检查点不会立即超过5秒在上一个检查点完成后忽略掉检查点持续时间和间隔时间注意这个意味着检查点间隔将永远不会小于这个参数

 

这是经常很容易配置应用通过定义“检查点间的时间”比定义检查点间隔时间因为“检查点间的时间”不容易受到检查点实际花费时间检查点可能有时候比平均更长时间

这值意味着并发检查点数量为1.

 

4、并发检查点数量默认系统不会触发另外的检查点当一个在处理中这个确保拓扑不会花太多时间在检查点也不会处理进行的流可以允许多个重叠的检查点这对于具有一定处理延迟但仍然希望进行非常频繁的检查点100毫秒在出现故障时很少重新处理的管道来说是有趣的

这个选项不可以使用在设置了最小检查点间时间

5、外部化的检查点可以设置周期检查点持久化到外部设备外部化检查点把元数据写入持久化存储并不会自动清楚当作业失败这样如果你作业失败重启就需要检查点来恢复详细信息https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/state/checkpoints.html#externalized-checkpoints

6、检查点错误后 失败/继续任务就是任务执行检查点过程发生错误任务是否会失败如果是禁用任务会拒绝检查点到检查点协调器并且继续运行

7、选择检查点恢复作业是否回调最近的检查点即使有很多最近的保存点存在

 

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// start a checkpoint every 1000 msenv.enableCheckpointing(1000);

// advanced options:

// set mode to exactly-once (this is the default)env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

// make sure 500 ms of progress happen between checkpointsenv.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);

// checkpoints have to complete within one minute, or are discardedenv.getCheckpointConfig().setCheckpointTimeout(60000);

// allow only one checkpoint to be in progress at the same timeenv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

// enable externalized checkpoints which are retained after job cancellationenv.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

// allow job recovery fallback to checkpoint when there is a more recent savepointenv.getCheckpointConfig().setPreferCheckpointForRecovery(true);

 

Related Config Options

conf/flink-conf.yaml 中更多配置https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html

 

Selecting a State Backend

flink的检查点机制存储所有的状态在计时器和有状态的算法一致性快照包含连接器窗口和任何用户自定义状态检查点存储依赖状态后台配置

默认状态在tm内存保持然后检查点存储在jm内存对于大数据量的状态flink也同样支持状态后端选择配置可以通过StreamExecutionEnvironment.setStateBackend(…).

更多设置https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/state/state_backends.html

 

State Checkpoints in Iterative Jobs

flink当前只提供作业处理中的保障开启检查点机制在一个迭代作业中会报异常为了在迭代作业中开启检查机制需要设置强制参数env.enableCheckpointing(interval, CheckpointingMode.EXACTLY_ONCE, force = true)

请注意数据在循环边过程中会丢失数据当作业失败个人理解应该是在那种多个作业直接责任链的方式开启强制检查点会丢失数据也不是很理解

Restart Strategies

flink支持不同的重启策略来控制失败的作业https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/restart_strategies.html

 

以上是关于Checkpointing的主要内容,如果未能解决你的问题,请参考以下文章