Checkpointing
Posted hackerxiaoyon
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Checkpointing相关的知识,希望对你有一定的参考价值。
上级:https://www.cnblogs.com/hackerxiaoyon/p/12747387.html
每个函数和算子都可以有状态在flink中。状态的函数存储数据通过独立的元素或事件处理。为了让状态具有容错性,flink需要把状态进行快照。检查点允许flink恢复状态和位置在流中,从而是的flink程序能够提供无故障执行机制。
Prerequisites
flink检查点机制与流和状态的持久化交互,总体来说,需要如下几点:
1、一个持久化数据源具有可以重播机制在某个时间点。如:kafka,rabbitMQ,Amazon Kinesis,Google PubSub ,pulsar也是 等或者文件系统 hdfs,s3,GFS,NFS,Ceph等
2、状态的持久存储通常是分布式系统,如:HDFS,S3,GFS,NFS,Ceph等、
Enabling and Configuring Checkpointing
默认检查点禁用。开启检查点,使用enableCheckpointng(n) 通过我们的我们在设置了环境变量后,
LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment()
env就可以点出来,n表示多久就触发checkpoint,时间单位毫秒。
其他checkpoint参数包括:
1、精准一次 vs 至少一次:检查点机制在两种保障水平间进行选择,精准一次可能大多程序选择。至少一次对于某些超低延迟的程序也许需要。
2、检查点超时:这个时间是检查点处理中被阻断,如果还没有完成时。
3、检查点间最小时间:为了确保流应用在检查点间在处理过程获取一定数量设置的多久通过检查点间。如果设置5000,那么下一个检查点不会立即超过5秒在上一个检查点完成后,忽略掉检查点持续时间和间隔时间。注意这个意味着检查点间隔将永远不会小于这个参数。
这是经常很容易配置应用通过定义“检查点间的时间”比定义检查点间隔时间。因为“检查点间的时间”不容易受到检查点实际花费时间,检查点可能有时候比平均更长时间。
这值意味着并发检查点数量为1.
4、并发检查点数量:默认,系统不会触发另外的检查点当一个在处理中。这个确保拓扑不会花太多时间在检查点也不会处理进行的流。可以允许多个重叠的检查点,这对于具有一定处理延迟但仍然希望进行非常频繁的检查点100毫秒在出现故障时很少重新处理的管道来说是有趣的。
这个选项不可以使用在设置了最小检查点间时间。
5、外部化的检查点:可以设置周期检查点持久化到外部设备。外部化检查点把元数据写入持久化存储并不会自动清楚当作业失败。这样如果你作业失败重启就需要检查点来恢复。详细信息https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/state/checkpoints.html#externalized-checkpoints
6、检查点错误后 失败/继续任务:就是任务执行检查点过程发生错误,任务是否会失败。如果是禁用,任务会拒绝检查点到检查点协调器并且继续运行。
7、选择检查点恢复:作业是否回调最近的检查点即使有很多最近的保存点存在。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// start a checkpoint every 1000 msenv.enableCheckpointing(1000);
// advanced options:
// set mode to exactly-once (this is the default)env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// make sure 500 ms of progress happen between checkpointsenv.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// checkpoints have to complete within one minute, or are discardedenv.getCheckpointConfig().setCheckpointTimeout(60000);
// allow only one checkpoint to be in progress at the same timeenv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// enable externalized checkpoints which are retained after job cancellationenv.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// allow job recovery fallback to checkpoint when there is a more recent savepointenv.getCheckpointConfig().setPreferCheckpointForRecovery(true);
Related Config Options
conf/flink-conf.yaml 中更多配置https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html
Selecting a State Backend
flink的检查点机制存储所有的状态在计时器和有状态的算法一致性快照,包含连接器,窗口和任何用户自定义状态。检查点存储依赖状态后台配置。
默认状态在tm内存保持然后检查点存储在jm内存。对于大数据量的状态flink也同样支持。状态后端选择配置可以通过StreamExecutionEnvironment.setStateBackend(…).
更多设置https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/state/state_backends.html
State Checkpoints in Iterative Jobs
flink当前只提供作业处理中的保障。开启检查点机制在一个迭代作业中会报异常。为了在迭代作业中开启检查机制需要设置强制参数。env.enableCheckpointing(interval, CheckpointingMode.EXACTLY_ONCE, force = true)。
请注意数据在循环边过程中会丢失数据当作业失败。个人理解应该是在那种多个作业直接责任链的方式开启强制检查点会丢失数据,也不是很理解。
Restart Strategies
flink支持不同的重启策略来控制失败的作业。https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/restart_strategies.html
以上是关于Checkpointing的主要内容,如果未能解决你的问题,请参考以下文章