Flink State 初探

Posted 心智成熟之路

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink State 初探相关的知识,希望对你有一定的参考价值。

最近换了工作,做 Flink 的企业定制版,主要工作内容包括 Flink 的 State 和 Checkpoint 等。接下来就会学习相关相关知识。


## 任务


看 StateBackend 相关代码,入口在 AbstrackStateBackend,包含多个具体的实现,State 保存在单机,暂时不涉及分布式相关的东西



## 希望最终能够回答如下问题:


- 有哪些 StateBackend 实现,区别都是什么,每一种的优劣是什么

- 作业选择不同 StateBackend 的标准是什么

- 不同 StateBackend 保存的数据结构是什么样的

- State 怎么保存的,保存在哪

- 不同 StateBackend 中内存占用怎么估计/计算

- 如果需要迁移 State 数据,怎么完成的(作业的并发进行调整)


## 工作:

 

- 看完 State 相关文档  https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/state/state.html

- 运行实例:(如无特殊说明,本地 State 相关均使用该代码)

```

 • public class StreamingState {

     public static void main(String[] args) throws Exception {

       // get the execution environment

       final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

 

 • // this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)

       env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L))

           .keyBy(0)

           .flatMap(new CountWindowAverage())

           .print();

  

       env.execute("Streaming WordCount");

     }

• class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

     /**

     * The ValueState handle. The first field is the count, the second field a running sum.

     */

     private transient ValueState<Tuple2<Long, Long>> sum;

  

     @Override

     public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {

       // access the state value

       Tuple2<Long, Long> currentSum = sum.value();

  

       // update the count

       currentSum.f0 += 1;

  

       // add the second field of the input value

      currentSum.f1 += input.f1;

  

       // update the state

       sum.update(currentSum);

  

       // if the count reaches 2, emit the average and clear the state

       if (currentSum.f0 >= 2) {

           out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));

           sum.clear();

       }

     }

  

     @Override

     public void open(Configuration config) {

       ValueStateDescriptor<Tuple2<Long, Long>> descriptor =

           new ValueStateDescriptor<>(

             "average", // the state name

             TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information

             Tuple2.of(0L, 0L)); // default value of the state, if nothing was set

 

       sum = getRuntimeContext().getState(descriptor);

     }

 • }


```



## 遇到的问题:

1. 在 org.apache.flink.streaming.examples.wordcount package 下添加 MapReduce 文件后,运行遇到各种 scala 文件找不到,或者 scala 文件中的某些 symbol 找不到。


解决方法:各种尝试后(安装 scala 插件,引入相关的 project -- 没有找到的 scala 文件对应的 project)



## 实例的理解:


单步调试的跟踪执行过程,大致的流程如下(和 State 相关的细节需要再深入跟进)


1. 作业执行(前面的生成 Graph 之类的暂时忽略)

2. open 函数创建一个初始化的  State,后续多有的 Keyed-State 都会以这个 default State 为基础进行跟新

3. 每次过来的 Tuple 都会调用 flapMap 函数,然后跟进 key 出现的次数进行相应的更新

     3.1 如果当前的 key 出现第二次,则把这个 key 对应的两个 value 进行求平均值,发往下游,然后清空 State

    3.2 如果当前的 key 出现第一次,则仅仅更新 State 数据

4. MemoryStateBackend 大小 5M


接下来需要带着文章最开始的几个问题查看 State 相关的源码 


调试过程中,查看到 operator(StreamFlatMap 对象) 包含的信息如下

 • StateBackend 使用 MemoryStateBackend

 • KeyedStateBackend 使用 HeapKeyedStateBackend,HeapKeyedStateBackend 包含 kvStates、rawStates、kvStateRegistry、internalStateBinder、cancelStreamRegistry 等

 • keyedStateStore 为 DefaultKeyedStateStore

 • operatorStateBackend 为 DefaultOperatorStateBackend

 • timerServiceBackend 为 InternalTimerServiceBackend

• 有三个 watermark,分别 conbinedWatermark、input1Watermark、input2Watermark


接下来先重点把上面出现的相关 StateBackend 给了解清楚,然后把整个 StateBackend 相关的东西给了解清楚。


以上是关于Flink State 初探的主要内容,如果未能解决你的问题,请参考以下文章

apache Flink初探

Flink StateBackend 初探

Flink的sink实战之一:初探

2021年最新最全Flink系列教程_Flink原理初探和流批一体API(二.五)

2021年最新最全Flink系列教程_Flink原理初探和流批一体API

5.Flink原理初探角色分工执行流程图生成DataFlow,Operator,Partition,Parallelism,SubTaskOperatorChain和Task任务槽槽共享