FlinkFlink 部分算子是 FinishSHED 不做checnpoint
Posted 九师兄
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了FlinkFlink 部分算子是 FinishSHED 不做checnpoint相关的知识,希望对你有一定的参考价值。
1.概述
转载:记一次flink不做checkpoint的问题 请参考原文,这里仅仅是学习一下。
问题现象: Flink UI界面查看checkpoint的metrics发现一直没有做checkpoint,仔细排查发现有部分subtask的状态是finished。
下图是测试环境复现问题
问题原因: 仔细排查代码后发现source是消费kafka的数据,配置的并行度大于kafka的partition数,导致有部分subtask空闲,然后状态变为finished。后来查看了checkpoint过程的源码得以佐证。
在CheckpointCoordinator类的triggerCheckpoint方法中有如下代码段
// check if all tasks that we need to trigger are running.
// if not, abort the checkpoint
Execution[] executions = new Execution[tasksToTrigger.length];
for (int i = 0; i < tasksToTrigger.length; i++)
Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt();
if (ee == null)
LOG.info("Checkpoint triggering task of job is not being executed at the moment. Aborting checkpoint.",
tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
job);
throw new CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
else if (ee.getState() == ExecutionState.RUNNING)
executions[i] = ee;
else
LOG.info("Checkpoint triggering task of job is not in state but instead. Aborting checkpoint.",
tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
job,
ExecutionState.RUNNING,
ee.getState());
throw new CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
ee.getState() == ExecutionState.RUNNING判断execution的状态是否为running,否则不做checkpoint
问题结论: 在消费kafka的数据时,source的并发度不能超过kafka的partition数,可以小于partition,但是部分subtask就会消费多个partition的数据,导致吞吐达不到最大,理想状态是source并发度等于partition数。
注: 此问题出现时使用的是flink1.5
版本,高版本已经没有此问题了。
以上是关于FlinkFlink 部分算子是 FinishSHED 不做checnpoint的主要内容,如果未能解决你的问题,请参考以下文章
FlinkFlink 1.14 版本 新特性 Barrier 在流经算子做 checkpoint Barrier跳过 unaligned checkpoint
FlinkFLink Barrier 在流经算子 做 checkpoint 的时候,数据是停止的吗?
FlinkFlink 反压机制 导致checkpoint 失败
FLinkFlink Forward Asia Hackathon (2021) 回顾
FlinkFlink ChildFirstClassLoader loadClassWithoutExceptionHandling 空指针