Flink基础系列28-Flink容错机制

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink基础系列28-Flink容错机制相关的知识,希望对你有一定的参考价值。

参考技术A

在执行流应用程序期间,Flink 会定期保存状态的一致检查点

如果发生故障, Flink 将会使用最近的检查点来一致恢复应用程序的状态,并重新启动处理流程

(如图中所示,7这个数据被source读到了,准备传给奇数流时,奇数流宕机了,数据传输发生中断)

遇到故障之后,第一步就是重启应用

(重启后,起初流都是空的)

第二步是从 checkpoint 中读取状态,将状态重置

(读取在远程仓库(Storage,这里的仓库指状态后端保存数据指定的三种方式之一)保存的状态)

从检查点重新启动应用程序后,其内部状态与检查点完成时的状态完全相同

第三步:开始消费并处理检查点到发生故障之间的所有数据

这种检查点的保存和恢复机制可以为应用程序状态提供“精确一次”(exactly-once)的一致性,因为所有算子都会保存检查点并恢复其所有状态,这样一来所有的输入流就都会被重置到检查点完成时的位置

(这里要求source源也能记录状态,回退到读取数据7的状态,kafka有相应的偏移指针能完成该操作)

概述
checkpoint和Watermark一样,都会以广播的形式告诉所有下游。

具体讲解

JobManager 会向每个 source 任务发送一条带有新检查点 ID 的消息,通过这种方式来启动检查点

(这个带有新检查点ID的东西为barrier,由图中三角型表示,数值2只是ID)

数据源将它们的状态写入检查点,并发出一个检查点barrier
状态后端在状态存入检查点之后,会返回通知给source任务,source任务就会向JobManager确认检查点完成
上图,在Source端接受到barrier后,将自己此身的3 和 4 的数据的状态写入检查点,且向JobManager发送checkpoint成功的消息,然后向下游分别发出一个检查点 barrier

可以看出在Source接受barrier时,数据流也在不断的处理,不会进行中断

此时的偶数流已经处理完蓝2变成了4,但是还没处理到黄4,只是下游sink发送了一个数据4,而奇数流已经处理完蓝3变成了8(黄1+蓝1+黄3+蓝3),并向下游sink发送了8

此时检查点barrier都还未到Sum_odd奇数流和Sum_even偶数流

分界线对齐:barrier向下游传递,sum任务会等待所有输入分区的barrier到达
对于barrier已经达到的分区,继续到达的数据会被缓存
而barrier尚未到达的分区,数据会被正常处理
此时蓝色流的barrier先一步抵达了偶数流,黄色的barrier还未到,但是因为数据的不中断一直处理,此时的先到的蓝色的barrier会将此时的偶数流的数据4进行缓存处理,流接着处理接下来的数据等待着黄色的barrier的到来,而黄色barrier之前的数据将会对缓存的数据相加

这次处理的总结:分界线对齐:barrier 向下游传递,sum 任务会等待所有输入分区的 barrier 到达,对于barrier已经到达的分区,继续到达的数据会被缓存。而barrier尚未到达的分区,数据会被正常处理

当收到所有输入分区的 barrier 时,任务就将其状态保存到状态后端的检查点中,然后将 barrier 继续向下游转发
当蓝色的barrier和黄色的barrier(所有分区的)都到达后,进行状态保存到远程仓库,然后对JobManager发送消息,说自己的检查点保存完毕了

此时的偶数流和奇数流都为8

向下游转发检查点 barrier 后,任务继续正常的数据处理

Sink 任务向 JobManager 确认状态保存到 checkpoint 完毕
当所有任务都确认已成功将状态保存到检查点时,检查点就真正完成了

CheckPoint为自动保存,SavePoint为手动保存

有状态的流处理,内部每个算子任务都可以有自己的状态

对于流处理器内部来说,所谓的状态一致性,其实就是我们所说的计算结果要保证准确。

一条数据不应该丢失,也不应该重复计算

在遇到故障时可以恢复状态,恢复以后的重新计算,结果应该也是完全正确的。

Flink的一个重大价值在于,它既保证了exactly-once,也具有低延迟和高吞吐的处理能力。

Flink使用了一种轻量级快照机制——检查点(checkpoint)来保证exactly-once语义
有状态流应用的一致检查点,其实就是:所有任务的状态,在某个时间点的一份备份(一份快照)。而这个时间点,应该是所有任务都恰好处理完一个相同的输入数据的时间。
应用状态的一致检查点,是Flink故障恢复机制的核心

端到端(end-to-end)状态一致性
目前我们看到的一致性保证都是由流处理器实现的,也就是说都是在Flink流处理器内部保证的;而在真实应用中,流处理应用除了流处理器以外还包含了数据源(例如Kafka)和输出到持久化系统

端到端的一致性保证,意味着结果的正确性贯穿了整个流处理应用的始终;每一个组件都保证了它自己的一致性

整个端到端的一致性级别取决于所有组件中一致性最弱的组件

端到端 exactly-once

幂等写入
所谓幂等操作,是说一个操作,可以重复执行很多次,但只导致一次结果更改,也就是说,后面再重复执行就不起作用了。

(中间可能会存在不正确的情况,只能保证最后结果正确。比如5=>10=>15=>5=>10=>15,虽然最后是恢复到了15,但是中间有个恢复的过程,如果这个过程能够被读取,就会出问题。)

事务写入

预写日志(Write-Ahead-Log,WAL)
把结果数据先当成状态保存,然后在收到checkpoint完成的通知时,一次性写入sink系统
简单易于实现,由于数据提前在状态后端中做了缓存,所以无论什么sink系统,都能用这种方式一批搞定
DataStream API提供了一个模版类:GenericWriteAheadSink,来实现这种事务性sink

两阶段提交(Two-Phase-Commit,2PC)
对于每个checkpoint,sink任务会启动一个事务,并将接下来所有接收到的数据添加到事务里
然后将这些数据写入外部sink系统,但不提交它们——这时只是"预提交"
这种方式真正实现了exactly-once,它需要一个提供事务支持的外部sink系统。Flink提供了TwoPhaseCommitSinkFunction接口

不同Source和Sink的一致性保证

内部——利用checkpoint机制,把状态存盘,发生故障的时候可以恢复,保证内部的状态一致性
source——kafka consumer作为source,可以将偏移量保存下来,如果后续任务出现了故障,恢复的时候可以由连接器重制偏移量,重新消费数据,保证一致性
sink——kafka producer作为sink,采用两阶段提交sink,需要实现一个TwoPhaseCommitSinkFunction

Exactly-once 两阶段提交

JobManager 协调各个 TaskManager 进行 checkpoint 存储
checkpoint保存在 StateBackend中,默认StateBackend是内存级的,也可以改为文件级的进行持久化保存

当 checkpoint 启动时,JobManager 会将检查点分界线(barrier)注入数据流
barrier会在算子间传递下去

每个算子会对当前的状态做个快照,保存到状态后端
checkpoint 机制可以保证内部的状态一致性

每个内部的 transform 任务遇到 barrier 时,都会把状态存到 checkpoint 里

sink 任务首先把数据写入外部 kafka,这些数据都属于预提交的事务;遇到 barrier 时,把状态保存到状态后端,并开启新的预提交事务

(barrier之前的数据还是在之前的事务中没关闭事务,遇到barrier后的数据另外新开启一个事务)

当所有算子任务的快照完成,也就是这次的 checkpoint 完成时,JobManager 会向所有任务发通知,确认这次 checkpoint 完成
sink 任务收到确认通知,正式提交之前的事务,kafka 中未确认数据改为“已确认”

Exactly-once 两阶段提交步骤总结

【Flink 精选】阐述 Flink 的容错机制,剖析 Checkpoint 实现流程

参考技术A

Flink 容错机制主要是 状态的保存和恢复,涉及 state backends 状态后端、checkpoint 和 savepoint,还有 Job 和 Task 的错误恢复

Flink 状态后端是指 保存 Checkpoint 数据的容器 ,其分类有 MemoryStateBackend、FsStateBackend、RocksDBStateBackend ,状态的分类有 operator state 和 keyed state

Flink 状态保存和恢复主要依靠 Checkpoint 机制和 Savepoint 机制,两者的区别如下表所示。

快照的概念来源于相片,指照相馆的一种冲洗过程短的照片。在计算机领域, 快照是数据存储的某一时刻的状态记录 Flink Snapshot 快照是指作业状态的全局一致记录 。一个完整的快照是包括 source 算子的状态(例如,消费 kafka partition 的 offset)、状态算子的缓存数据和 sink 算子的状态(批量缓存数据、事务数据等)。

Checkpoint 检查点可以自动产生快照,用于Flink 故障恢复 。Checkpoint 具有分布式、异步、增量的特点。

Savepoint 保存点是用户手动触发的,保存全量的作业状态数据 。一般使用场景是作业的升级、作业的并发度缩放、迁移集群等。

Flink 是采用轻量级的分布式异步快照,其实现是采用栅栏 barrier 作为 checkpoint 的传递信号,与业务数据一样无差别地传递下去 ,目的是使得数据流被切分成微批,进行 checkpoint 保存为 snapshot。当 barrier 经过流图节点的时候,Flink 进行 checkpoint 保存状态数据。
如下图所示,checkpoint n 包含每个算子的状态,该状态是指checkpoint n 之前的全部事件,而不包含它之后的所有事件。

针对用户作业出现故障而导致结果丢失或者重复的问题,Flink 提供3种语义:
At-Least-Once 最少一次 :不会丢失数据,但可能会有重复结果。
Exactly-Once 精确一次 :checkpoint barrier 对齐机制可以保障精确一次。

FailureRateRestartStrategy :允许在指定时间间隔内的最大失败次数,同时可以设置重启延时时间。
FixedDelayRestartStrategy :允许指定的失败次数,同时可以设置重启延时时间。
NoRestartStrategy :不需要重启,即 Job 直接失败。
ThrowingRestartStrategy :不需要重启,直接抛异常。
Job Restart 策略可以通过 env 设置。

上述策略的父类接口是RestartStrategy,其关键是restart(重启操作)。

RestartAllStrategy :重启全部 task,默认策略。
RestartIndividualStrategy :恢复单个 task。如果该 task 没有source,可能导致数据丢失。
NoOpFailoverStrategy :不恢复 task。
上述策略的父类接口是FailoverStrategy,其关键是Factory的create(创建 strategy)、onTaskFailure(处理错误)。

如何产生可靠的全局一致性快照是分布式系统的难点,其传统方案是使用的全局时钟,但存在单点故障、数据不一致等可靠性问题 。为了解决该问题, Chandy-Lamport 算法采用 marker 的传播来代替全局时钟

① 进程 Pi 记录自己的进程状态,同时生产一个标识信息 marker(与正常 message 不同),通过 ouput channel 发送给系统里面的其他进程。
② 进程 Pi 开始记录所有 input channel 接收到的 message

进程 Pj 从 input channel Ckj 接收到 marker。如果 Pj 还没有记录自己的进程状态,则 Pj 记录自己的进程状态,向 output channel 发送 marker;否则 Pj 正在记录自己的进程状态(该 marker 之前的 message)。

所有的进程都收到 marker 信息并且记录下自己的状态和 channel 的状态(包含的 message)。

Flink 的分布式异步快照实现了Chandy Lamport 算法,其核心思想是 在 source 插入 barrier 代替 Chandy-Lamport 算法中的 marker,通过控制 barrier 的同步来实现 snapshot 的备份和 Exactly-Once 语义

Checkpoint Coordinator 向所有 source 节点 trigger Checkpoint。

source task向下游广播barrier。

当source task备份完自己的状态后,会将备份数据的地址(state handle)通知 Checkpoint Coordinator。

map和sink task收集齐上游source的barrier n,执行本地快照。下面例子是RocksDB增量Checkpoint 的流程:首先RocksDB会全量保存到磁盘上(红色大三角表示),然后Flink会从中选择没有上传的文件进行持久化备份(紫色小三角)。

map和sink task在完成Checkpoint 之后,将状态地址state handle返回通知 Coordinator。

当Checkpoint Coordinator收到全部task的state handle,就确定该Checkpoint已完成,并向持久化存储中备份一个Checkpoint Meta(元数据,包括该checkpoint状态数据的备份地址)。

以上是关于Flink基础系列28-Flink容错机制的主要内容,如果未能解决你的问题,请参考以下文章

Flink核心篇,四大基石容错机制广播反压序列化内存管理资源管理...

day05_Flink容错机制

带你认识Flink容错机制的两大方面:作业执行和守护进程

【Flink 精选】阐述 Flink 的容错机制,剖析 Checkpoint 实现流程

Apache Flink 实现原理:容错机制

Flink容错机制(checkpoint)