day05_Flink容错机制

Posted ChinaManor

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了day05_Flink容错机制相关的知识,希望对你有一定的参考价值。

day05_Flink容错机制

今日目标

  • Flink容错机制之Checkpoint
  • Flink容错机制之重启策略
  • 存储介质StateBackend
  • Checkpoint 配置方式
  • 状态恢复和重启策略
  • Savepoint手动重启并恢复
  • 并行度设置

Flink状态管理

  • 状态就是基于 key 或者 算子 operator 的中间结果

  • Flink state 分为两种 : Managed state - 托管状态 , Raw state - 原始状态

  • Managed state 分为 两种:

    1. keyed state 基于 key 上的状态

      支持的数据结构 valueState listState mapState broadcastState

    2. operator state 基于操作的状态

      字节数组, ListState

Flink keyed state 案例

  • 需求

    使用KeyedState中的ValueState获取数据中的最大值(实际中直接使用maxBy即可),使用值状态自定义,

    <hello,1>

    <hello,3>

    <hello,2>

    输入Tuple2<String/单词/, Long/长度/> 输出 Tuple3<String/单词/, Long/长度/, Long/历史最大值/> 类型

  • 开发

    package cn.itcast.flink.state;
    
    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    /**
     * Author itcast
     * Date 2021/6/21 8:34
     * Desc TODO
     */
    public class KeyedStateDemo {
        public static void main(String[] args) throws Exception {
            //1.env 设置并发度为1
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            //2.Source 参看课件 <城市,次数> => <城市,最大次数>
            DataStreamSource<Tuple2<String, Long>> tupleDS = env.fromElements(
                    Tuple2.of("北京", 1L),
                    Tuple2.of("上海", 2L),
                    Tuple2.of("北京", 6L),
                    Tuple2.of("上海", 8L),
                    Tuple2.of("北京", 3L),
                    Tuple2.of("上海", 4L)
            );
            //3.Transformation
            //使用KeyState中的ValueState获取流数据中的最大值(实际中直接使用maxBy即可)
            //实现方式1:直接使用maxBy--开发中使用该方式即可
            SingleOutputStreamOperator<Tuple2<String, Long>> result1 = tupleDS.keyBy(t -> t.f0)
                    //min只会求出最小的那个字段,其他的字段不管
                    //minBy会求出最小的那个字段和对应的其他的字段
                    //max只会求出最大的那个字段,其他的字段不管
                    //maxBy会求出最大的那个字段和对应的其他的字段
                    .maxBy(1);
            //实现方式2:通过managed state输入的state
            //3.1.先根据字符串f0分组然后进行 map 操作,将Tuple2<String/*城市*/, Long/*次数*/> 输出 Tuple3<String/*城市*/, Long/*次数*/, Long/*历史最大值*/>
            //
            SingleOutputStreamOperator<Tuple3<String, Long, Long>> result2 = tupleDS
                    .keyBy(t->t.f0)
                    .map(new RichMapFunction<Tuple2<String, Long>, Tuple3<String/*城市*/, Long/*次数*/, Long/*历史最大值*/>>() {
                ValueState<Long> maxState = null;
    
                //-1.定义值类型的状态用来存储最大值
                //3.2.重写 RichMapFunction 的open 方法
                @Override
                public void open(Configuration parameters) throws Exception {
                    //-2.定义状态描述符
                    //-3.从当前上下文获取内存中的状态值
                    ValueStateDescriptor maxStateDesc = new ValueStateDescriptor("maxState", Long.class);
                    maxState = getRuntimeContext().getState(maxStateDesc);
                }
    
                //3.3.重写 map 方法
                //-4.获取state中历史最大值value和当前元素的最大值并比较
                @Override
                public Tuple3<String, Long, Long> map(Tuple2<String, Long> value) throws Exception {
                    //内存中state的存储的最大值
                    Long maxValue = maxState.value();
                    //当前的值
                    Long curValue = value.f1;
                    if (maxValue == null || curValue > maxValue) {
                        maxState.update(curValue);
                        return Tuple3.of(value.f0, value.f1, curValue);
                    } else {
                        return Tuple3.of(value.f0, value.f1, maxValue);
                    }
                }
            });
    
    
            //-5.如果当前值大或历史值为空更新状态;返回Tuple3元祖结果
            //4.Sink 打印输出
            //result1.print();
            result2.print();
            //5.execute 执行环境
            env.execute();
        }
    }
    

Flink operator state 案例

  • 需求

    使用ListState存储offset模拟消费Kafka的offset维护

  • 实现

    package cn.itcast.flink.state;
    
    import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    import org.apache.flink.api.common.state.ListState;
    import org.apache.flink.api.common.state.ListStateDescriptor;
    import org.apache.flink.runtime.state.FunctionInitializationContext;
    import org.apache.flink.runtime.state.FunctionSnapshotContext;
    import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
    
    import java.util.Iterator;
    
    /**
     * Author itcast
     * Date 2021/6/21 9:18
     * Desc TODO
     */
    public class OperatorStateDemo {
        public static void main(String[] args) throws Exception {
            //1.创建流环境,便于观察设置并行度为 1
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            //2.开启checkpoint ,并将状态保存到 file:///D:/chk ,先开启checkpoint ,state管理
            env.enableCheckpointing(1000);
            env.setStateBackend(new FsStateBackend("file:///D:/chk"));
            //3.设置checkpoint的配置 外部chk,仅一次语义等
            env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
            //4.开启重启策略 3秒钟尝试重启3次
            env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,3000));
            //5.添加数据源比如 MyMonitorKafkaSource , 实例化创建 MyMonitorKafkaSource
            DataStreamSource<String> source = env.addSource(new MyMonitorKafkaSource());
            //6.打印输出
            source.print();
            //7.执行
            env.execute();
    
    
        }
        //创建 MyMonitorKafkaSource 继承 RichParallelSourceFunction<String> 并实现 CheckpointedFunction
        public static class MyMonitorKafkaSource extends RichParallelSourceFunction<String>
        implements CheckpointedFunction{
            //重写initializeState方法 ListStateDescriptor 状态描述和通过context获取 offsetState
            ListState<Long> offsetState = null;
            boolean flag = true;
            Long offset = 0L;
            @Override
            public void initializeState(FunctionInitializationContext context) throws Exception {
                ListStateDescriptor<Long> offsetStateDesc = new ListStateDescriptor<>("offsetState", Long.class);
                offsetState = context.getOperatorStateStore().getListState(offsetStateDesc);
            }
            //重写run方法 读取出 offset 并 循环读取offset+=1,拿到执行的核心编号,输出(核编号和offset),一秒一条,每5条模拟一个异常
            @Override
            public void run(SourceContext<String> ctx) throws Exception {
                Iterator<Long> iterator = offsetState.get().iterator();
                if(iterator.hasNext()){
                    offset = iterator.next();
                }
                while(flag){
                    offset = offset + 1;
                    //处理 CPU 核心Index
                    int idx = getRuntimeContext().getIndexOfThisSubtask();
                    System.out.println("index:"+idx+" offset:"+offset);
                    Thread.sleep(1000);
                    if(offset % 5 ==0){
                        System.out.println("当前程序出错了....");
                        throw new Exception("程序出BUG...");
                    }
                }
            }
            //重写cancel方法
            @Override
            public void cancel() {
                flag = false;
            }
    
            //重写snapshotState方法 , 清空 offsetState ,并将最新的offset添加进去
            @Override
            public void snapshotState(FunctionSnapshotContext context) throws Exception {
                offsetState.clear();
                offsetState.add(offset);
            }
        }
    }
    

Flink的容错机制

  • checkpoint : 某一时刻,Flink中所有的Operator的当前State的全局快照,一般存在磁盘上。

checkpoint 的执行流程

  • 触发checkpoint , JobManager 主节点
  • JobManager 触发 barrier 信号, 给 source -> transformation -> sink , 都会触发,将当前算子 operator state 保存到 HDFS 或者本地文件上, 每个operator 都备份完, 当前一个 checkpoint 就执行完毕了。

存储介质

  • memoryStatebackend 生产环境不推荐
  • FsStatebackend 就是存储到 HDFS 或者本地文件系统上 ,都可以用于生产环境。
  • RocksdbStatebackend 先在本地进行存储, 异步增量的存储到 HDFS 文件系统上, 一般支持大的中间state 场景

Checkpoint 配置方式

  1. 全局的配置文件 flink-conf.yaml

    state.backend: filesystem
    
    # Directory for checkpoints filesystem, when using any of the default bundled
    # state backends.
    #
    state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints
    
    # Default target directory for savepoints, optional.
    #
    state.savepoints.dir: hdfs://namenode-host:port/flink-savepoints
    
  2. 通过代码进行设置

    env.enableCheckpointing(1000);
    env.setStateBackend(new FsStateBackend("file:///D:/chk"));
    
  3. 需求

    package cn.itcast.flink.state;
    
    import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    import org.apache.flink.api.common.state.ListState;
    import org.apache.flink.api.common.state.ListStateDescriptor;
    import org.apache.flink.runtime.state.FunctionInitializationContext;
    import org.apache.flink.runtime.state.FunctionSnapshotContext;
    import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.CheckpointConfig;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
    
    import java.util.Iterator;
    
    /**
     * Author itcast
     * Date 2021/6/21 9:18
     * Desc TODO
     */
    public class OperatorStateDemo {
        public static void main(String[] args) throws Exception {
            //1.创建流环境,便于观察设置并行度为 1
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            //2.开启checkpoint ,并将状态保存到 file:///D:/chk ,先开启checkpoint ,state管理
            env.enableCheckpointing(1000);
            // 设置 state backend FsStateBackend : 文件系统
            // RocksdbStateBackend : rocksdb 插件 异步增量刷新到 HDFS 文件系统中
            env.setStateBackend(new FsStateBackend("file:///D:/chk"));
            // 设置 checkpoint,如果当前的任务被取消,确定当前的checkpoint 是否删除
            env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig
                    .ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
            // 设置当前的 checkpoint 的超时时间
            env.getCheckpointConfig().setCheckpointTimeout(60000);
            // 两个checkpoint 之间最短的间隔时间
            env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
            // 设置当前并行执行的 checkpoint 的个数
            env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
            //env.setStateBackend(new FsStateBackend("hdfs://node1:8020/checkpoints"));
            //env.setStateBackend(new RocksdbStateBackend())
            //3.设置checkpoint的配置 外部chk,仅一次语义等
            env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
            //4.开启重启策略 3秒钟尝试重启3次
            env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,3000));
            //5.添加数据源比如 MyMonitorKafkaSource , 实例化创建 MyMonitorKafkaSource
            DataStreamSource<String> source = env.addSource(new MyMonitorKafkaSource());
            //6.打印输出
            source.print();
            //7.执行
            env.execute();
    
    
        }
        //创建 MyMonitorKafkaSource 继承 RichParallelSourceFunction<String> 并实现 CheckpointedFunction
        public static class MyMonitorKafkaSource extends RichParallelSourceFunction<String>
        implements CheckpointedFunction{
            //重写initializeState方法 ListStateDescriptor 状态描述和通过context获取 offsetState
            ListState<Long> offsetState = null;
            boolean flag = true;
            Long offset = 0L;
            @Override
            public void initializeState(FunctionInitializationContext context) throws Exception {
                ListStateDescriptor<Long> offsetStateDesc = new ListStateDescriptor<>("offsetState", Long.class);
                offsetState = context.getOperatorStateStore().getListState(offsetStateDesc);
            }
            //重写run方法 读取出 offset 并 循环读取offset+=1,拿到执行的核心编号,输出(核编号和offset),一秒一条,每5条模拟一个异常
            @Override
            public void run(SourceContext<String> ctx) throws Exception {
                Iterator<Long> iterator = offsetState.get().iterator();
                if(iterator.hasNext()){
                    offset = iterator.next();
                }
                while(flag){
                    offset = offset + 1;
                    //处理 CPU 核心Index
                    int idx = getRuntimeContext().getIndexOfThisSubtask();
                    System.out.println("index:"+idx+" offset:"+offset);
                    Thread.sleep(1000);
                    if(offset % 5 ==0){
                        System.out.println("当前程序出错了....");
                        throw new Exception("程序出BUG...");
                    }
                }
            }
            //重写cancel方法
            @Override
            public void cancel() {
                flag = false;
            }
    
            //重写snapshotState方法 , 清空 offsetState ,并将最新的offset添加进去
            @Override
            public void snapshotState(FunctionSnapshotContext context) throws Exception {
                offsetState.clear();
                offsetState.add(offset);
            }
        }
    }
    
    

状态恢复和重启策略