Flink从BucketSink看checkpoint与故障恢复

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink从BucketSink看checkpoint与故障恢复相关的知识,希望对你有一定的参考价值。

参考技术A 看了 BucketSink 的相关源码。着重看了它的checkpoint以及故障恢复机制。
把大概的理解梳理如下:
BucketSink 大体的工作流程:
1.新建一个文件,不断的写入文件中,后缀命名为 .in-progress
2.判断文件写入完毕,关闭该文件时,后缀名命名为 .pending
3.checkpoint触发时,将上次ck到这次ck间的所有 .pending 文件变为 finish 状态
BucketSink 实现了 CheckpointedFunction 接口
有两个方法
void snapshotState(FunctionSnapshotContext context) throws Exception;

void initializeState(FunctionInitializationContext context) throws Exception;�其中:
initializeState 方法在每次新建 BucketSink 或者故障恢复时 会调用。
snapshotState 在每次触发 ck 时会被调用。
下面简单分析下这两个方法的逻辑:
initializeState 方法主要执行一些初始化操作,其中我认为关键的在于

restoredBucketStates = stateStore.getSerializableListState("bucket-states");
该方法获取一个叫做 bucket-states 的状态对象,从名称也可知,该对象用于重启。正常情况下,该对象无内容下面的for语句不会执行。但是若有故障重启的情况,则会从上次的ck中读取出内容,也就是上次ck的状态信息,然后执行回滚操作保证数据的一致性。这一点最后再做介绍。

snapshotState 方法用于触发 ck 操作。
这个方法做了如下几件事
1.获取当前正在写的 .pending 文件的大小,以便若下次 ck 前发生故障,可以获知本次ck时,该文件的大小,以便删除本次ck后到故障发生时写入的数据,或者显示该文件的有效数据大小。
2.将所有 .pending 状态的文件存储到list中,稍后ck结束后,方便修改其状态为 finish
3.将当前状态存入 restoredBucketStates 对象,以便若下次 ck 前发生故障,可以从这个状态处进行恢复。

同时,BucketSink也实现了 CheckpointListener 接口
void notifyCheckpointComplete(long checkpointId) throws Exception;
该方法会在 ck 完成后调用。

该方法,将 .pending 文件的状态转为 final 状态
并且移除writer已经处于close状态的bucket。

最后详细说一下故障恢复。
当程序因故障自动恢复时,initializeState 方法的 restoredBucketStates 就会从上次 ck 中获取到上次ck时的状态。进而进行恢复。

首先,将 .pending 状态的文件名列表清空即可,因为将 .pending 状态转为 finish 状态,可以在 notifyCheckpointComplete 方法中完成。故障恢复时,该方法对 .pending 的文件的做法是不做处理,等待故障恢复之后,第一次ck触发时,便会自动的将 .pending 的文件变为 finish 状态。
而之所以不处理 .pending 状态文件,是因为 .pending 状态文件说明该文件已经写入完毕,就差ck成功后修改文件状态(也就是文件名)而已,本质上,该文件已经不再写入数据,没有数据的变化。
接下来 handlePendingInProgressFile 就是处理 .in-progress 状态的文件。
我们设想一下,故障重启是指在上次成功的ck之后,下次ck之前,发生了故障,然后应用自动重启,使用的是上次成功的ck的状态信息。
这样的话,上次 ck 时状态为 .in-progress 的文件,可能在故障发生时,已经处于 .pending 状态,也就是写完的状态,也可能仍然处于 .in-progress 状态。
flink的做法是,不管处于什么状态 首先全部标注为 finish 状态。然后根据上次ck时状态中存储的文件的大小进行截断,这样,该文件就能回滚到上次ck成功时的状态。若 Hadoop 版本不支持截断操作,则新建一个后缀为 .valid-length 的文件,内容为文件的大小,单位 byte。
然后flink就可以从上次ck处重新拉取数据源,继续处理,写入sink。
最后,调用 handlePendingFilesForPreviousCheckpoints 将上次ck成功后,若故障发生的很快,没来得及调用 CheckpointListener 的 notifyCheckpointComplete 方法,则此处将文件状态置为 finish 。

BucketSink 是一个控制类,具体的写入操作可以自己实现 org.apache.flink.streaming.connectors.fs.Writer 接口。
其中 snappy 等压缩文件的追加,可以使用
Fs.append 的方式追加内容到同一文件中

Flink 如何定位背压来源

Flink 版本:1.13.0

在过去的几年里,背压的问题从不同的角度得到了解决。然而,在判断和分析背压来源时,最近的 Flink 版本发生了很大的变化(尤其是在 Flink 1.13 中新增了监控指标和 Web UI)。这篇文章将一起看一下其中的一些变化,并详细介绍如何追踪背压的来源,但首先……

1. 什么是背压?

Flink 如何处理背压 这篇文章虽然比较旧,但比较准确的解释了背压。如果您不了解背压这个概念,我强烈建议您阅读一下。如果想更深入、更底层地了解该话题以及 Flink 的网络堆栈是如何工作的,可以查阅Flink 网络流控和反压剖析详解

从高层次上理解,如果作业图中的某些算子无法以接收记录相同的速度处理记录,就会发生背压。运行这个慢算子的子任务的输入缓冲区就会被填满。一旦输入缓冲区被填满,背压就会传播到上游子任务的输出缓冲区。一旦上游子任务的输出缓冲区被填满,上游子任务就被迫降低处理速度,来匹配慢算子的处理速度,最终导

以上是关于Flink从BucketSink看checkpoint与故障恢复的主要内容,如果未能解决你的问题,请参考以下文章

从 Flink Forward Asia 2021,看Flink未来开启新篇章

从 Flink Forward Asia 2021,看Flink未来开启新篇章

从 Flink Forward Asia 2021,看Flink未来开启新篇章

《从0到1学习Flink》—— Flink 项目如何运行?

大数据时代,且看新一代计算引擎Spark和Flink成王败寇

Flink是如何从kafka中拉取数据的