Kafka Consumer Offset解析

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kafka Consumer Offset解析相关的知识,希望对你有一定的参考价值。

参考技术A Kafka __consumer_offsets是一个特殊的存储元数据的Topic
数据格式可以想象成一个 KV 格式的消息,key 就是一个三元组:group.id+topic+分区号,而 value 就是 offset 的值。

查看方式:使用kafka自带的读取类
./bin/kafka-console-consumer.sh --topic __consumer_offsets --partition 01 --bootstrap-server xxx:9092 --formatter "kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter" --from-beginning --max-messages 30

一般情况下, 使用 OffsetsMessageFormatter 打印的格式可以概括为:
"[%s,%s,%d]::[OffsetMetadata[%d,%s],CommitTime %d,ExpirationTime %d]".format(group, topic, partition, offset, metadata, commitTimestamp, expireTimestamp)

数据内容:
[flink-payment-alert_query_time_1576066085229,payment-result-count,4]::NULL
[flink-payment-alert_query_time_1576066085229,payment-result-count,3]::NULL
[flink-payment-alert_query_time_1576066085229,payment-result-count,9]::NULL

另外一种是
[work_default_yw.int.spring.cxyw.blackgold.kafka.orderdomain.core.sub,work_default_yw.int.spring.cxyw.blackgold.kafka.orderdomain.topic,0]::OffsetAndMetadata(offset=19, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1636939024066, expireTimestamp=None)
[work_default_yw.int.spring.cxyw.blackgold.kafka.orderdomain.core.sub,work_default_yw.int.spring.cxyw.blackgold.kafka.orderdomain.topic,0]::OffsetAndMetadata(offset=19, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1636939028621, expireTimestamp=None)
[work_default_yw.int.spring.cxyw.blackgold.kafka.orderdomain.core.sub,work_default_yw.int.spring.cxyw.blackgold.kafka.orderdomain.topic,0]::OffsetAndMetadata(offset=19, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1636939033680, expireTimestamp=None)

还有一种是
[ProcessEngineBusinessProcess,CasBusinessTopic,1]::[OffsetMetadata[99649027,NO_METADATA],CommitTime 1636930671854,ExpirationTime 1637017071854]
[ProcessEngineBusinessProcess,CasBusinessTopic,0]::[OffsetMetadata[99650360,NO_METADATA],CommitTime 1636930671854,ExpirationTime 1637017071854]
[ProcessEngineBusinessProcess,CasBusinessTopic,3]::[OffsetMetadata[99640798,NO_METADATA],CommitTime 1636930672471,ExpirationTime 1637017072471]

分别解释一下:

在 Kafka 中有一个名为“delete-expired-group-metadata”的定时任务来负责清理过期的消费位移,这个定时任务的执行周期由参数 offsets.retention.check.interval.ms 控制,默认值为600000,即10分钟。这和普通的topic的不太一样

还有 metadata,一般情况下它的值要么为 null 要么为空字符串,OffsetsMessageFormatter 会把它展示为 NO_METADATA,否则就按实际值进行展示。

看一下源码里这些类的结构
case class OffsetAndMetadata(offsetMetadata: OffsetMetadata,
commitTimestamp: Long = org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP,
expireTimestamp: Long = org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP)

case class OffsetMetadata(offset: Long, metadata: String = OffsetMetadata.NoMetadata)
override def toString = "OffsetMetadata[%d,%s]"
.format(offset,
if (metadata != null && metadata.length > 0) metadata else "NO_METADATA")


@Deprecated
public static final long DEFAULT_TIMESTAMP = -1L; // for V0, V1

另外0.11.0之后对应的数据格式版本是V2,这个版本的消息相比于v0和v1的版本而言改动很大,同时还参考了Protocol Buffer而引入了变长整型(Varints)和ZigZag编码。

另外:
offset为什么会有墓碑消息?
因为offset本身也会过期清理.受offsets.retention.minutes 这个配置的影响
看下官网介绍
After a consumer group loses all its consumers (i.e. becomes empty) its offsets will be kept for this retention period before getting discarded. For standalone consumers (using manual assignment), offsets will be expired after the time of last commit plus this retention period.
当group里的consumer全部下线后过offsets.retention.minutes 时间后offset就会被删除
val OffsetsRetentionMinutes: Int = 7 * 24 * 60 // 默认7天
默认2.0之前是1天,2.0及以后是7天 这个官方真是..要么就改为2天,结果直接改为7天,改动不可谓不大,而且active的group不会过期

附: https://cwiki.apache.org/confluence/display/KAFKA/KIP-186%3A+Increase+offsets+retention+default+to+7+days

另外active的group无法修改consumer offset?
Usually we do not allow committed offset changes while a group is active because we do not have a mechanism to notify the group of the change.
原因是无法通知到组成员consumer offset的变更

Kafka Consumer配置——auto.offset.reset如何控制消息消费

【中文标题】Kafka Consumer配置——auto.offset.reset如何控制消息消费【英文标题】:Kafka Consumer configuration - How does auto.offset.reset controls the message consumption 【发布时间】:2020-03-08 18:41:19 【问题描述】:

我想了解一下,ConsumerConfig.auto.offset.reset = latest 会如何影响消息消费。

例如,我有一个消费者,最初在 t1 时间发送 100 条消息,然后我的消费者在 t1+30 秒启动并运行,然后我的消费者会消费 t1+30 秒后发布的消息还是会消费消息从 t1 开始发布?

【问题讨论】:

你的问题不清楚 【参考方案1】:

视情况而定。

auto.offset.reset 仅适用于没有为消费者组存储偏移量的情况。

适用于以下情况:

消费者组第一次消费 如果消费者没有提交任何偏移量,则下次启动时 如果消费者组已过期(现代代理默认为 7 天) 如果存储的偏移量指向的消息由于消息保留策略而被删除(尝试读取已清除的消息会触发规则的应用)

如果消费者提交了偏移量;它将在下次启动时从上次提交的偏移量开始。

【讨论】:

感谢您提供详细信息。我有一个关于第一点的问题,如果消费者第一次起床并准备消费消息,但发布者/生产者在消费者准备好之前已经产生了一些消息,那么它不会消费之前产生的那些消息消费者准备好了吗? 如果设置为latest,在我上面描述的任何条件下,它不会读取任何现有记录。 感谢您的澄清。 将此配置设置为最新,我们应该始终首先部署消费者并确保其准备就绪,然后开始生成消息。如果我不想依赖于部署应用程序的顺序,我应该选择“最早”而不是“最新”吗?如果是,如果我们重新启动消费者,这可能会导致多次处理相同的消息。那么,您对解决这两个问题有何建议?请提出建议。 否;它不会;只要消费者提交它已读取的记录的偏移量。如果您使用的是 spring-kafka,容器将为您完成。如果您直接使用消费者,则必须根据您的要求调用其中一种提交方法。消费者下次启动时,将从上次提交的偏移量开始消费。

以上是关于Kafka Consumer Offset解析的主要内容,如果未能解决你的问题,请参考以下文章

kafka consumer offset机制

kafka 消费者offset记录位置和方式

Kafka 学习笔记之 High Level Consumer相关参数

kafka系列之(3)——Coordinator与offset管理和Consumer Rebalance

Kafka offset管理

kafka 提交offset