Kafka的消息格式及offset是如何设置的

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kafka的消息格式及offset是如何设置的相关的知识,希望对你有一定的参考价值。

参考技术A Kafka的offset是如何设置的?

答:是生产者设置的,生产者在发送消息的时候,为每条消息生成一个唯一的offset。

Kafka消息的格式?

答:

Kafka最新版本的消息集叫做RecordBatch,而不是先前的MessageSet。RecordBatch内部存储了一条或多条消息。

RecordBatch的结构包含以下部分:

first offset,起始位移,占位8B

length,消息总长度,占位4B

partition leader epoch,分区leader纪元,可以看做分区leader的版本号或者更新次数,占位4B。

magic,消息格式的版本号,对于V2版本而言,magic的值为2。

attributes,消息属性,占位2B,低三位表示压缩格式,第4位表示时间戳类型,第五位表示当前RecordBatch是否处于事务中第6位表示是否控制消息。

last offset delta,占位4B,RecordBatch中最后一个Record的offset与first offset的差值,主要被broker用来确保RecordBatch中Record组装的正确性。

first timestamp,占位8B,RecordBatch中第一条Record的时间戳。

max timestamp,占位8B,RecordBatch中最大的时间戳,一般情况下是最后一个Record的时间戳。和last offset delta功能一样,主要被broker用来确保RecordBatch中Record组装的正确性。

producer id,即PID,占位8B,用来支持幂等和事务。

producer epoch,占位2B,用来支持幂等和事务。

first sequence,占位4B,用来支持幂等和事务。

records count,占位4B,RecordBatch中Record的总数。

records,存放消息的容器。

Records的数据结构又是什么样的呢?Record包含以下属性:

length,消息总长度。

attributes,目前已弃用,但是还是会占用1B的空间,以备未来的格式扩展。

timestamp delta,时间戳增量,通常一个timestamp占用8B,这里时间戳增量保存的是当前时间戳与RecordBatch中first timestamp的差值,这样可以节省占用空间。

offset delta,位移增量,这个是当前消息的位移与RecordBatch中first offset的差值,这样可以节省占用空间。

key length,消息的key的长度。

key,消息key。

value length,消息的value的长度。

value,消息的值。

headers count,headers的总数。

headers,这个字段是用来支持应用级别的扩展,而不需要将一些应用级别的属性嵌入到消息体中。

以上是关于Kafka的消息格式及offset是如何设置的的主要内容,如果未能解决你的问题,请参考以下文章

Kafka offset管理

kafka 提交offset

Spring自带Kafka消息异常处理

Kafka Consumer配置——auto.offset.reset如何控制消息消费

如何获取kafka某一topic中最新的offset

kafka防止消息重复消费