Flink Checkpoint 机制详解

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink Checkpoint 机制详解相关的知识,希望对你有一定的参考价值。

参考技术A

一、Checkpoint 简介

Flink 的 Checkpoint 机制是其 可靠性 的基石。当一个任务在运行过程中出现故障时,可以根据 Checkpoint 的信息恢复到故障之前的某一状态,然后从该状态恢复任务的运行。 在 Flink 中,Checkpoint 机制采用的是 chandy-lamport (分布式快照)算法,通过 Checkpoint 机制,保证了 Flink 程序内部的 Exactly Once 语义。

二、Checkpoint 机制流程详解

1. 任务启动

我们假设任务从 Kafka 的某个 Topic 中读取数据,该Topic 有 2 个 Partition,故任务的并行度为 2。根据读取到数据的奇偶性,将数据分发到两个 task 进行求和。

某一时刻,状态如下:

2.启动Checkpoint

JobManager 根据 Checkpoint 间隔时间,启动 Checkpoint。此时会给每个 Source 发送一个 barrier 消息,消息中的数值表示 Checkpoint 的序号,每次启动新的 Checkpoint 该值都会递增。

3. Source启动Checkpoint

当Source接收到barrier消息,会将当前的状态(Partition、Offset)保存到 StateBackend,然后向 JobManager 报告Checkpoint 完成。之后Source会将barrier消息广播给下游的每一个 task:

4.task 接收 barrier

当task接收到某个上游(如这里的Source1)发送来的barrier,会将该上游barrier之前的数据继续进行处理,而barrier之后发送来的消息不会进行处理,会被缓存起来。

之前对barrier的理解比较模糊,直到看到了下面这幅图。barrier的作用和这里 "欢迎光临" 牌子的作用类似,用于区分流中的数据属于哪一个 Checkpoint:

我们可以理解为:barrier之前的数据属于本次Checkpoint,barrier之后的数据属于下一次Checkpoint,所以下次Checkpoint的数据是不应该在本次Checkpoint过程中被计算的,因此会将数据进行缓存。

5.barrier对齐

如果某个task有多个上游输入,如这里的 sum_even 有两个 Source 源,当接收到其中一个 Source 的barrier后,会等待其他 Source 的 barrier 到来。在此期间,接收到 barrier 的 Source 发来的数据不会处理,只会缓存(如下图中的数据4)。而未接收到 barrier 的 Source 发来的数据依然会进行处理,直到接收到该Source 发来的 barrier,这个过程称为 barrier的对齐

barrier是否对齐决定了程序实现的是 Exactly Once 还是 At Least Once:

如果不进行barrier对齐,那么这里 sum_even 在接收 Source2 的 barrier 之前,对于接收到 Source1的 数据4 ,不会进行缓存,而是直接进行计算,sum_even 的状态改为12,当接收到 Source2 的barrier,会将 sum_even 的状态 sum=12 进行持久化。如果本次Checkpoint成功,在进行下次 Checkpoint 前任务崩溃,会根据本次Checkpoint进行恢复。此时状态如下:

从这里我们就可以看出, Source1的数据4被计算了两次 。因此,Exactly Once语义下,必须进行barrier的对齐,而 At Least Once语义下 barrier 可以不对齐。

注意:barrier对齐只会发生在多对一的Operator(如 join)或者一对多的Operator(如 reparation/shuffle)。如果是一对一的Operator,如map、flatMap 或 filter 等,则没有对齐这个概念,都会实现Exactly Once语义,即使程序中配置了At Least Once 。

6.处理缓存数据

当task接收到所有上游发送来的barrier,即可以认为当前task收到了本次 Checkpoint 的所有数据。之后 task 会将 barrier 继续发送给下游,然后处理缓存的数据,比如这里 sum_even 会处理 Source1 发送来的数据4. 而且,在这个过程中 Source 会 继续读取数据 发送给下游,并不会中断。

7.上报Checkpoint完成

当sink收到barrier后,会向JobManager上报本次Checkpoint完成。至此,本次Checkpoint结束,各阶段的状态均进行了持久化,可以用于后续的故障恢复。

flink超越Spark的Checkpoint机制

前面,已经有一篇文章讲解了spark的checkpoint:


同时,浪尖也在知识星球里发了源码解析的文章。spark streaming的Checkpoint仅仅是针对driver的故障恢复做了数据和元数据的Checkpoint。而本文要讲的flink的checkpoint机制要复杂了很多,它采用的是轻量级的分布式快照,实现了每个操作符的快照,及循环流的在循环的数据的快照。详细的算法后面浪尖会给出文章。

欢迎点击阅读原文,加入浪尖知识星球,更深入学习spark等大数据知识。

flink超越Spark的Checkpoint机制


flink超越Spark的Checkpoint机制
1. 简介


Apache Flink提供容错机制,以持续恢复数据流应用程序的状态。该机制确保即使存在故障,程序的每条记录只会作用于状态一次(exactly-once),当然也可以降级为至少一次(at-least-once)。

容错机制持续地制作分布式流数据流的快照。对于状态较小的流应用程序,这些快照非常轻量级,可以频繁产生快照,而不会对性能产生太大影响。流应用程序的状态存储的位置是可以配置的(例如存储在master节点或HDFS)。

如果程序失败(由于机器,网络或软件故障),Flink将停止分布式数据流。然后,系统重新启动操作算子并将其重置为最新的成功checkpoint。输入流将重置为状态快照记录的位置。 作为重新启动的并行数据流的一部分被处理的任何记录都保证不会成为先前checkpoint状态的一部分。

注意:默认情况下,禁用checkpoint。

注意:要使容错机制完整,数据源(如消息队列或者broker)要支持数据回滚到历史记录的位置。 Apache Kafka具有这种能力,Flink与Kafka的连接器利用了该功能。

注意:由于Flink的checkpoint是通过分布式快照实现的,因此快照和checkpoint的概念可以互换使用。

flink超越Spark的Checkpoint机制
2. Checkpointing


Flink的容错机制的核心部分是制作分布式数据流和操作算子状态的一致性快照。 这些快照充当一致性checkpoint,系统可以在发生故障时回滚。 Flink用于制作这些快照的机制在“分布式数据流的轻量级异步快照”中进行了描述。 它受到分布式快照的标准Chandy-Lamport算法的启发,专门针对Flink的执行模型而定制。

flink超越Spark的Checkpoint机制
2.1 Barriers


Flink分布式快照的核心概念之一是barriers。 这些barriers被注入数据流并与记录一起作为数据流的一部分向下流动。 barriers永远不会超过记录,数据流严格有序。 barriers将数据流中的记录分为进入当前快照的记录和进入下一个快照的记录。每个barriers都带有快照的ID,并且barriers之前的记录都进入了该快照。 barriers不会中断流的流动,非常轻量级。 来自不同快照的多个barriers可以同时在流中出现,这意味着可以同时发生各种快照。

flink超越Spark的Checkpoint机制

barriers在数据流源处被注入并行数据流中。快照n的barriers被插入的位置(我们称之为Sn)是快照所包含的数据在数据源中最大位置。例如,在Apache Kafka中,此位置将是分区中最后一条记录的偏移量。 将该位置Sn报告给checkpoint协调器(Flink的JobManager)。

然后barriers向下游流动。当一个中间操作算子从其所有输入流中收到快照n的barriers时,它会为快照n发出barriers进入其所有输出流中。 一旦sink操作算子(流式DAG的末端)从其所有输入流接收到barriers n,它就向checkpoint协调器确认快照n完成。在所有sink确认快照后,意味快照着已完成。

一旦完成快照n,job将永远不再向数据源请求Sn之前的记录,因为此时这些记录(及其后续记录)将已经通过整个数据流拓扑,也即是已经被处理结束啦。

flink超越Spark的Checkpoint机制

接收多个输入流的运算符需要基于快照barriers对齐输入流。 上图说明了这一点:

  • 一旦操作算子从一个输入流接收到快照barriers n,它就不能处理来自该流的任何记录,直到它从其他输入接收到barriers n为止。 否则,它会搞混属于快照n的记录和属于快照n + 1的记录。

  • barriers n所属的流暂时会被搁置。 从这些流接收的记录不会被处理,而是放入输入缓冲区。

  • 一旦从最后一个流接收到barriers n,操作算子就会发出所有挂起的向后传送的记录,然后自己发出快照n的barriers。

  • 之后,它恢复处理来自所有输入流的记录,在处理来自流的记录之前优先处理来自输入缓冲区的记录。

flink超越Spark的Checkpoint机制
2.2 state


当运算符包含任何形式的状态时,此状态也必须是快照的一部分。操作算子状态有不同的形式:

用户定义的状态:这是由转换函数(如map()或filter())直接创建和修改的状态。

系统状态:此状态是指作为运算符计算一部分的数据缓冲区。此状态的典型示例是窗口缓冲区,系统在其中收集(和聚合)窗口里的记录,直到窗口被计算和抛弃。

操作算子在他们从输入流接收到所有快照barriers时,以及在向其输出流发出barriers之前,会对其状态进行写快照。此时,在 barrier 之前的数据对状态的更新已经完成,barrier 之后的数据不会更新状态。 由于快照的状态可能很大,因此它存储在可配置的状态后端中。默认情况下,是存储到JobManager的内存,但对于生产使用,应配置分布式可靠存储(例如HDFS)。 在存储状态之后,操作算子确认checkpoint完成,将快照barriers发送到输出流中,然后继续。

生成的快照现在包含:

  • 对于每个并行流数据源,创建快照时流中的偏移/位置

  • 对于每个运算符,存储在快照中的状态指针

flink超越Spark的Checkpoint机制

flink超越Spark的Checkpoint机制
2.3 Exactly Once vs. At Least Once


对齐步骤可能增加流式程序的等待时间。通常,这种额外的延迟大约为几毫秒,但也会见到一些延迟显着增加的情况。 对于要求所有记录始终具有超低延迟(几毫秒)的应用程序,Flink可以在checkpoint期间跳过流对齐。一旦操作算子看到每个输入流的checkpoint barriers,就会写 checkpoint 快照。

当跳过对齐时,即使在 checkpoint n 的某些 checkpoint barriers 到达之后,操作算子仍继续处理所有输入。这样,操作算子还可以在创建 checkpoint n 的状态快照之前,继续处理属于checkpoint n + 1的数据。 在还原时,这些记录将作为重复记录出现,因为它们都包含在 checkpoint n 的状态快照中,并将作为 checkpoint n 之后数据的一部分进行重复处理。

注意:对齐仅适用于具有多个输入(join)的运算符以及具有多个输出的运算符(在流重新分区/shuffle之后)。 正因为如此,对于只有map(),flatMap(),filter()等操作,实际上即使在至少一次模式下也能提供一次保证。

flink超越Spark的Checkpoint机制
2.4 异步状态快照


注意,上述机制意味着操作算子在将状态的快照存储在状态后端时,停止处理输入记录。每次写快照时,这种同步状态快照操作都会引入延迟。

可以让操作算子在存储状态快照时继续处理,高效地让状态快照存储在后台异步发生。为此,操作算子必须能够生成一个状态对象,该状态对象应以某种方式存储,以便对操作算子状态的进一步修改不会影响该状态对象。 例如,RocksDB中使用的写时复制(copy-on-write)数据结构具有这种能力。

在接收到输入的checkpoint的barriers后,操作算子启动其状态的异步快照复制。它立即释放其barriers到输出,并继续进行常规流处理。后台复制过程完成后,它会向checkpoint协调器(JobManager)确认checkpoint完成。 checkpoint仅在所有sink都已收到barriers并且所有有状态操作算子已确认其完成备份(可能在barriers到达sink之后)之后才算完成。

flink超越Spark的Checkpoint机制
2.5 Recovery


在这种机制下的恢复是很直接的:当失败时,Flink选择最新完成的checkpoint k。 然后,系统重新部署整个分布式数据流,并为每个操作算子重置作为checkpoint k的一部分的快照的状态。 数据源设置为从位置Sk开始读取。 例如在Apache Kafka中,这意味着告诉消费者从偏移量Sk开始读取。

如果状态以递增方式写快照,则操作算子从最新完整快照的状态开始,然后对该状态应用一系列增量快照更新。

2.6 操作算子快照的实现


在创建操作算子快照时,有两部分:同步部分和异步部分。

操作算子和状态后端将其快照提供为Java FutureTask。 该任务包含同步部分已完成且异步部分处于挂起状态的状态。 然后,异步部分由该checkpoint的后台线程执行。

完全同步的checkpoint返回已经完成的FutureTask的运算符。 如果需要执行异步操作,则在FutureTask的run()方法中执行。

任务是可取消的,可以释放流和其他资源消耗的句柄。

推荐阅读:

1.

2.

以上是关于Flink Checkpoint 机制详解的主要内容,如果未能解决你的问题,请参考以下文章

Flink源码阅读--Checkpoint触发机制

Flink可靠性的基石-checkpoint机制详细解析

【Flink 精选】阐述 Flink 的容错机制,剖析 Checkpoint 实现流程

Flink容错机制(checkpoint)

Flink 源码:Checkpoint 元数据详解

flink超越Spark的Checkpoint机制