【Flink 精选】如何排查 Checkpoint 异常问题?
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了【Flink 精选】如何排查 Checkpoint 异常问题?相关的知识,希望对你有一定的参考价值。
参考技术ACheckpoint 检查点,Flink 定期把 state 缓存数据持久化保存下来的过程 。它的目的是容错和 exactly-once 语义功能。
分布式系统实现一个全局状态保留的功能。
① 传统方案使用一个统一时钟,通过 master 节点广播到每个 slaves 节点。当 slaves 接收到后,记录其状态 。缺点:单点故障、数据不一致(延迟/失败)、系统不稳定
② Flink 采用栅栏 Barrier 作为 Checkpoint 的传递信号,与业务数据一样,无差别的传递下去 。
每一个 Flink 作业都会有一个 JobManager ,JobManager 里面的 checkpoint coordinator 管理整个作业的 checkpoint 过程。用户通过 env 设置 checkpoint 的时间间隔,使得 checkpoint coordinator 定时将 checkpoint 的 barrier 发送给每个 source subtask。
当 source 算子实例收到一个 barrier 时,它会暂停自身的数据处理,然后将自己的当前 缓存数据 state 保存为快照 snapshot,并且持久化到指定的存储,最后算子实例向 checkpoint coordinator 异步发送一个确认信号 ack,同时向所有下游算子广播该 barrier 和恢复自身的数据处理。
以此类推,每个算子不断制作 snapshot 并向下游广播 barrier,直到 barrier 传递到 sink 算子实例,此时确定全局快照完成。
Flink Web UI 有 Checkpoint 监控信息,包括统计信息和每个Checkpoint的详情。如下图所示,红框里面可以看到一共触发了 569K 次 Checkpoint,然后全部都成功完成,没有 fail 的。
如下图所示,点击某次 Checkpoint “+”,可知该Checkpoint 的详情。
① Acknowledged 表示有多少个 subtask 对这个 Checkpoint 进行了 ack,从图中可知,共有3个 operator 分为2个 subtask,这2个 subtask 都完成 ack。
② Latest Acknowledgement 表示所有 subtask 的最后 ack 的时间;
③ End to End Duration 表示所有 subtask 中完成 snapshot 的最长时间;
④ State Size 表示当前 Checkpoint 的 state 大小(如果是增量 checkpoint,则表示增量大小);
⑤ Buffered During Alignment 表示在 barrier 对齐阶段累计多少数据(如果这个数据过大,则间接表示对齐比较慢);
如下图所示,Flink Web UI 的 Checkpoint 界面显示 Checkpoint 10432 失败。点击 Checkpoint 10423 的详情“+”,可知 Acknowledged、Latest Acknowledgement等信息。
查看 JobManager 的日志 jobmanager.log,其中关键日志,如下
解析: 10423 是 checkpointID, 0b60f08bf8984085b59f8d9bc74ce2e1 是 task execution id 即 subtask id, 85d268e6fbc19411185f7e4868a44178 是 job id。
从上述的 jobmanager.log 日志中,可知 subtask id 和 job id,可以确定 taskmanager 和 slot。
从上面的日志,可知 subtask 被调度到 节点 hostnameABCDE 的 container_e24_1566836790522_8088_04_013155_1 slot ,接着到 container container_e24_1566836790522_8088_04_013155 的 taskmanager.log 中查找 Checkpoint 失败的具体原因。
如果较小的 Checkpoint 没有对齐的情况,Flink 收到了更大的 Checkpoint,则会把较小的 Checkpoint 给取消,其关键日志如下。
该日志表示当前 Checkpoint 19 还在对齐阶段,同时收到了 Checkpoint 20 的 barrier,接着通知到下游的 task checkpoint 19 被取消了,同时也会通知 JM 当前 Checkpoint 被 decline 掉了。
当下游 task 收到被 cancel barrier 的时候,打印如下的关键日志,表示当前 task 接收到上游发送过来的 barrier cancel 消息,从而取消了对应的 Checkpoint。
如果 Checkpoint 做的非常慢,超过了 timeout 还没有完成,则整个 Checkpoint 也会失败。例如,如果 Checkpoint 21 由于超时而失败是,jobmanager.log 的关键日志如下。
接着打开 debug 级别的日志, taskmananger.log 的 snapshot 分为三个阶段,开始 snapshot 前,同步阶段,异步阶段:
Checkpoint 慢的场景,例如 Checkpoint interval 1 分钟,超时 10 分钟,Checkpoint 经常需要做 9 分钟,而且实际 state size 比预期的大很多。
简单介绍 Checkpoint Barrier 对齐机制: 算子 Operator 从输入流接收到 barrier n 后,它就不能处理来自该流的任何数据记录,直到它从其他所有输入接收到 barrier n 。如下图所示,Operator 从数字流中接收到 barrier n 后,接着数字流的数据不会被处理而是放入输入缓冲区。直到字母流的 barrier n 达到 Operator 后,Operator 向下游发送 barrier n 和 缓冲区的数据,同时进行自身的 snapshot。
由于 barrier 对齐机制,算子需要接收到上游全部 barrier n 后,才会进行 snapshot。如果作业存在反压或者数据倾斜,则会导致全部的 channel 或者某些 channel 的 barrier 发送慢,从而整体影响 Checkpoint 的时间。如下图所示,通过Flink Web UI 监控 subtask 数据量 和反压 BackPressure。
介绍Checkpoint Barrier 对齐机制,算子 Operator 收齐上游的 barrier n 才能触发 snapshot。例如,StateBackend 是 RocksDB,snapshot 开始的时候保存数据到 RocksDB,然后 RocksDB 异步持久化到 FS。如果 barrier n 一直对不齐的话,就不会开始做 snapshot。
Checkpoint 有两种模式:全量 Checkpoint 和 增量 Checkpoint 。全量 Checkpoint 会把当前的 state 全部备份一次到持久化存储,而增量 Checkpoint,则只备份上一次 Checkpoint 中不存在的 state,因此增量 Checkpoint 每次上传的内容会相对更好,在速度上会有更大的优势。
如果通过日志发现同步阶段比较慢,对于非 RocksDBBackend,可以考虑开启异步 snapshot。如果开启了异步 snapshot 还是慢,需要使用 AsyncProfile 查看整个JVM。
对于 RocksDBBackend,使用 iostate 查看磁盘的压力,同时查看 TaskMananger 的 RocksDB log日志,查看其中 snapshot 时间总开销。
异步阶段,TaskManager 主要将 state 备份到持久化存储 HDFS。对于非 RocksDBBackend,主要瓶颈来自于 网络 ,可以考虑观察网络的 metric,或者使用 iftop 观察对应机器上的网络流量情况。
对于 RocksDB,则需要从本地读取文件,写入到远程的持久化存储上 HDFS,所以不仅需要考虑 网络的瓶颈,还需要考虑本地磁盘的性能 。
该场景出现的概率比较小,source 做 snapshot 并往下游发送 barrier 的时候,需要抢锁。如果一直抢不到锁的话,则可能导致 Checkpoint 一直得不到机会进行。如果在 Source 所在的 taskmanager.log 中找不到开始做 Checkpoint 的 log,则可以考虑是否属于这种情况,可以通过 jstack 进行进一步确认锁的持有情况。
以上是关于【Flink 精选】如何排查 Checkpoint 异常问题?的主要内容,如果未能解决你的问题,请参考以下文章
【Flink 精选】阐述 Flink 的容错机制,剖析 Checkpoint 实现流程