[Flink] Committing offsets to Kafka takes longer than the checkpoint interval
Posted by 九师兄
1. Overview
A Flink job failed while restoring from a checkpoint after a restart.
The error suggests that the files of an incremental checkpoint had been deleted.
If the files were deleted permanently, there is no way to recover; if they are still in the trash, you can restore them from there, otherwise the job has to be restarted from scratch.
After that, the job kept logging the following warning:
Committing offsets to Kafka takes longer than the checkpoint interval. Skipping commit of previous offsets because newer complete checkpoint offsets are available. This does not compromise Flink's checkpoint integrity.
The details are as follows.
From the error message we can locate where the warning is logged in the source code:
/**
 * Tells this thread to commit a set of offsets. This method does not block, the committing
 * operation will happen asynchronously.
 *
 * <p>Only one commit operation may be pending at any time. If the committing takes longer than
 * the frequency with which this method is called, then some commits may be skipped due to being
 * superseded by newer ones.
 *
 * @param offsetsToCommit The offsets to commit
 * @param commitCallback callback when Kafka commit completes
 */
void setOffsetsToCommit(
        Map<TopicPartition, OffsetAndMetadata> offsetsToCommit,
        @Nonnull KafkaCommitCallback commitCallback) {

    // record the work to be committed by the main consumer thread and make sure the consumer
    // notices that
    if (nextOffsetsToCommit.getAndSet(Tuple2.of(offsetsToCommit, commitCallback)) != null) {
        log.warn(
                "Committing offsets to Kafka takes longer than the checkpoint interval. "
                        + "Skipping commit of previous offsets because newer complete checkpoint offsets are available. "
                        + "This does not compromise Flink's checkpoint integrity.");
    }

    // if the consumer is blocked in a poll() or handover operation, wake it up to commit soon
    handover.wakeupProducer();

    synchronized (consumerReassignmentLock) {
        if (consumer != null) {
            consumer.wakeup();
        } else {
            // the consumer is currently isolated for partition reassignment;
            // set this flag so that the wakeup state is restored once the reassignment is
            // complete
            hasBufferedWakeup = true;
        }
    }
}
Here we can see why the warning is logged: getAndSet returns a non-null value when the offsets of the previous checkpoint were never picked up by the consumer thread, so those older offsets are dropped and only the newer checkpoint offsets get committed.
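To make this superseding behavior concrete, here is a minimal standalone sketch (plain Java, not Flink code; the class and field names are made up for illustration) of the "latest pending commit wins" pattern that setOffsetsToCommit uses:

// A toy version of the single-slot commit queue: at most one pending request survives.
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

public class PendingCommitSketch {

    // holds at most one pending commit request, like nextOffsetsToCommit above
    private final AtomicReference<Map<String, Long>> nextOffsetsToCommit = new AtomicReference<>();

    void setOffsetsToCommit(Map<String, Long> offsets) {
        // getAndSet returns the previous value; non-null means the consumer thread never
        // picked up the last request, so it is silently superseded and a warning is printed
        if (nextOffsetsToCommit.getAndSet(offsets) != null) {
            System.out.println("WARN: skipping previous offsets, newer checkpoint offsets are available");
        }
    }

    Map<String, Long> pollWork() {
        // what the consumer loop does on each iteration: take the pending request, if any
        return nextOffsetsToCommit.getAndSet(null);
    }

    public static void main(String[] args) {
        PendingCommitSketch sketch = new PendingCommitSketch();
        sketch.setOffsetsToCommit(Map.of("topic-0", 100L)); // checkpoint 1 completes
        sketch.setOffsetsToCommit(Map.of("topic-0", 200L)); // checkpoint 2 completes before the commit -> WARN
        System.out.println("Committed: " + sketch.pollWork()); // only checkpoint 2's offsets remain
    }
}

In Flink the consumer thread picks up nextOffsetsToCommit inside its run loop and commits it asynchronously through the KafkaConsumer; if two checkpoints complete between two iterations of that loop, only the newer offsets are committed, which is exactly what the warning reports.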
Next, let's see where this is called. Tracing the call chain upwards, the top-level caller is notifyCheckpointComplete: when a checkpoint completes, it calls commitInternalOffsetsToKafka to commit the checkpointed offsets back to Kafka.
@Override
public final void notifyCheckpointComplete(long checkpointId) throws Exception {
    if (!running) {
        LOG.debug("notifyCheckpointComplete() called on closed source");
        return;
    }

    final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;
    if (fetcher == null) {
        LOG.debug("notifyCheckpointComplete() called on uninitialized source");
        return;
    }

    if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
        // only one commit operation must be in progress
        if (LOG.isDebugEnabled()) {
            LOG.debug(
                    "Consumer subtask {} committing offsets to Kafka/ZooKeeper for checkpoint {}.",
                    getRuntimeContext().getIndexOfThisSubtask(),
                    checkpointId);
        }

        try {
            // find the position of this checkpointId in pendingOffsetsToCommit
            final int posInMap = pendingOffsetsToCommit.indexOf(checkpointId);
            if (posInMap == -1) {
                LOG.warn(
                        "Consumer subtask {} received confirmation for unknown checkpoint id {}",
                        getRuntimeContext().getIndexOfThisSubtask(),
                        checkpointId);
                return;
            }

            // look up the partition -> offset map recorded for this checkpoint id, i.e. the
            // offsets waiting to be committed, and remove that entry from pendingOffsetsToCommit
            @SuppressWarnings("unchecked")
            Map<KafkaTopicPartition, Long> offsets =
                    (Map<KafkaTopicPartition, Long>) pendingOffsetsToCommit.remove(posInMap);

            // remove older checkpoints in map
            for (int i = 0; i < posInMap; i++) {
                pendingOffsetsToCommit.remove(0);
            }

            if (offsets == null || offsets.size() == 0) {
                LOG.debug(
                        "Consumer subtask {} has empty checkpoint state.",
                        getRuntimeContext().getIndexOfThisSubtask());
                return;
            }

            // commit the offsets to Kafka
            fetcher.commitInternalOffsetsToKafka(offsets, offsetCommitCallback);
        } catch (Exception e) {
            if (running) {
                throw e;
            }
            // else ignore exception if we are no longer running
        }
    }
}
This calls the internal method commitInternalOffsetsToKafka:
/**
 * Commits the given partition offsets to the Kafka brokers (or to ZooKeeper for older Kafka
 * versions). This method is only ever called when the offset commit mode of the consumer is
 * {@link OffsetCommitMode#ON_CHECKPOINTS}.
 *
 * <p>The given offsets are the internal checkpointed offsets, representing the last processed
 * record of each partition. Version-specific implementations of this method need to hold the
 * contract that the given offsets must be incremented by 1 before committing them, so that
 * committed offsets to Kafka represent "the next record to process".
 *
 * @param offsets The offsets to commit to Kafka (implementations must increment offsets by 1
 *     before committing).
 * @param commitCallback The callback that the user should trigger when a commit request
 *     completes or fails.
 * @throws Exception This method forwards exceptions.
 */
public final void commitInternalOffsetsToKafka(
        Map<KafkaTopicPartition, Long> offsets, @Nonnull KafkaCommitCallback commitCallback)
        throws Exception {
    // Ignore sentinels. They might appear here if snapshot has started before actual offsets
    // values replaced sentinels
    doCommitInternalOffsetsToKafka(filterOutSentinels(offsets), commitCallback);
}
Finally, doCommitInternalOffsetsToKafka performs the actual commit:
@Override
protected void doCommitInternalOffsetsToKafka(
        Map<KafkaTopicPartition, Long> offsets, @Nonnull KafkaCommitCallback commitCallback)
        throws Exception {

    @SuppressWarnings("unchecked")
    List<KafkaTopicPartitionState<T, TopicPartition>> partitions = subscribedPartitionStates();

    Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>(partitions.size());

    for (KafkaTopicPartitionState<T, TopicPartition> partition : partitions) {
        Long lastProcessedOffset = offsets.get(partition.getKafkaTopicPartition());
        if (lastProcessedOffset != null) {
            checkState(lastProcessedOffset >= 0, "Illegal offset value to commit");

            // committed offsets through the KafkaConsumer need to be 1 more than the last
            // processed offset.
            // This does not affect Flink's checkpoints/saved state.
            long offsetToCommit = lastProcessedOffset + 1;

            offsetsToCommit.put(
                    partition.getKafkaPartitionHandle(), new OffsetAndMetadata(offsetToCommit));
            partition.setCommittedOffset(offsetToCommit);
        }
    }

    // record the work to be committed by the main consumer thread and make sure the consumer
    // notices that
    consumerThread.setOffsetsToCommit(offsetsToCommit, commitCallback);
}
So this is how Kafka offsets are committed to the broker when a checkpoint completes.
The code itself looks fine, yet my job kept emitting this warning and made no progress.
I suspected the checkpoint interval was set too small:
env.enableCheckpointing(1000L); // one checkpoint per second
I changed it to one checkpoint every 5 minutes and the warning went away.
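For reference, here is a minimal sketch of the adjusted configuration, assuming a standard DataStream job; the 5-minute interval is simply what worked for me, and the minimum-pause call is an extra suggestion rather than part of the original fix:

// Sketch only: adjust the values to your own job's checkpointing cost and latency needs.
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointIntervalExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // checkpoint every 5 minutes instead of every second
        env.enableCheckpointing(5 * 60 * 1000L);

        // optionally leave a minimum pause between checkpoints, so slow offset commits
        // (or slow state uploads) can finish before the next checkpoint is triggered
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(60 * 1000L);

        // ... build the Kafka source and the rest of the pipeline here ...
    }
}

With a longer interval, the consumer thread has enough time to commit the offsets of one checkpoint before the next checkpoint completes, so notifyCheckpointComplete no longer supersedes pending commits and the warning stops.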