从0到1Flink的成长之路(二十)-Flink 高级特性之Checkpoint 配置方式

Posted 熊老二-

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了从0到1Flink的成长之路(二十)-Flink 高级特性之Checkpoint 配置方式相关的知识,希望对你有一定的参考价值。

Checkpoint 配置方式

1)、全局配置
修改flink-conf.yaml

#jobmanager(即MemoryStateBackend), 
#filesystem(即FsStateBackend), 
#rocksdb(即RocksDBStateBackend)
state.backend: filesystem 
state.checkpoints.dir: hdfs://namenode:8020/flink-checkpoints/checkpoint

2)、在代码中配置

//1.MemoryStateBackend--开发中不用
env.setStateBackend(new MemoryStateBackend)
//2.FsStateBackend--开发中可以使用--适合一般状态--秒级/分钟级窗口...
env.setStateBackend(new FsStateBackend("hdfs路径或测试时的本地路径"))
//3.RocksDBStateBackend--开发中可以使用--适合超大状态--天级窗口...
env.setStateBackend(new RocksDBStateBackend(filebackend, true))

注意:RocksDBStateBackend还需要引入依赖

org.apache.flink
flink-statebackend-rocksdb_2.11
1.10.0

代码演示
针对Checkpoint进行相关设置,默认情况下Flink Job执行时,不进行Checkpoint操作。

package xx.xxxxx.flink.checkpoint;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;
import java.util.concurrent.TimeUnit;
/**
* Flink Checkpoint定时保存状态State,演示案例
*/
public class StreamCheckpointDemo {
public static void main(String[] args) throws Exception {
// 1. 执行环境-env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
// TODO: ================= 建议必须设置 ===================
// a. 设置Checkpoint-State的状态后端为FsStateBackend,本地测试时使用本地路径,集群测试时使用传入的HDFS的路径
if(args.length < 1){
env.setStateBackend(new FsStateBackend("file:///D:/datas/ckpt"));
//env.setStateBackend(new FsStateBackend("hdfs://node1.itcast.cn:8020/flink-checkpoints/checkpoint"));
}else {
// 后续集群测试时,传入参数:hdfs://node1.itcast.cn:8020/flink-checkpoints/checkpoint
env.setStateBackend(new FsStateBackend(args[0])) ; }
/*
b. 设置Checkpoint时间间隔为1000ms,意思是做 2 个 Checkpoint 的间隔为1000ms。
Checkpoint 做的越频繁,恢复数据时就越简单,同时 Checkpoint 相应的也会有一些IO消耗。
*/
env.enableCheckpointing(1000) ;// 默认情况下如果不设置时间checkpoint是没有开启的
/*
c. 设置两个Checkpoint 之间最少等待时间,如设置Checkpoint之间最少是要等 500ms
为了避免每隔1000ms做一次Checkpoint的时候,前一次太慢和后一次重叠到一起去了
如:高速公路上,每隔1s关口放行一辆车,但是规定了两车之前的最小车距为500m
*/
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// d. 设置如果在做Checkpoint过程中出现错误,是否让整体任务失败:true是 false不是
env.getCheckpointConfig().setFailOnCheckpointingErrors(false); // 默认为true
/*
e. 设置是否清理检查点,表示 Cancel 时是否需要保留当前的 Checkpoint,默认 Checkpoint会在作业被Cancel时被删除
ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:false,当作业被取消时,保留外部的checkpoint
ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION:true,当作业被取消时,删除外部的checkpoint(默认值) */
env.getCheckpointConfig().enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
);
// ================= 直接使用默认的即可 ===============
// a. 设置checkpoint的执行模式为EXACTLY_ONCE(默认),注意:需要外部支持,如Source和Sink的支持
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// b. 设置checkpoint的超时时间,如果 Checkpoint在 60s内尚未完成说明该次Checkpoint失败,则丢弃。
env.getCheckpointConfig().setCheckpointTimeout(60000);
// c. 设置同一时间有多少个checkpoint可以同时执行
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // 默认为1
// 2. 数据源-source
DataStreamSource<String> inputDataStream = env.addSource(new SourceFunction<String>() {
private boolean isRunning = true; @Override
public void run(SourceContext<String> ctx) throws Exception {
while (isRunning){
ctx.collect("flink spark");
TimeUnit.SECONDS.sleep(2);
} }@Override
public void cancel() {
isRunning = false ; }
});
// 3. 数据转换-transformation
SingleOutputStreamOperator<Tuple2<String, Integer>> resultDataStream = inputDataStream
.filter(line -> null != line && line.trim().length() > 0)
.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() { @Override
public void flatMap(String line,
Collector<Tuple2<String, Integer>> out) throws Exception {
for (String word : line.trim().split("\\\\W+")) {
out.collect(Tuple2.of(word, 1));
} }
})
.keyBy(0).sum(1);
// 4. 数据终端-sink
resultDataStream.printToErr();
// 5. 应用执行
env.execute(StreamCheckpointDemo.class.getSimpleName());
} }

以上是关于从0到1Flink的成长之路(二十)-Flink 高级特性之Checkpoint 配置方式的主要内容,如果未能解决你的问题,请参考以下文章

从0到1Flink的成长之路(二十)-Flink 高级特性之状态分类

从0到1Flink的成长之路(二十)-Flink 高级特性之Flink 状态管理

从0到1Flink的成长之路(二十)-Flink 高级特性之 Flink 容错机制

从0到1Flink的成长之路(二十)-Flink 高级特性之状态恢复和重启策略

从0到1Flink的成长之路(二十一)-Sink

从0到1Flink的成长之路(二十)-Time与Watermaker