关闭 Flink Checkpoint,引发 P3 故障
Posted zhisheng_blog
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了关闭 Flink Checkpoint,引发 P3 故障相关的知识,希望对你有一定的参考价值。
记录一个比较有意义的故障,没遇到的可以避坑,已经被坑过的只能握手🤝了。
背景
因阿里云提示机器有故障,会在第二天早高峰自动重启,按照之前运维操作,提前做好通知后,在集群非高峰期将机器踢出集群。踢出集群时该机器上运行的 TaskManager Pod 会挂掉,Flink 会在其他正常机器上申请新的 TaskManager 运行,期间会有任务的 failover。
操作后 10 来分钟看到公司大群有值班同事响应指标异常问题,加入到问题群聊中发现可能和剔除故障机器有关,于是加入排查,提供了当时受影响的任务信息,发现了异常指标对应的任务当时在消费 10 分钟之前的数据。
![](https://image.cha138.com/20230206/84eea107cf254970949fb028d2fd0bb8.jpg)
![](https://image.cha138.com/20230206/6b834030e2de4bd88a1dd00c04b6c7d6.jpg)
而这个消费延迟时长对于该作业的业务场景完全不能接受(延迟 10s 就会造成很大问题),最终延迟导致业务指标下跌,产生资损,故障最终定级 P3。
解决办法:业务方停了几个同步数据到 ES 索引的任务,然后重建了主链索引任务(任务同时加大并发),下游切换到新索引即恢复。
故障根因分析过程
事后找业务方线下讨论了一下,初步分析是 ES 压力大,导致 Flink 任务有反压,从而引起任务 Source 端消费 Kafka 数据导致的延迟。
![](https://image.cha138.com/20230206/1c8a224ad1b24975b344d1f36247f6f0.jpg)
![](https://image.cha138.com/20230206/eef40d389d1c4be3aa9060ddb52bb862.jpg)
本以为结论就是剔除机器后,受影响任务比较多,任务 failover 后会从上一次 Checkpoint 保存的 Kafka offset 开始消费,这时候写到 ES 的数据量会比正常多 2min 左右的数据(Checkpoint 集群默认设置的 2min 执行一次),从而导致对 ES 有点压力进而影响到 Flink 任务出现反压情况。但是晚上业务方给我提供了一个另外受影响任务(这里称备链任务,主链任务就是那个消费 10 分钟之前数据的任务)的作业监控链接,也就是通过这个任务的监控信息才真正发现问题的根因。
![](https://image.cha138.com/20230206/3fdc29d2bd944355bec771b86f790095.jpg)
当时发现这个备链任务在故障后有大部分 TaskManager 的 task 是从 22 个小时之前的数据开始消费的,如下图监控所示:
![](https://image.cha138.com/20230206/4a792f9358484e44b7cf534329d97ded.jpg)
又有少部分 TaskManager 的 task 是正常消费的,如下图所示:
![](https://image.cha138.com/20230206/b26eefd81bd34e20a9ce07bc2139c528.jpg)
在 failover 之后该备链任务的消费速度 TPS 从正常的两千增长到 12W/s
![](https://image.cha138.com/20230206/50f22ed72e354c618ff441d36e096d18.jpg)
主链任务 failover 后所有的 task 消费比较正常(前文已截图),对比几个监控来看该备链任务在 failover 后明显消费有异常,即消费 22 小时之前的数据,同时看到任务启动运行时长就是 22 小时左右,相当于从任务开第一次始启动的时候指定的时间点开始消费了。
![](https://image.cha138.com/20230206/aa29bcdc37f24fa5b74fdf7352ed4cae.jpg)
既然主链和备链任务 failover 后的消费策略都不一样,所以我当时第一反应觉得是不是备链任务长期 Checkpoint 失败,然后 failover 后从早期完成的 Checkpoint 中保存的 offset 开始消费,从而导致的消费 22 小时之前的数据,但是咨询了业务方发现任务关闭了 Checkpoint。心想那只有可能是没打开 Kafka 的 auto.commit.offset 或者打开了但是自动提交一直失败了,我去看了 Kafka topic groupid 的 committed offset 监控发现正常,一直有在正常的提交 offset,所以也否定了刚才的猜想。另外就是觉得两个任务的消费策略这么大,是不是代码不一致或者配置不一致导致的,因为一开始问了业务方代码是否一致,回复是一致的,这里我也被坑了一下,后面在平台发现两个任务的代码 Git commitId 都不一样,那也就是意味着代码不一致。
![](https://image.cha138.com/20230206/ef7b72cad6574f7791b322564ef2ee35.jpg)
![](https://image.cha138.com/20230206/0541de0e009b4b8e82496dffd43c5c92.jpg)
接着就开始 review 业务方的项目代码,发现备链路任务关闭了 Checkpoint,而主链任务是开启 Checkpoint 的,同时代码消费方式是指定了 Offset 开始消费(根据时间配置查询 topic 的 offset,然后从指定的 offset 开始消费,这种消费方式其实和指定 timestamp 开始消费是一致的)。
![](https://image.cha138.com/20230206/4f5849254b8741f78966a58fa8595492.jpg)
然后我就翻了下 KafkaSource Consumer 源码,如下图所示:
![](https://image.cha138.com/20230206/8e014ad37a3f40a2996fa337228d8a64.jpg)
Open 方法主要做了状态初始化相关逻辑,可以发现如果关闭了 Checkpoint,那么肯定不会有恢复的状态,也就是走到后面的逻辑,按照用户指定的 offset 开始消费。在踢出故障机器后,会申请新的 TaskManager 做状态初始化其实就是会走到上面的逻辑,也就导致 failover 后消费了很早之前的数据。排查到这,几乎根因已经出来了,接下来就是验证过程。
根因验证
测试代码:
@Slf4j
public class CheckpointTest
public static void main(String[] args) throws Exception
ParameterTool parameters = ParameterTool.fromArgs(args);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(parameters);
env.setParallelism(parameters.getInt("envParallelism", 1));
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
100,
Time.of(10, TimeUnit.SECONDS)
));
//手动关闭 checkpoint
env.getCheckpointConfig().disableCheckpointing();
FlinkKafkaConsumer<String> flinkKafkaConsumer = new FlinkKafkaConsumer<>(parameters.get("sourceTopic", "yarn_flink_log"), new SimpleStringSchema(), buildKafkaProps(parameters));
if (parameters.get("timestamp") != null)
log.info("Start from timestamp " + parameters.getLong("timestamp"));
// 手动指定 timestamp 开始消费,底层也是会根据 timestamp 去找这个 timestamp 下对应的 offset 数据
flinkKafkaConsumer.setStartFromTimestamp(parameters.getLong("timestamp"));
env.addSource(flinkKafkaConsumer)
.setParallelism(parameters.getInt("sourceParallelism", 1))
.print();
env.execute("test checkpoint not enable");
public static Properties buildKafkaProps(ParameterTool parameterTool)
Properties props = parameterTool.getProperties();
props.put("bootstrap.servers", parameterTool.get("sourceBootstrapServers", "localhost:9092"));
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put("group.id", parameterTool.get("sourceGroupId", String.valueOf(UUID.randomUUID())));
props.put("flink.partition-discovery.interval-millis", "10000");
props.put("auto.offset.reset", parameterTool.get("sourceAutoOffsetReset", "latest"));
return props;
当时指定的 timestamp 是 1666819431000 (对应 2022-10-27 05:23:51),然后在 2022-10-28 15:31 时手动 Kill 掉一个 Taskmanager pod,任务发生 failover,新申请的 pod 开始消费 1666819431000 (对应 2022-10-27 05:23:51)时候的数据
![](https://image.cha138.com/20230206/ea07a55963814a2c8f6e85849863c2b9.jpg)
新申请 TM 启动日志:
2022-10-28 15:31:05,254 INFO org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase [] - Consumer subtask 0 will start reading the following 6 partitions from timestamp 1666819431000: [KafkaTopicPartitiontopic='yxxx_log', partition=34, KafkaTopicPartitiontopic='yxxx_log', partition=4, KafkaTopicPartitiontopic='yxxx_log', partition=54, KafkaTopicPartitiontopic='yxxx_log', partition=24, KafkaTopicPartitiontopic='yxxx_log', partition=44, KafkaTopicPartitiontopic='yxxx_log', partition=14]
2022-10-28 15:31:05,267 INFO org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase [] - Consumer subtask 0 creating fetcher with offsets KafkaTopicPartitiontopic='yxxx_log', partition=34=(782507410,-1), KafkaTopicPartitiontopic='yxxx_log', partition=4=(87934879,-1), KafkaTopicPartitiontopic='yxxx_log', partition=54=(1259521975,-1), KafkaTopicPartitiontopic='yxxx_log', partition=24=(4335766087,-1), KafkaTopicPartitiontopic='yxxx_log', partition=44=(4210596124,-1), KafkaTopicPartitiontopic='yxxx_log', partition=14=(350718902,-1).
可以从日志发现是继续从 1666819431000 开始
结论
关闭 Checkpoint + 指定 timestamp 或者 Offset 消费 + 故障 failover 时会出现重复读数据问题
思考
1、核心任务一定要配置好消费延迟告警,及时发现和定位问题能减少故障损失
2、不要轻易漏掉某些指标,否则可能会忽略掉根因
3、关闭掉集群默认开启的配置需要谨慎,多充分测试
![](https://image.cha138.com/20230206/ad4526fa231f48c9b2e3a385b1252e2b.jpg)
以上是关于关闭 Flink Checkpoint,引发 P3 故障的主要内容,如果未能解决你的问题,请参考以下文章
Flink从BucketSink看checkpoint与故障恢复