Flink核心篇,四大基石容错机制广播反压序列化内存管理资源管理...

Posted zhisheng_blog

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink核心篇,四大基石容错机制广播反压序列化内存管理资源管理...相关的知识,希望对你有一定的参考价值。

Flink基础篇,基本概念、设计理念、架构模型、编程模型、常用算子

大纲:

1、Flink的四大基石包含哪些?

2、讲一下Flink的Time概念?

3、介绍下Flink窗口,以及划分机制?

4、介绍下Flink的窗口机制,以及各组件之间是如何相互工作的?

5、在流数据处理中,有没有遇到过数据延迟等问题,通过什么处理呢?

6、WaterMark是什么?原理讲解一下?

7、窗口如何划分的?什么时候触发计算?

8、如果数据延迟非常严重呢?只使用WaterMark可以处理吗?那应该怎么解决?

9、Flink 中的状态State是什么?简单介绍下?

10、Flink 状态包括哪些?

11、Flink 广播状态了解吗?

12、Flink 状态接口包括哪些?

13、Flink 状态如何存储?

14、Flink 状态如何持久化?

15、Flink 状态过期后如何清理?

16、Flink 通过什么实现可靠的容错机制?

17、什么是Checkpoin检查点?

18、什么是Savepoin保存点?

19、什么是CheckpointCoordinator检查点协调器?

20、Checkpoint中保存的是什么信息?

21、当作业失败后,检查点如何恢复作业?

22、当作业失败后,从保存点如何恢复作业?

23、Flink如何实现轻量级异步分布式快照?

24、什么是Barrier对齐?

25、什么是Barrier不对齐?

26、为什么要进行barrier对齐?不对齐到底行不行?

27、Flink支持Exactly-Once语义,那什么是Exactly-Once?

28、要实现 Exactly-Once,需具备什么条件?

29、什么是两阶段提交协议?

30、Flink 如何保证 Exactly-Once 语义?

31、对Flink端到端 严格一次Exactly-Once 语义做个总结

32、Flink广播机制了解吗?

33、Flink反压了解吗?

34、Flink反压的影响有哪些?

35、Flink反压如何解决?

36、Flink支持的数据类型有哪些?

37、Flink如何进行序列和反序列化的?

38、为什么Flink使用自主内存而不用JVM内存管理?

39、那Flink自主内存是如何管理对象的?

40、Flink内存模型介绍一下?

41、Flink如何进行资源管理的?(slot、task、subtask、parallelism、Operator Chains)

1、Flink的四大基石包含哪些?

Flink四大基石分别是:Time(时间)、Window(窗口)、State(状态)、Checkpoint(检查点)。

2、讲一下Flink的Time概念?

在Flink的流式处理中,会涉及到时间的不同概念,主要分为三种时间机制,如下图所示:

EventTime[事件时间]

事件发生的时间,例如:点击网站上的某个链接的时间,每一条日志都会记录自己的生成时间。

如果以EventTime为基准来定义时间窗口那将形成EventTimeWindow,要求消息本身就应该携带EventTime

IngestionTime[摄入时间]

数据进入Flink的时间,如某个Flink节点的sourceoperator接收到数据的时间,例如:某个source消费到kafka中的数据

如果以IngesingtTime为基准来定义时间窗口那将形成IngestingTimeWindow,以source的systemTime为准

ProcessingTime[处理时间]

某个Flink节点执行某个operation的时间,例如:timeWindow处理数据时的系统时间,默认的时间属性就是Processing Time

如果以ProcessingTime基准来定义时间窗口那将形成ProcessingTimeWindow,以operator的systemTime为准

数据漂移,就是昨天23:59的数据,进入到今天的分区,解决:前后冗余15min的数据,再通过多字段限制过滤掉

在Flink的流式处理中,绝大部分的业务都会使用EventTime,一般只在EventTime无法使用时,才会被迫使用ProcessingTime或者IngestionTime。

如果要使用EventTime,那么需要引入EventTime的时间属性,引入方式如下所示:

final StreamExecutionEnvironment env  

    = StreamExecutionEnvironment.getExecutionEnvironrnent();

// 使用处理时间

env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) ; 

// 使用摄入时间

env.setStrearnTimeCharacteristic(TimeCharacteristic.IngestionTime);

// 使用事件时间

env.setStrearnTimeCharacteristic(TimeCharacteristic.EventTime);

3、介绍下Flink窗口,以及划分机制?

流式计算是一种被设计用于处理无限数据集的数据处理引擎,而无限数据集是指一种不断增长的本质上无限的数据集,而Window窗口是一种切割无限数据为有限块进行处理的手段。

在Flink中, 窗口(window)是处理无界流的核心,窗口把流切割成有限大小的多个"存储桶"(bucket),我们在这些桶上进行计算.

Flink支持两种划分窗口的方式(time和count),按时间驱动进行划分、按数据驱动进行划分。

注意:

带xxxWindowAll()的窗口方法,都是无并行,即所有数据都会在一个task上,未分组的数据(没有keyBy),只能调用带All()的窗口方法(xxxWindowAll())。

Window可以分成两类:TimeWindow() 和 CountWindow()

不带All()的窗口方法(xxxWindow()),只有分组后才可以调用,每个分组都有自己的窗口

Flink支持窗口的两个重要属性(窗口长度size和滑动间隔interval),通过窗口长度和滑动间隔来区分滚动窗口和滑动窗口。

如果size = interval,那么就会形成tumbling-window(无重叠数据)–滚动窗口

如果size(1min) > interval(30s),那么就会形成sliding-window(有重叠数据)–滑动窗口

通过组合可以得出四种基本窗口:

(1)time-tumbling-window 无重叠数据的时间窗口,基于时间的滚动窗口,设置方式举例:xxx.timeWindow(Time.seconds(5))只传一个参数,就是滚动窗口,size为窗口的大小

(2)time-sliding-window 有重叠数据的时间窗口,基于时间的滑动窗口,设置方式举例:xxx.timeWindow(Time.seconds(10), Time.seconds(5)),传两个参数,就是滑动窗口,size为窗口的大小,slide为窗口滑动的大小(即步长)

(3)count-tumbling-window无重叠数据的数量窗口,设置方式举例:countWindow(5)—基于数量的滚动窗口

按照指定的数据条数生成一个Window,与时间无关

分组之后再开窗,那么窗口的关闭是看,相同分组的数据条数是否达到,

例如,窗口大小为3,当同一组的数据达到3条才会关窗,a,a,b ,b是不会关窗的,只有其中一组的数据达到3条,才会关闭该窗口

(4)count-sliding-window 有重叠数据的数量窗口,设置方式举例:countWindow(10,5)—基于数量的滑动窗口

滑动窗口,每经过一个步长slide,都会有一个窗口关闭,就会输出一次,在第一条数据之前,也是有窗口的,只不过是没有数据属于那个窗口

Flink中还支持一个特殊的窗口:会话窗口SessionWindows

session窗口分配器通过session活动来对元素进行分组,session窗口跟滚动窗口和滑动窗口相比,不会有重叠和固定的开始时间和结束时间的情况.

session窗口在一个固定的时间周期内不再收到元素,即非活动间隔产生,那个这个窗口就会关闭。

一个session窗口通过一个session间隔来配置,这个session间隔定义了非活跃周期的长度,当这个非活跃周期产生,那么当前session将关闭并且后续的元素将被分配到新的session窗口中去,如下图所示:

4、介绍下Flink的窗口机制,以及各组件之间是如何相互工作的?

以下为窗口机制的流程图:

WindowAssigner

1、窗口算子负责处理窗口,数据流源源不断地进入算子(window operator)时,每一个到达的元素首先会被交给 WindowAssigner。WindowAssigner 会决定元素被放到哪个或哪些窗口(window),可能会创建新窗口。因为一个元素可以被放入多个窗口中(个人理解是滑动窗口,滚动窗口不会有此现象),所以同时存在多个窗口是可能的。注意,Window本身只是一个ID标识符,其内部可能存储了一些元数据,如TimeWindow中有开始和结束时间,但是并不会存储窗口中的元素。窗口中的元素实际存储在 Key/Value State 中,key为Window,value为元素集合(或聚合值)。为了保证窗口的容错性,该实现依赖了 Flink 的 State 机制。

WindowTrigger

2、每一个Window都拥有一个属于自己的 Trigger,Trigger上会有定时器,用来决定一个窗口何时能够被计算或清除。每当有元素加入到该窗口,或者之前注册的定时器超时了,那么Trigger都会被调用。

Trigger的返回结果可以是 :

(1)continue(继续、不做任何操作),

(2)Fire(触发计算,处理窗口数据),

(3)Purge(触发清理,移除窗口和窗口中的数据),

(4)Fire + purge(触发计算+清理,处理数据并移除窗口和窗口中的数据)。

当数据到来时,调用Trigger判断是否需要触发计算,如果调用结果只是Fire的话,那么会计算窗口并保留窗口原样,也就是说窗口中的数据不清理,等待下次Trigger fire的时候再次执行计算。窗口中的数据会被反复计算,直到触发结果清理。在清理之前,窗口和数据不会释放没所以窗口会一直占用内存。

Trigger 触发流程:

3、当Trigger Fire了,窗口中的元素集合就会交给Evictor(如果指定了的话)。Evictor 主要用来遍历窗口中的元素列表,并决定最先进入窗口的多少个元素需要被移除。剩余的元素会交给用户指定的函数进行窗口的计算。如果没有 Evictor 的话,窗口中的所有元素会一起交给函数进行计算。

4、计算函数收到了窗口的元素(可能经过了 Evictor 的过滤),并计算出窗口的结果值,并发送给下游。窗口的结果值可以是一个也可以是多个。DataStream API 上可以接收不同类型的计算函数,包括预定义的sum(),min(),max(),还有 ReduceFunction,FoldFunction,还有WindowFunction。WindowFunction 是最通用的计算函数,其他的预定义的函数基本都是基于该函数实现的。

5、Flink 对于一些聚合类的窗口计算(如sum,min)做了优化,因为聚合类的计算不需要将窗口中的所有数据都保存下来,只需要保存一个result值就可以了。每个进入窗口的元素都会执行一次聚合函数并修改result值。这样可以大大降低内存的消耗并提升性能。但是如果用户定义了 Evictor,则不会启用对聚合窗口的优化,因为 Evictor 需要遍历窗口中的所有元素,必须要将窗口中所有元素都存下来。

5、在流数据处理中,有没有遇到过数据延迟等问题,通过什么处理呢?

有遇到过数据延迟问题。举个例子:

案例1:假你正在去往地下停车场的路上,并且打算用手机点一份外卖。

选好了外卖后,你就用在线支付功能付款了,这个时候是11:50。恰好这时,你走进了地下停车库,而这里并没有手机信号。因此外卖的在线支付并没有立刻成功,而支付系统一直在Retry重试“支付”这个操作。

当你找到自己的车并且开出地下停车场的时候,已经是12:05了。这个时候手机重新有了信号,手机上的支付数据成功发到了外卖在线支付系统,支付完成。

在上面这个场景中可以看到,支付数据的事件时间是11:50,而支付数据的处理时间是12:05

案例2:某App会记录用户的所有点击行为,并回传日志(在网络不好的情况下,先保存在本地,延后回传)。

A 用户在11:02 对 App 进行操作,B用户在11:04操作了 App,

但是A 用户的网络不太稳定,回传日志延迟了,导致我们在服务端先接受到B 用户11:04 的消息,然后再接受到A 用户11:02 的消息,消息乱序了。

一般处理数据延迟、消息乱序等问题,通过WaterMark水印来处理。

水印是用来解决数据延迟、数据乱序等问题,总结如下图所示:

水印就是一个时间戳(timestamp),Flink可以给数据流添加水印

水印并不会影响原有Eventtime事件时间,用来 衡量 时间的进展

时间是 单调递增的(不减少)

处理乱序

表示(认为)wm之前的数据都处理完了

当数据流添加水印后,会按照水印时间来触发窗口计算,也就是说watermark水印是用来触发窗口计算的

设置水印时间,会比事件时间小几秒钟,表示最大允许数据延迟达到多久

水印时间 = 事件时间 - 允许延迟时间 (例如:10:09:57 = 10:10:00 - 3s )

6、WaterMark是什么?原理讲解一下?

这里的Watermark什么意思呢?

很简单,把数据流简单的理解为水流,那么当水流源源不断地流入系统时,什么时候我们才知道要开始对数据计算了呢?总不能一直等吧。

所以为了能够对数据计算的时间进行限定,我们的想法就是在水流上添加浮标或标记,当这个标记进入我们的数据窗口时,我们就认为可以开始计算了。这里在水流中增加的标记,我们就称之为Watermark(水位标记)

根据不同的数据处理场景watermark会有不同的生成方式:

有序数据:new AscendingTimestampExtractor<T>()

乱序数据:new BoundedOutOfOrdernessTimestampExtractor<T>(Time maxOutOfOrderness)

乱序数据中的watermark处理又分两大类:

AssignerWithPeriodicWatermarks 周期性生成wm(默认200ms),可以修改

AssignerWithPunctuatedWatermarks 打点式(来一条,生成一个)生成wm

无论升序还是乱序官方默认,watermark都是周期性生成,默认200ms生成一次

事件时间升序场景

用于事件时间进入flink处理的时候,是升序的场景

触发窗口的计算和关闭,只与时间(事件时间、处理时间)有关,与分组无关,只要时间到达,就会触发窗口的计算和关闭

方法:new AscendingTimestampExtractor<T>()

使用升序的事件时间处理方式,要保证事件时间进入到flink处理的时候,是升序的,否则会抛运行时异常

当elementTimestamp < lastTimestamp,违反了时间戳的单调升序

升序场景的wm:watermark = EventTime - 1ms

事件时间乱序场景

方法:new BoundedOutOfOrdernessTimestampExtractor<T>(Time maxOutOfOrderness)

乱序场景的wm:watermark = EventTime - 等待时间

wm是单调递增的,如果下一条数据对应的wm,小于上次的wm,那么此次wm保持不变,若大于上次的wm,就改变此次的wm,始终保持wm是递增或者保持不变

7、窗口如何划分的?什么时候触发计算?

窗口是10分钟触发一次,现在在12:00 - 12:10 有一个窗口,本来有一条数据是在12:00 - 12:10这个窗口被计算,但因为延迟,12:12到达,这时12:00 - 12:10 这个窗口就会被关闭,只能将数据下发到下一个窗口进行计算,这样就产生了数据延迟,造成计算不准确。

现在添加一个水位线:数据时间戳为2分钟。这时用数据产生的事件时间 12:12 -允许延迟的水印 2分钟 = 12:10 >= 窗口结束时间 。窗口触发计算,该数据就会被计算到这个窗口里。

窗口的划分

源码:TumblingEventTimeWindows.assignWindows()

timestamp是在环境env中,自己指定的事件时间

offset是设置时区偏移量的,默认用系统时区,无需设置,不影响结果

windowsize 是窗口大小

窗口的开始时间

start = timestamp - (timestamp - offset + windowSize) % windowSize

102 - (102 + 5) % 5 = 100ms 相当于对事件时间取整

窗口的结束时间

end = start + size => 窗口开始时间 + 窗口长度

窗口的范围 [start,end] [100ms,105ms]

窗口的数据范围 [start,end - 1ms] => [start,end) [100ms,105ms)

窗口的数据范围

左闭右开 => maxTimestamp = end - 1ms [100ms,104ms]

每个窗口都是new出来的对象(当一个窗口关闭的同时,下一个窗口就会被new出来)

窗口什么时候触发

源码:EventTimeTrigger.onElement()
ctx.getCurrentWatermark() >= window.maxTimestamp()触发计算,

当前watermark大于等于该创建最大时间戳的时候,触发窗口的计算

8、如果数据延迟非常严重呢?只使用WaterMark可以处理吗?那应该怎么解决?

使用 WaterMark + EventTimeWindow 机制可以在一定程度上解决数据乱序的问题,但是,WaterMark 水位线也不是万能的,在某些情况下,数据延迟会非常严重,即使通过Watermark + EventTimeWindow也无法等到数据全部进入窗口再进行处理,因为窗口触发计算后,对于延迟到达的本属于该窗口的数据,Flink默认会将这些延迟严重的数据进行丢弃

那么如果想要让一定时间范围的延迟数据不会被丢弃,可以使用Allowed Lateness(允许迟到机制/侧道输出机制)设定一个允许延迟的时间和侧道输出对象来解决

WaterMark + EventTimeWindow + Allowed Lateness方案(包含侧道输出),可以做到数据不丢失。

窗口允许迟到

xxx.allowedLateness(Time lateness)

事件时间乱序时:waterMark = 事件时间 - 等待时间(maxOutOfOrderness)

当 watermark >= 窗口结束时间, 会正常触发窗口的计算,但是不会关闭窗口

当 窗口结束时间 <= watermark < 窗口结束时间 + 窗口等待时间(allowedLateness(Time))时,每来一条迟到数据,都会触发一次计算,不会关闭窗口

当 watermark >= 窗口结束时间 + 窗口等待时间(允许迟到时间) 时,才会真正的关闭窗口

注意:

wm是一个特殊的时间戳,插入到数据流里,随着数据流的流动一起流动,多并行度,以小的wm为准

事件时间语义窗口的划分只有事件时间有关,分组只会影响窗口的数据个数,不会影响窗口的结束,窗口的开始和结束时间以事件时间为准

.assignTimestampsAndWatermarks(

                        new BoundedOutOfOrdernessTimestampExtractor<WaterSensor>(Time.seconds(3))

                            @Override

                            public long extractTimestamp(WaterSensor element)

                                return element.getTs() * 1000L;

                           

                       

                )

                .keyBy(sensor -> sensor.getId())

                .timeWindow(Time.seconds(5))

                .allowedLateness(Time.seconds(2))

                .process(

                        new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>()

                            // 输入的数据是:整个窗口 同一分组 的数据 一起 处理

                            @Override

                            public void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception

                                System.out.println("process...");

                                out.collect("当前key=" + s

                                        + "当前watermark=" + context.currentWatermark()

                                        + "窗口为[" + context.window().getStart() + "," + context.window().getEnd() + ")"

                                        + ",一共有" + elements.spliterator().estimateSize() + "条数据");

                           

                       

                )

                .print();

以上面代码为例:最大乱序时间为3s,窗口大小为5s,允许迟到时间2s,[0,5),[5,10),[10,15)

当事件时间8s 的数据来了,会触发[0,5)窗口数据的计算,因为当前wm为5s = 8s - 3s,大于等于 窗口结束时间5s,所以会触发计算,但是不会关闭窗口,因为,当前wm 还小于 (窗口结束数据5s + 允许迟到时间2s) ,所以不会关闭窗口,

若后面还有[0,5)窗口范围的事件时间 数据到来,依然会触发计算,来一条,触发一次计算,

只有当 wm 大于等于 7s(窗口结束数据5s + 允许迟到时间2s)时,即事件时间为10s时,才会触发[0,5)窗口的关闭,就算后面还有该窗口的数据,也不会计算。

9、Flink 中的状态State是什么?简单介绍下?

在Flink中,状态被称作state,是用来 保存 中间的 计算结果 或者 缓存数据。

根据状态是否需要保存中间结果,分为 无状态计算 和 有状态计算。

对于流计算而言,事件持续产生,如果每次计算相互独立,不依赖上下游的事件,则相同输入,可以得到相同输出,是无状态计算。

如果计算需要依赖于之前或者后续事件,则被称为有状态计算。

10、Flink 状态包括哪些?

(1) 按照由 Flink管理 还是 用户管理,状态可以分为 原始状态(Raw State)和 托管状态(ManagedState)

托管状态(ManagedState):由Flink 自行进行管理的State。

原始状态(Raw State):由用户自行进行管理。

两者区别:

1、从状态管理方式的方式来说:

Managed State 由Flink Runtime 管理,自动存储,自动恢复,在内存管理上有优化;

而 Raw State 需要用户自己管理,需要自己序列化,Flink 不知道 State 中存入的数据是什么结构,只有用户自己知道,需要最终序列化为可存储的数据结构。

2、从状态数据结构来说:

Managed State 支持已知的数据结构,如Value、List、Map等。

而 Raw State 只支持字节数组,所有状态都要转换为二进制字节数组才可以。

3、从推荐使用场景来说:

Managed State 大多数情况下均可使用,基本可以覆盖95%的场景

而Raw State 是当 Managed State 不够用时,比如需要自定义Operator 时,才会使用 Raw State。

在实际生产过程中,推荐使用 Managed State。

(2)状态始终与特定算子相关联。总的来说,有两种类型的状态,State 按照是否有 key 划分为 OperatorState 和 KeyedState 两种。

算子状态OperatorState特点:

1、可以用于所有算子,但整个算子只对应一个state。

2、并发改变时有多种重新分配的方式可选:均匀分配;

3、实现CheckpointedFunction或者 ListCheckpointed 接口。

4、目前只支持 ListState 数据结构。

算子状态的作用范围限定为算子任务。这意味着由同一并行任务(同一个subTask)所处理的所有数据都可以访问到相同的状态,状态对于同一任务而言是共享的。算子状态不能由相同或不同算子的另一个任务访问。

Flink为算子状态提供三种基本数据结构:

+ 列表状态(List state)

+ 将状态表示为一组数据的列表。

联合列表状态(Union list state)

也将状态表示为数据的列表。它与常规列表状态的区别在于,在发生故障时,或者 从保存点(savepoint)启动应用程序时如何恢复。

一种是均匀分配,另外一种是将所有 State 合并为全量 State 再分发给每个实例。

广播状态(Broadcast state)1.5之后引入的,在1.5之前有个广播变量。

如果一个算子有多项任务,而它的每项任务状态又都相同,那么这种特殊情况最适 合应用广播状态。

广播状态(mapstate集合),保存在TaskManage内存中,而TaskManage是个JVM进程,所以在堆内存中,如果数据过大,会占用过多堆内存

键控状态KeyedState特点:

+ 只能用在keyedStream上的算子中,状态跟特定的key绑定。

+ keyStream流上的每一个key 对应一个state 对象。若一个operator 实例处理多个key,访问相应的多个State,可对应多个state。

+ keyedState 保存在StateBackend中

+ 通过RuntimeContext访问,实现RichFunction接口。

+ 支持多种数据结构:ValueState、ListState、ReducingState、AggregatingState、MapState.

键控状态是根据输入数据流中定义的键(key)来维护和访问的。Flink为每个键值维护一个状态实例(即一个分组有一个状态,分组间的状态是隔离的,与是否在一个slot无关),并将具有相同键的所有数据,都分区到同一个算子任务中,这个任务会维护和处理这个key对应的状态。

当任务处理一条数据时,它会自动将状态的访问范围限定为当前数据的key。因此,具有相同key的所有数据都会访问相同的状态。Keyed State很类似于一个分布式的key-value map数据结构,只能用于KeyedStream(keyBy算子处理之后)。

11、Flink 广播状态了解吗?

Flink中,广播状态中叫作 BroadcastState。在广播状态模式中使用。所谓广播状态模式, 就是来自一个流的数据需要被广播到所有下游任务,在算子本地存储,在处理另一个流的时候依赖于广播的数据.下面以一个示例来说明广播状态模式。

广播状态(MapState集合),保存在TaskManage内存中,而TaskManage是个JVM进程,所以在堆内存中,如果数据过大,会占用过多堆内存,

广播状态(广播流)会应用到另一条流的每个算子上

12、Flink 状态接口包括哪些?

在Flink中使用状态,包含两种状态接口:

(1)状态操作接口:使用状态对象本身存储,写入、更新数据。

(2)状态访问接口:从StateBackend获取状态对象本身。

状态操作接口

Flink 中的 状态操作接口 面向两类用户,即 应用开发者 和 Flink 框架本身。所有Flink设计了两套接口

面向开发者State接口

面向开发的State接口只提供了对State中数据的增删改基本操作接口,用户无法访问状态的其他运行时所需要的信息。接口体系如下图:

面向内部State接口

内部State 接口 是给 Flink 框架使用,提供更多的State方法,可以根据需要灵活扩展。除了对State中数据的访问之外,还提供内部运行时信息,如State中数据的序列化器,命名空间(namespace)、命名空间的序列化器、命名空间合并的接口。内部State接口命名方式为InternalxxxState。

状态访问接口

有了状态之后,开发者自定义UDF时,应该如何访问状态?

状态会被保存在StateBackend中,但StateBackend 又包含不同的类型。

所以Flink中抽象了两个状态访问接口:OperatorStateStore 和 KeyedStateStore,用户在编写UDF时,就无须考虑到底是使用哪种 StateBackend类型接口。

OperatorStateStore 接口原理

OperatorState 数据以Map形式保存在内存中,并没有使用RocksDBStateBackend 和 HeapKeyedStateBackend。

KeyedStateStore 接口原理:

keyedStateStore 数据使用 RocksDBStateBackend 或者HeapKeyedStateBackend 来存储,KeyedStateStore中创建、获取状态都交给了具体的StateBackend来处理,KeyedStateStore本身更像是一个代理。

13、Flink 状态如何存储?

在Flink中, 状态存储被叫做 StateBackend , 它具备两种能力:

本地的状态管理

能够将State持久化到外部存储,提供容错能力,将检查点(checkpoint)状态写入远程存储(简单的说,就是对state的备份)

Flink状态 提供三种存储方式:

(1)内存:MemoryStateBackend,适用于验证、测试、不推荐生产使用。

(2)文件:FSStateBackend,适用于长周期大规模的数据。

(3)RocksDB : RocksDBStateBackend,适用于长周期大规模的数据。

上面提到的 StateBackend是面向用户的,在Flink内部3种 State 的关系如下图:

在运行时,MemoryStateBackend 和 FSStateBackend 本地的 State 都保存在TaskManager的内存中,所以其底层都依赖于HeapKeyedStateBackend。HeapKeyedStateBackend 面向Flink 引擎内部,使用者无须感知。

1、内存型 StateBackend

MemoryStateBackend,运行时所需的State数据全部保存在 TaskManager JVM堆上内存中,KV类型的State、窗口算子的State 使用HashTable 来保存数据、触发器等。

执行检查点(checkpoint)的时候,会把 State 的快照数据保存到JobManager进程的内存中。

MemoryStateBackend 可以使用异步的方式进行快照,(也可以同步),推荐异步,避免阻塞算子处理数据。

基于内存的 Statebackend 在生产环境下不建议使用,可以在本地开发调试测试 。

注意点如下 :

State 存储在 JobManager 的内存中,受限于 JobManager的内存大小。

每个 State 默认5MB,可通过 MemoryStateBackend 构造函数调整

每个 Stale 不能超过 Akka Frame 大小。

2、文件型 StateBackend

FSStateBackend,运行时所需的State数据全部保存在 TaskManager 的内存中, 执行检查点的时候,会把 State 的快照数据保存到配置的文件系统中。

可以是分布式或者本地文件系统,路径如:

HDFS路径:“hdfs://namenode:40010/flink/checkpoints”

本地路径:“file:///data/flink/checkpoints”。

FSStateBackend 适用于处理大状态、长窗口、或者大键值状态的有状态处理任务。

注意点如下 :

State 数据首先被存在 TaskManager 的内存中。

State 大小不能超过 TaskManager 内存。

TaskManager 异步将State数据写入外部存储。

MemoryStateBackend 和 FSStateBackend 都依赖于HeapKeyedStateBackend,HeapKeyedStateBackend 使用 State存储数据。

3、RocksDBStateBackend

RocksDBStateBackend 跟内存型和文件型都不同 。

RocksDBStateBackend 使用嵌入式的本地数据库 RocksDB 将流计算数据状态存储在本地磁盘中,不会受限于TaskManager 的内存大小,在执行检查点的时候,再将整个 RocksDB 中保存的State数据全量或者增量持久化到配置的文件系统中,

在 JobManager 内存中会存储少量的检查点元数据。RocksDB克服了State受内存限制的问题,同时又能够持久化到远端文件系统中,比较适合在生产中使用。

缺点:

RocksDBStateBackend 相比基于内存的StateBackend,访问State的成本高很多,可能导致数据流的吞吐量剧烈下降,甚至可能降低为原来的 1/10。

适用场景:

最适合用于处理大状态、长窗口,或大键值状态的有状态处理任务。

RocksDBStateBackend 非常适合用于高可用方案。

RocksDBStateBackend 是目前唯一支持增量检查点的后端。增量检查点非常适用于超 大状态的场景。

注意点如下:

总 State 大小仅限于磁盘大小,不受内存限制

RocksDBStateBackend 也需要配置外部文件系统,集中保存State 。

RocksDB的 JNI API 基于 byte 数组,单 key 和单 Value 的大小不能超过 8 字节

对于使用具有合并操作状态的应用程序,如ListState ,随着时间可能会累积到超过 2*31次方字节大小,这将会导致在接下来的查询中失败。

14、Flink 状态如何持久化?

首选,Flink的状态最终都要持久化到第三方存储中,确保集群故障或者作业挂掉后能够恢复。

RocksDBStateBackend 持久化策略有两种:

全量持久化策略 RocksFullSnapshotStrategy

增量持久化策略 RocksIncementalSnapshotStrategy

1、全量持久化策略

每次将全量的State写入到状态存储中(HDFS)。内存型、文件型、RocksDB类型,都支持全量持久化策略。

在执行持久化策略的时候,使用异步机制,每个算子启动1个独立的线程,将自身的状态写入分布式存储可靠存储中。在做持久化的过程中,状态可能会被持续修改,

基于内存的状态后端使用 CopyOnWriteStateTable 来保证线程安全,RocksDBStateBackend则使用RocksDB的快照机制,使用快照来保证线程安全。

2、增量持久化策略

增量持久化就是每次持久化增量的State,只有RocksDBStateBackend 支持增量持久化。

Flink 增量式的检查点以 RocksDB为基础, RocksDB是一个基于LSM-Tree的KV存储。新的数据保存在内存中, 称为memtable。如果Key相同,后到的数据将覆盖之前的数据,一旦memtable写满了,RocksDB就会将数据压缩并写入磁盘。memtable的数据持久化到磁盘后,就变成了不可变的 sstable。

因为 sstable 是不可变的,Flink对比前一个检查点创建和删除的RocksDB sstable 文件就可以计算出状态有哪些发生改变。

为了确保 sstable 是不可变的,Flink 会在RocksDB 触发刷新操作,强制将 memtable 刷新到磁盘上 。在Flink 执行检查点时,会将新的sstable 持久化到HDFS中,同时保留引用。

这个过程中 Flink 并不会持久化本地所有的sstable,因为本地的一部分历史sstable 在之前的检查点中已经持久化到存储中了,只需增加对 sstable文件的引用次数就可以。

RocksDB会在后台合并 sstable 并删除其中重复的数据。然后在RocksDB删除原来的 sstable,替换成新合成的 sstable.。新的 sstable 包含了被删除的 sstable中的信息,通过合并历史的sstable会合并成一个新的 sstable,并删除这些历史sstable. 可以减少检查点的历史文件,避免大量小文件的产生。

15、Flink 状态过期后如何清理?

1、DataStream中状态过期

可以对DataStream中的每一个状态设置 清理策略 StateTtlConfig,可以设置的内容如下:

过期时间:超过多长时间未访问,视为State过期,类似于缓存。

过期时间更新策略:创建和写时更新、读取和写时更新。

State可见性:未清理可用,超时则不可用。

2、Flink SQL中状态过期

Flink SQL 一般在流Join、聚合类场景使用State,如果State不定时清理,则导致State过多,内存溢出。清理策略配置如下:

StreamQueryConfig config = ...

//设置过期时间为 min = 12小时 ,max = 24小时

config.withIdleStateRetentionTime(Time.hours(12),Time.hours(24));

16、Flink 通过什么实现可靠的容错机制?

Flink 使用 轻量级分布式快照,设计检查点(checkpoint)实现可靠容错。

17、什么是Checkpoin检查点?

Checkpoint被叫做检查点,是Flink实现容错机制最核心的功能,是Flink可靠性的基石,它能够根据配置周期性地基于Stream中各个Operator的状态来生成Snapshot快照,从而将这些状态数据定期持久化存储下来,当Flink程序一旦意外崩溃时,重新运行程序时可以有选择地从这些Snapshot进行恢复,从而修正因为故障带来的程序数据状态中断。

Flink的checkpoint机制原理来自“Chandy-Lamport algorithm”算法

注意:区分State和Checkpoint

1、State:

一般指一个具体的Task/Operator的状态(operator的状态表示一些算子在运行的过程中会产生的一些中间结果)

State数据默认保存在Java的堆内存中/TaskManage节点的内存中,

State可以被记录,在失败的情况下数据还可以恢复。

2、Checkpoint:

表示了一个FlinkJob在一个特定时刻的一份全局状态快照,即包含了所有Task/Operator的状态,可以理解为Checkpoint是把State数据定时持久化存储了,

比如KafkaConsumer算子中维护的Offset状态,当任务重新恢复的时候可以从Checkpoint中获取。

18、什么是Savepoin保存点?

保存点在 Flink 中叫作 Savepoint,是基于 Flink 检查点机制的应用完整快照备份机制,

用来保存状态 可以在另一个集群或者另一个时间点,从保存的状态中将作业恢复回来。适用 于应用升级(要求逻辑拓扑图的结构不能变)、集群迁移、Flink 集群版本更新、A/B测试以及假定场景、暂停和重启、归档等场景。

保存点可以视为一个(算子 ID -> State) 的Map,对于每一个有状态的算子,Key是算子ID,Value是算子State。

19、什么是CheckpointCoordinator检查点协调器?

Flink中检查点协调器叫作 CheckpointCoordinator,负责协调 Flink 算子的 State 的分布式快照。当触发快照的时候,CheckpointCoordinator向 Source 算子中注入Barrier消息 ,然后等待所有的Task通知检查点确认完成,同时持有所有 Task 在确认完成消息中上报的State句柄。

20、Checkpoint中保存的是什么信息?

检查点里面到底保存着什么信息呢?以flink消费kafka数据wordcount为例:

1、我们从Kafka读取到一条条的日志,从日志中解析出app_id,然后将统计的结果放到内存中一个Map集合,app_id做为key,对应的pv做为value,每次只需要将相应app_id 的pv值+1后put到Map中即可;

2、kafka topic:test;

3、flink运算流程如下:

kafka topic有且只有一个分区

假设kafka的topic-test只有一个分区,flink的Source task记录了当前消费到kafka test topic的所有partition的offset

例:(0,1000)表示0号partition目前消费到offset为1000的数据

Flink的pv task记录了当前计算的各app的pv值,假设这里有两个app:app1、app2

例:(app1,50000),(app2,10000)

表示app1当前pv值为 50000

表示app2当前pv值为 10000

每来一条数据,只需要确定相应app_id,将相应的value值+1后put到map中即可;

该案例中,CheckPoint保存的其实就是第n次CheckPoint消费的offset信息和各app的pv值信息,记录一下发生CheckPoint当前的状态信息,并将该状态信息保存到相应的状态后端(注:状态后端是保存状态的地方,决定状态如何保存,如何保障状态高可用,我们只需要知道,我们能从状态后端拿到offset信息和pv信息即可。状态后端必须是高可用的,否则我们的状态后端经常出现故障,会导致无法通过checkpoint来恢复我们的应用程序)。

chk-100

offset:(0,1000)

pv:(app1,50000)(app2,10000)

该状态信息表示第100次CheckPoint的时候, partition 0 offset消费到了1000

21、当作业失败后,检查点如何恢复作业?

Flink提供了 应用自动恢复机制 和 手动作业恢复机制。

应用自动恢复机制:

Flink设置有作业失败重启策略,包含三种:

1、定期恢复策略:fixed-delay

固定延迟重启策略会尝试一个给定的次数来重启Job,如果超过最大的重启次数,Job最终将失败,在连续两次重启尝试之间,重启策略会等待一个固定时间,默认Integer.MAX_VALUE次

2、失败比率策略:failure-rate

失败率重启策略在job失败后重启,但是超过失败率后,Job会最终被认定失败,在两个连续的重启尝试之间,重启策略会等待一个固定的时间。

3、直接失败策略:None 失败不重启

手动作业恢复机制。

因为Flink检查点目录分别对应的是JobId,每通过flink run 方式/页面提交方式恢复都会重新生成 jobId,

Flink 提供了在启动之时通过设置 -s .参数指定检查点目录的功能,让新的 jobld 读取该检查点元文件信息和状态信息,从而达到指定时间节点启动作业的目的。

启动方式如下:

/bin/flink -s /flink/checkpoints/03112312a12398740a87393/chk-50

22、当作业失败后,从保存点如何恢复作业?

从保存点恢复作业并不简单,尤其是在作业变更(如修改逻辑、修复 bug) 的情况下, 需要考虑如下几点:

(1)算子的顺序改变

如果对应的 UID 没变,则可以恢复,如果对应的 UID 变了恢复失败。

(2)作业中添加了新的算子

如果是无状态算子,没有影响,可以正常恢复,如果是有状态的算子,跟无状态的算子 一样处理。

(3)从作业中删除了一个有状态的算子

默认需要恢复保存点中所记录的所有算子的状态,如果删除了一个有状态的算子,从保存点回复的时候被删除的OperatorID找不到,所以会报错 可以通过在命令中添加 -- allowNonReStoredSlale (short: -n )跳过无法恢复的算子 。

(4)添加和删除无状态的算子

如果手动设置了 UID 则可以恢复,保存点中不记录无状态的算子 如果是自动分配的 UID ,那么有状态算子的可能会变( Flink 一个单调递增的计数器生成 UID,DAG 改版,计数器极有可能会变) 很有可能恢复失败。

23、Flink如何实现轻量级异步分布式快照?

要实现分布式快照,最关键的是能够将数据流切分。Flink 中使用 Barrier (障碍物)来切分数据流。Barrier 会周期性地注入数据流中,作为数据流的一部分,从上游到下游被算子处理。

Barrier 会严格保证顺序,不会超过其前边的数据。Barrier 将记录分割成记录集,两个 Barrier 之间的数据流中的数据隶属于同一个检查点。每一个 Barrier 都携带一个其所属快照的 ID 编号。Barrier 随着数据向下流动,不会打断数据流,因此非常轻量。在一个数据流中,可能会存在多个隶属于不同快照的 Barrier ,并发异步地执行分布式快照,如下图所示:

Barrier 会在数据流源头被注入并行数据流中。Barrier n所在的位置就是恢复时数据重新处理的起始位置。例如,在Kafka中,这个位置就是最后一个记录在分区内的偏移量 ( offset) ,作业恢复时,会根据这个位置从这个偏移量之后向 kafka 请求数据 这个偏移量就是State中保存的内容之一。

Barrier 接着向下游传递。当一个非数据源算子从所有的输入流中收到了快照 n 的Barrier时,该算子就会对自己的 State 保存快照,并向自己的下游 广播 发送快照 n 的 Barrier。一旦 Sink 算子接收到 Barrier ,有两种情况:

(1)如果是引擎内严格一次处理保证,当 Sink 算子已经收到了所有上游的 Barrie n 时, Sink 算子对自己的 State 进行快照,然后通知检查点协调器( CheckpointCoordinator) 。当所有 的算子都向检查点协调器汇报成功之后,检查点协调器向所有的算子确认本次快照完成。

(2)如果是端到端严格一次处理保证,当 Sink 算子已经收到了所有上游的 Barrie n 时, Sink 算子对自己的 State 进行快照,并预提交事务(两阶段提交的第一阶段),再通知检查点协调器(CheckpointCoordinator) ,检查点协调器向所有的算子确认本次快照完成,Sink 算子提交事务(两阶段提交的第二阶段),本次事务完成。

接着 33 的案例来具体说一下如何执行分布式快照:

对应到pv案例中就是,Source Task接收到JobManager的编号为chk-100(从最近一次恢复)的CheckPoint触发请求后,发现自己恰好接收到kafka offset(0,1000)处的数据,所以会往offset(0,1000)数据之后offset(0,1001)数据之前安插一个barrier,然后自己开始做快照,也就是将offset(0,1000)保存到状态后端chk-100中。然后barrier接着往下游发送,当统计pv的task接收到barrier后,也会暂停处理数据,将自己内存中保存的pv信息(app1,50000)(app2,10000)保存到状态后端chk-100中。OK,flink大概就是通过这个原理来保存快照的;

统计pv的task接收到barrier,就意味着barrier之前的数据都处理了,所以说,不会出现丢数据的情况。

24、什么是Barrier对齐?

一旦Operator从输入流接收到CheckPoint barrier n,它就不能处理来自该流的任何数据记录,直到它从其他所有输入接收到barrier n为止。否则,它会混合属于快照n的记录和属于快照n + 1的记录;

如上图所示:

图1,算子收到数字流的Barrier,字母流对应的barrier尚未到达

图2,算子收到数字流的Barrier,会继续从数字流中接收数据,但这些流只能被搁置,记录不能被处理,而是放入缓存中,等待字母流 Barrier到达。在字母流到达前, 1,2,3数据已经被缓存。

图3,字母流到达,算子开始对齐State进行异步快照,并将Barrier向下游广播,并不等待快照执行完毕。

图4,算子做异步快照,首先处理缓存中积压数据,然后再从输入通道中获取数据。

25、什么是Barrier不对齐?

checkpoint 是要等到所有的barrier全部都到才算完成

上述图2中,当还有其他输入流的barrier还没有到达时,会把已到达的barrier之后的数据1、2、3搁置在缓冲区,等待其他流的barrier到达后才能处理。

barrier不对齐:就是指当还有其他流的barrier还没到达时,为了不影响性能,也不用理会,直接处理barrier之后的数据。等到所有流的barrier的都到达后,就可以对该Operator做CheckPoint了;

26、为什么要进行barrier对齐?不对齐到底行不行?

Exactly Once时必须barrier对齐,如果barrier不对齐就变成了At Least Once;

CheckPoint的目的就是为了保存快照,如果不对齐,那么在chk-100快照之前,已经处理了一些chk-100 对应的offset之后的数据,当程序从chk-100恢复任务时,chk-100对应的offset之后的数据还会被处理一次,所以就出现了重复消费。

27、Flink支持Exactly-Once语义,那什么是Exactly-Once?

Exactly-Once语义:指端到端的一致性,从数据读取、引擎计算、写入外部存储的整个过程中,即使机器或软件出现故障,都确保数据仅处理一次,不会重复、也不会丢失。

28、要实现 Exactly-Once,需具备什么条件?

流系统要实现Exactly-Once,需要保证上游 Source 层、中间计算层和下游 Sink 层三部分同时满足端到端严格一次处理

Source端:数据从上游进入Flink,必须保证消息严格一次消费。同时Source 端必须满足可重放(replay)。否则 Flink 计算层收到消息后未计算,却发生 failure 而重启,消息就会丢失。

Flink计算层:利用 Checkpoint 机制,把状态数据定期持久化存储下来,Flink程序一旦发生故障的时候,可以选择状态点恢复,避免数据的丢失、重复。

Sink端:Flink将处理完的数据发送到Sink端时,通过 两阶段提交协议 ,即 TwoPhaseCommitSinkFunction 函数。该 SinkFunction 提取并封装了两阶段提交协议中的公共逻辑,保证 Flink 发送Sink端时实现严格一次处理语义。同时:Sink端必须支持事务机制,能够进行数据回滚或者满足幂等性。

回滚机制:即当作业失败后,能够将部分写入的结果回滚到之前写入的状态。

幂等性:就是一个相同的操作,无论重复多少次,造成的结果和只操作一次相等。即当作业失败后,写入部分结果,但是当重新写入全部结果时,不会带来负面结果,重复写入不会带来错误结果。

29、什么是两阶段提交协议?

两阶段提交协议(Two -Phase Commit,2PC)是解决分布式事务问题最常用的方法,它可以保证在分布式事务中,要么所有参与进程都提交事务,要么都取消,即实现ACID中的 A(原子性)。

两阶段提交协议中 有两个重要角色,协调者(Coordinator)和 参与者(Participant),其中协调者只有一个,起到分布式事务的协调管理作用,参与者有多个。

两阶段提交阶段分为两个阶段:投票阶段(Voting)和 提交阶段(Commit)。

投票阶段:

(1)协调者向所有参与者发送 prepare 请求和事务内容,询问是否可以准备事务提交,等待参与者的相应。

(2)参与者执行事务中包含的操作,并记录 undo 日志(用于回滚)和 redo 日志(用于重放),但不真正提交。

(3)参与者向协调者返回事务操作的执行结果,执行成功返回yes,失败返回no。

提交阶段:

分为成功与失败两种情况。

若所有参与者都返回 yes,说明事务可以提交:

协调者向所有参与者发送 commit 请求。

参与者收到 commit 请求后,将事务真正地提交上去,并释放占用的事务资源,并向协调者返回 ack 。

协调者收到所有参与者的 ack 消息,事务成功完成,如下图:

若有参与者返回 no 或者超时未返回,说明事务中断,需要回滚:

协调者向所有参与者发送rollback请求。

参与者收到rollback请求后,根据undo日志回滚到事务执行前的状态,释放占用的事务资源,并向协调者返回ack。

协调者收到所有参与者的ack消息,事务回滚完成。

30、Flink 如何保证 Exactly-Once 语义?

Flink通过两阶段提交协议来保证Exactly-Once语义。

对于 Source 端:Source端严格一次处理比较简单,因为数据要进入Flink 中,所以Flink 只需要保存消费数据的偏移量 (offset)即可。如果Source端为 kafka,Flink 将 Kafka Consumer 作为 Source,可以将偏移量保存下来,如果后续任务出现了故障,恢复的时候可以由连接器重置偏移量,重新消费数据,保证一致性。

对于 Sink 端:Sink 端是最复杂的,因为数据是落地到其他系统上的,数据一旦离开 Flink 之后,Flink 就监控不到这些数据了,所以严格一次处理语义必须也要应用于 Flink 写入数据的外部系统,故这些外部系统必须提供一种手段允许提交或回滚这些写入操作,同时还要保证与 Flink Checkpoint 能够协调使用(Kafka 0.11 版本已经实现精确一次处理语义)。

以 Kafka->Flink->Kafka 为例,说明如何保证Exactly-Once语义。

Flink源码篇,作业提交流程作业调度流程作业内部转换流程图

Flink从入门到精通100篇(二十二)- Flink应用实战案例:如何实现网络流控与反压机制

Flink流量控制与反压机制完全总结

Flink1.12-四大基石详解

FLINK重点原理与机制:内存网络流控及反压机制剖析

Flink四大基石