从0到1Flink的成长之路(二十)-Flink 高级特性之自动重启策略和恢复

Posted 熊老二-

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了从0到1Flink的成长之路(二十)-Flink 高级特性之自动重启策略和恢复相关的知识,希望对你有一定的参考价值。

自动重启策略和恢复

1)、重启策略配置方式
配置文件
在flink-conf.yml中可以进行配置,示例如下:

restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s

代码中
可以在代码中针对该任务进行配置,示例如下:

env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3, // 重启次数
Time.of(10, TimeUnit.SECONDS) // 延迟时间间隔 ))

2)、重启策略分类
其一、默认重启策略
如果配置Checkpoint,没有配置重启策略,那么代码中出现了非致命错误时,程序会无限重启。
其二、无重启策略

ob直接失败,不会尝试进行重启
设置方式一:
restart-strategy: none
设置方式二:
无重启策略也可以在程序中设置
val env = ExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.noRestart())

其三、固定延迟重启策略(开发中使用)

设置方式一: 重启策略可以配置flink-conf.yaml的下面配置参数来启用,作为默认的重启策略: 例子:
restart-strategy: fixed-delay restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s
设置方式二: 也可以在程序中设置:
val env =ExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3, //最多重启3次数 Time.of(10, TimeUnit.SECONDS) // 重启时间间隔 ))
上面的设置表示:如果job失败,重启3次, 每次间隔10

其四、失败率重启策略(偶尔使用)

设置方式1: 失败率重启策略可以在flink-conf.yaml中设置下面的配置参数来启用: 例子:
restart-strategy:failure-rate
restart-strategy.failure-rate.max-failures-per-interval: 3
restart-strategy.failure-rate.failure-rate-interval: 5 min
restart-strategy.failure-rate.delay: 10 s 设置方式2: 失败率重启策略也可以在程序中设置: val
env = ExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.failureRateRestart(
3, // 每个测量时间间隔最大失败次数
Time.of(5, TimeUnit.MINUTES), //失败率测量的时间间隔
Time.of(10, TimeUnit.SECONDS) // 两次连续重启的时间间隔
))
上面的设置表示:如果5分钟内job失败不超过三次,自动重启, 每次间隔10s (如果5分钟内程序失
败超过3次,则程序退出)

3)、代码演示

package xx.xxxxxx.flink.checkpoint;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;
import java.util.concurrent.TimeUnit;
/**
* Flink Checkpoint定时保存状态State,设置应用失败以后,重启策略ReStart Strategy
*/
public class StreamRestartStrategyDemo {
public static void main(String[] args) throws Exception {
// 1. 执行环境-env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
// TODO: ================= 建议必须设置 ===================
// a. 设置Checkpoint-State的状态后端为FsStateBackend,本地测试时使用本地路径,集群测试时使用传入的HDFS的路径
if(args.length < 1){
env.setStateBackend(new FsStateBackend("file:///D:/datas/ckpt"));
//env.setStateBackend(new FsStateBackend("hdfs://node1.itcast.cn:8020/flink-checkpoints/checkpoint"));
}else {
// 后续集群测试时,传入参数:hdfs://node1.itcast.cn:8020/flink-checkpoints/checkpoint
env.setStateBackend(new FsStateBackend(args[0])) ; }
/*
b. 设置Checkpoint时间间隔为1000ms,意思是做 2 个 Checkpoint 的间隔为1000ms。
Checkpoint 做的越频繁,恢复数据时就越简单,同时 Checkpoint 相应的也会有一些IO消耗。
*/
env.enableCheckpointing(2000) ;// 默认情况下如果不设置时间checkpoint是没有开启的
/*
c. 设置两个Checkpoint 之间最少等待时间,如设置Checkpoint之间最少是要等 500ms
为了避免每隔1000ms做一次Checkpoint的时候,前一次太慢和后一次重叠到一起去了
如:高速公路上,每隔1s关口放行一辆车,但是规定了两车之前的最小车距为500m
*/
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// d. 设置如果在做Checkpoint过程中出现错误,是否让整体任务失败:true是 false不是
env.getCheckpointConfig().setFailOnCheckpointingErrors(false); // 默认为true
/*
e. 设置是否清理检查点,表示 Cancel 时是否需要保留当前的 Checkpoint,默认 Checkpoint会在作业被Cancel时被删除
ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:false,当作业被取消时,保留外部的checkpoint
ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION:true,当作业被取消时,删除外部的checkpoint(默认值) */
env.getCheckpointConfig().enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
);
// ================= 直接使用默认的即可 ===============
// a. 设置checkpoint的执行模式为EXACTLY_ONCE(默认),注意:得需要外部支持,如Source和Sink的支持
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// b. 设置checkpoint的超时时间,如果 Checkpoint在 60s内尚未完成说明该次Checkpoint失败,则丢弃。
env.getCheckpointConfig().setCheckpointTimeout(60000);
// c. 设置同一时间有多少个checkpoint可以同时执行
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // 默认为1
//======================配置重启策略==============
// a. 如果设置Checkpoint,而没有配置重启策略,代码中出现了非致命错误时,程序会无限重启
// b. 配置无重启策略
env.setRestartStrategy(RestartStrategies.noRestart()) ;
// c.固定延迟重启策略(开发中使用),如下:如果有异常,每隔5s重启1次,最多3次
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3, // 最多重启3次数
Time.of(5, TimeUnit.SECONDS) // 重启时间间隔
));
// d. 失败率重启策略(开发偶尔使用),如下:5分钟内,最多重启3次,每次间隔10
/*
env.setRestartStrategy(RestartStrategies.failureRateRestart(
3, // 每个测量时间间隔最大失败次数
Time.of(5, TimeUnit.MINUTES), //失败率测量的时间间隔
Time.of(10, TimeUnit.SECONDS) // 两次连续重启的时间间隔
))
*/
// 2. 数据源-source
DataStreamSource<String> inputDataStream = env.addSource(new SourceFunction<String>() {
private boolean isRunning = true;
private int counter = 0 ; @Override
public void run(SourceContext<String> ctx) throws Exception {
while (isRunning){
ctx.collect("flink spark");
TimeUnit.SECONDS.sleep(2);
counter += 1 ;
if(counter % 5 == 0){
throw new RuntimeException("程序程序异常啦.................") ; } }
}@Override
public void cancel() {
isRunning = false ; }
});
// 3. 数据转换-transformation
SingleOutputStreamOperator<Tuple2<String, Integer>> resultDataStream = inputDataStream
.filter(new FilterFunction<String>() { @Override
public boolean filter(String line) throws Exception {
return null != line && line.trim().length() > 0; }
})
.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() { @Override
public void flatMap(String line,
Collector<Tuple2<String, Integer>> out) throws Exception {
for (String word : line.trim().split("\\\\W+")) {
out.collect(Tuple2.of(word, 1));
} }
})
.keyBy(0).sum(1);
// 4. 数据终端-sink
resultDataStream.printToErr();
// 5. 触发执行-execute
env.execute(StreamRestartStrategyDemo.class.getSimpleName());
} }

以上是关于从0到1Flink的成长之路(二十)-Flink 高级特性之自动重启策略和恢复的主要内容,如果未能解决你的问题,请参考以下文章

从0到1Flink的成长之路(二十)-Flink 高级特性之状态分类

从0到1Flink的成长之路(二十)-Flink 高级特性之Flink 状态管理

从0到1Flink的成长之路(二十)-Flink 高级特性之 Flink 容错机制

从0到1Flink的成长之路(二十)-Flink 高级特性之状态恢复和重启策略

从0到1Flink的成长之路(二十一)-Sink

从0到1Flink的成长之路(二十)-Time与Watermaker