01 How Kafka Stores Records
Posted by 蓝风9
Preface
I've recently been doing some Kafka-related environment setup and ran into a few problems getting clients connected to the server. That created a need to read parts of the Kafka code, which in turn spawned a series of write-ups analyzing individual topics. How Kafka stores its records is one of them.
The server here runs Kafka 2.4.1; the client uses 2.2.0.
Test case
Let's look at how this ProducerRecord is stored.
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

/**
 * Test06KafkaProducer
 *
 * @author Jerry.X.He <970655147@qq.com>
 * @version 1.0
 * @date 2022-05-28 10:14
 */
public class Test06KafkaProducer {

    // Test06KafkaProducer
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "master:9092");
        properties.put("acks", "all");
        properties.put("retries", 0);
        properties.put("batch.size", 16384);
        properties.put("linger.ms", 1);
        properties.put("buffer.memory", 33554432);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        String topic = "test20220528";
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
        for (int i = 1; i <= 1; i++) {
            kafkaProducer.send(new ProducerRecord<>(topic, "message" + i));
            System.out.println("message" + i);
        }
        kafkaProducer.close();
    }

}
Client-side handling
When send is called, the record body is serialized into a ByteBuffer.
In this scenario the body occupies bytes 61 - 75 of the buffer.
# For this scenario, the body looks like this (the single-byte length fields are zigzag-encoded varints)
bodyLen = 14 : 1 byte
attribute = 0 : 1 byte
timestampDelta = 0 : 1 byte
offsetDelta = 0 : 1 byte
key = -1 (null key) : 1 byte
valueLen = 8 : 1 byte
value = message1 : 8 byte
headerLen = 0 : 1 byte
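Those length fields are zigzag-encoded varints (the Kafka client reads and writes them via org.apache.kafka.common.utils.ByteUtils.readVarint / writeVarint), which is why a valueLen of 8 sits in the file as the byte 16 and the null key as the byte 1. A minimal standalone decoder, assuming nothing beyond the JDK:

import java.io.DataInput;
import java.io.IOException;

// Standalone zigzag-varint decoder; a sketch of what the Kafka client's
// ByteUtils.readVarint does, not the real implementation.
public class VarintSketch {
    public static int readVarint(DataInput in) throws IOException {
        int value = 0;
        int shift = 0;
        int b;
        while (((b = in.readUnsignedByte()) & 0x80) != 0) { // high bit set -> more bytes follow
            value |= (b & 0x7f) << shift;
            shift += 7;
        }
        value |= b << shift;
        // zigzag decode: 0 -> 0, 1 -> -1, 2 -> 1, 16 -> 8, ...
        return (value >>> 1) ^ -(value & 1);
    }
}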
Once the body is serialized it goes into a ProducerBatch; the ProducerBatch is appended to the accumulator's batches, and the actual sending is handed off to the sender thread.
The sender thread pulls the messages to send off the queue, writes the header information, and sends.
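A toy model of that hand-off (my own simplification; Kafka's real RecordAccumulator keeps a per-partition Deque<ProducerBatch> and adds buffer pooling, batching, and linger logic on top):

import java.util.ArrayDeque;
import java.util.Deque;

// Toy model of the producer-side hand-off, not Kafka's actual RecordAccumulator.
class AccumulatorSketch {
    private final Deque<byte[]> batches = new ArrayDeque<>();

    // called on the caller's thread: send() just appends and returns
    synchronized void append(byte[] serializedBody) {
        batches.addLast(serializedBody);
    }

    // called in the sender thread's loop: drain a batch and do the network I/O
    synchronized byte[] drainOne() {
        return batches.pollFirst();
    }
}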
# For this scenario, the header looks like this
baseOffset = 0 : 8 byte
lengthOffset = 64 : 4 byte
partitionLeaderEpoch = -1 : 4 byte
magic = 2 : 1 byte
crc = 558737589 : 4 byte
attributes = 0 : 2 byte
lastOffsetDelta = 0 : 4 byte
firstTimestamp = 1654310895383 : 8 byte
maxTimestamp = 1654310895383 : 8 byte
producerId = -1 : 8 byte
epoch = -1 : 2 byte
sequence = -1 : 4 byte
numRecords = 1 : 4 byte
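Of those fields, crc is a CRC32C checksum covering everything from the attributes field (byte 21 of the batch) to the end of the batch; the fields before it are excluded so the broker can rewrite baseOffset without recomputing the checksum. A sketch of the verification, with the byte offsets taken from the layout above (CRC32C requires JDK 9+):

import java.nio.ByteBuffer;
import java.util.zip.CRC32C; // JDK 9+

// Sketch of the batch-crc check; `batch` is assumed to hold one complete
// batch starting at position 0.
public class CrcSketch {
    public static boolean crcMatches(ByteBuffer batch) {
        int storedCrc = batch.getInt(17);   // baseOffset(8) + length(4) + partitionLeaderEpoch(4) + magic(1) = 17
        CRC32C crc32c = new CRC32C();
        ByteBuffer tail = batch.duplicate();
        tail.position(21);                  // attributes onward is what the crc covers
        crc32c.update(tail);
        return (int) crc32c.getValue() == storedCrc;
    }
}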
When MemoryRecordsBuilder.close is called, the batch is converted into MemoryRecords.
After pulling the data for the current batch off the queue, the sender builds a ProduceRequest and talks to the server.
Server-side handling
When the server receives the Produce request, it goes through handleProduceRequest.
partitionRecords holds the data sent by the client, grouped by TopicPartition.
It then tries to append the data to the log; entriesPerPartition here contains data for multiple TopicPartitions.
Drilling down, each TopicPartition corresponds to one Log, and that TopicPartition's records are appended to it.
A Log in turn consists of multiple Segments, split at the size configured by log.segment.bytes.
At a certain byte interval the broker also samples baseOffset -> physicalOffset and baseOffset -> timestamp entries.
So besides the .log file that stores the data, there are also an .index and a .timeindex file, used to speed up lookups; a lookup sketch follows the listing below.
-rw-r--r-- 1 jerry wheel 8 Jun 5 15:49 00000000000000000000.index
-rw-r--r-- 1 jerry wheel 228 Jun 5 15:49 00000000000000000000.log
-rw-r--r-- 1 jerry wheel 12 Jun 5 15:49 00000000000000000000.timeindex
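To locate a given offset, the broker binary-searches the sparse .index for the greatest sampled entry at or below the target, then scans the .log forward from that byte position. A sketch of the idea, with a TreeMap standing in for the mmap'ed (relativeOffset -> physicalPosition) entries of the .index file:

import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Sketch of a sparse-index lookup; the TreeMap stands in for the .index file.
public class SparseIndexSketch {
    public static int startPositionFor(long targetOffset, NavigableMap<Long, Integer> index) {
        Map.Entry<Long, Integer> floor = index.floorEntry(targetOffset);
        return floor == null ? 0 : floor.getValue(); // no sampled entry yet -> scan from segment start
    }

    public static void main(String[] args) {
        NavigableMap<Long, Integer> index = new TreeMap<>();
        index.put(2L, 152);                                // the single entry from the .index dump below
        System.out.println(startPositionFor(2L, index));   // 152: scan the .log from byte 152
    }
}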
One more server-side detail is how the batch's baseOffset gets assigned: here the batch starts at offset 58 of its ByteBuffer, so the 8-byte baseOffset field occupies bytes 58 - 65; in this debugging session it was set to 14.
The record batch delegates to its ByteBuffer to update the baseOffset.
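What that delegation boils down to (a sketch, with the batch-start position made explicit rather than Kafka's internal constant):

import java.nio.ByteBuffer;

// Sketch of the baseOffset rewrite: baseOffset is the first 8 bytes of the batch,
// so assigning it is one absolute putLong at the batch's start position
// (58 in the debugging session above); position/limit are left untouched.
public class BaseOffsetSketch {
    public static void setBaseOffset(ByteBuffer buffer, int batchStart, long baseOffset) {
        buffer.putLong(batchStart, baseOffset);
    }
}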
Kafka's three kinds of log files
There are three kinds of files: the .log file, the .index file, and the .timeindex file.
The .log file holds the actual data, i.e. the headerInfo and bodyInfo shown above.
The .index file holds a sampled subset of baseOffset -> physicalOffset mappings, where physicalOffset is the byte position within the .log file of the record with that baseOffset.
The .timeindex file similarly holds sampled (maxTimestamp, baseOffset) pairs.
The .log file
import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import lombok.Data;

/**
 * Test09ReadKafkaLog
 *
 * @author Jerry.X.He <970655147@qq.com>
 * @version 1.0
 * @date 2022-06-04 11:02
 */
public class Test09ReadKafkaLog {

    // Test09ReadKafkaLog
    public static void main(String[] args) throws Exception {
        String path = "/tmp/kafka-logs/test20220528-0/00000000000000000000.log";
        // String path = "/tmp/kafka-logs/test20220528-0/00000000000000000003.log";
        File file = new File(path);
        // every "message1" batch is 76 bytes: 61 bytes of batch header + 15 bytes of record body
        int messageCount = (int) (file.length() / 76);
        InputStream is = new FileInputStream(path);
        DataInputStream dis = new DataInputStream(is);
        long availableBefore = is.available();
        List<RecordInfo> list = new ArrayList<>();
        for (int i = 0; i < messageCount; i++) {
            // header, 61 bytes
            long baseOffset = dis.readLong();
            int lengthOffset = dis.readInt();
            int partitionLeaderEpoch = dis.readInt();
            int magic = dis.readByte();
            int crc = dis.readInt();
            int attributeCount = dis.readShort();
            int lastOffsetDelta = dis.readInt();
            Date firstTimestamp = new Date(dis.readLong());
            Date maxTimestamp = new Date(dis.readLong());
            long producerId = dis.readLong();
            int epoch = dis.readShort();
            int sequence = dis.readInt();
            int numRecords = dis.readInt();
            // body; the >> 1 undoes the zigzag encoding of the single-byte varint fields
            int bodyLen = dis.readByte() >> 1;
            int attributeCountInBody = dis.readByte();
            int timestampDelta = dis.readByte() >> 1;
            int offsetDelta = dis.readByte() >> 1;
            int key = dis.readByte();    // raw zigzag byte; 1 decodes to -1, i.e. a null key
            int valueLen = dis.readByte() >> 1;
            byte[] valueBytes = new byte[valueLen];
            dis.readFully(valueBytes);
            String value = new String(valueBytes);
            int headerLen = dis.readByte() >> 1;

            HeaderInfo headerInfo = new HeaderInfo(baseOffset, lengthOffset, partitionLeaderEpoch, magic, crc,
                    attributeCount, lastOffsetDelta, firstTimestamp, maxTimestamp, producerId, epoch, sequence, numRecords);
            BodyInfo bodyInfo = new BodyInfo(bodyLen, attributeCountInBody, timestampDelta, offsetDelta, key, valueLen, valueBytes, value, headerLen);
            RecordInfo recordInfo = new RecordInfo(headerInfo, bodyInfo);
            list.add(recordInfo);
        }
        long availableAfter = is.available();
        System.out.println(String.format(" availBefore : %s, availAfter : %s ", availableBefore, availableAfter));

        for (RecordInfo recordInfo : list) {
            System.out.println(JSON.toJSONString(recordInfo, SerializerFeature.WriteDateUseDateFormat));
        }
    }

    @Data
    private static class RecordInfo {
        public HeaderInfo headerInfo;
        public BodyInfo bodyInfo;

        public RecordInfo(HeaderInfo headerInfo, BodyInfo bodyInfo) {
            this.headerInfo = headerInfo;
            this.bodyInfo = bodyInfo;
        }
    }

    @Data
    private static class HeaderInfo {
        public long baseOffset;
        public int lengthOffset;
        public int partitionLeaderEpoch;
        public int magic;
        public int crc;
        public int attributeCount;
        public int lastOffsetDelta;
        public Date firstTimestamp;
        public Date maxTimestamp;
        public long producerId;
        public int epoch;
        public int sequence;
        public int numRecords;

        public HeaderInfo(long baseOffset, int lengthOffset, int partitionLeaderEpoch, int magic, int crc,
                          int attributeCount, int lastOffsetDelta, Date firstTimestamp, Date maxTimestamp,
                          long producerId, int epoch, int sequence, int numRecords) {
            this.baseOffset = baseOffset;
            this.lengthOffset = lengthOffset;
            this.partitionLeaderEpoch = partitionLeaderEpoch;
            this.magic = magic;
            this.crc = crc;
            this.attributeCount = attributeCount;
            this.lastOffsetDelta = lastOffsetDelta;
            this.firstTimestamp = firstTimestamp;
            this.maxTimestamp = maxTimestamp;
            this.producerId = producerId;
            this.epoch = epoch;
            this.sequence = sequence;
            this.numRecords = numRecords;
        }
    }

    @Data
    private static class BodyInfo {
        public int bodyLen;
        public int attributeCount;
        public int timestampDelta;
        public int offsetDelta;
        public int key;
        public int valueLen;
        public byte[] valueBytes;
        public String value;
        public int headerLen;

        public BodyInfo(int bodyLen, int attributeCount, int timestampDelta, int offsetDelta, int key,
                        int valueLen, byte[] valueBytes, String value, int headerLen) {
            this.bodyLen = bodyLen;
            this.attributeCount = attributeCount;
            this.timestampDelta = timestampDelta;
            this.offsetDelta = offsetDelta;
            this.key = key;
            this.valueLen = valueLen;
            this.valueBytes = valueBytes;
            this.value = value;
            this.headerLen = headerLen;
        }
    }
}
The output is as follows. Note that key prints as 1 because the reader keeps the raw zigzag byte; decoded it is -1, i.e. a null key.
 availBefore : 228, availAfter : 0
{"bodyInfo":{"attributeCount":0,"bodyLen":14,"headerLen":0,"key":1,"offsetDelta":0,"timestampDelta":0,"value":"message1","valueBytes":"bWVzc2FnZTE=","valueLen":8},"headerInfo":{"attributeCount":0,"baseOffset":0,"crc":458006489,"epoch":-1,"firstTimestamp":"2022-06-05 15:47:32","lastOffsetDelta":0,"lengthOffset":64,"magic":2,"maxTimestamp":"2022-06-05 15:47:32","numRecords":1,"partitionLeaderEpoch":0,"producerId":-1,"sequence":-1}}
{"bodyInfo":{"attributeCount":0,"bodyLen":14,"headerLen":0,"key":1,"offsetDelta":0,"timestampDelta":0,"value":"message1","valueBytes":"bWVzc2FnZTE=","valueLen":8},"headerInfo":{"attributeCount":0,"baseOffset":1,"crc":-368922856,"epoch":-1,"firstTimestamp":"2022-06-05 15:49:05","lastOffsetDelta":0,"lengthOffset":64,"magic":2,"maxTimestamp":"2022-06-05 15:49:05","numRecords":1,"partitionLeaderEpoch":0,"producerId":-1,"sequence":-1}}
{"bodyInfo":{"attributeCount":0,"bodyLen":14,"headerLen":0,"key":1,"offsetDelta":0,"timestampDelta":0,"value":"message1","valueBytes":"bWVzc2FnZTE=","valueLen":8},"headerInfo":{"attributeCount":0,"baseOffset":2,"crc":1402645798,"epoch":-1,"firstTimestamp":"2022-06-05 15:49:18","lastOffsetDelta":0,"lengthOffset":64,"magic":2,"maxTimestamp":"2022-06-05 15:49:18","numRecords":1,"partitionLeaderEpoch":0,"producerId":-1,"sequence":-1}}
The .index file
import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;

// any Pair with getFirst/getSecond will do; commons-math3's is assumed here
import org.apache.commons.math3.util.Pair;

/**
 * Test09ReadKafkaIndex
 *
 * @author Jerry.X.He <970655147@qq.com>
 * @version 1.0
 * @date 2022-06-04 11:02
 */
public class Test09ReadKafkaIndex {

    // Test09ReadKafkaIndex
    public static void main(String[] args) throws Exception {
        String path = "/tmp/kafka-logs/test20220528-0/00000000000000000000.index";
        File file = new File(path);
        InputStream is = new FileInputStream(path);
        DataInputStream dis = new DataInputStream(is);
        long availableBefore = is.available();
        List<Pair<Integer, Integer>> offsetList = new ArrayList<>();
        // each entry is 8 bytes: relative offset (int) + physical position in the .log (int)
        while (dis.available() > 0) {
            int offset1 = dis.readInt();
            int position1 = dis.readInt();
            offsetList.add(new Pair<>(offset1, position1));
        }
        long availableAfter = is.available();
        System.out.println(String.format(" availBefore : %s, availAfter : %s ", availableBefore, availableAfter));
        for (Pair<Integer, Integer> pair : offsetList) {
            System.out.println(String.format("%s -> %s", pair.getFirst(), pair.getSecond()));
        }
    }

}
The output is as follows. The single entry says that offset 2 of this segment starts at byte 152 of the .log file, which checks out: it is the third 76-byte batch (2 * 76 = 152).
 availBefore : 8, availAfter : 0
2 -> 152
The .timeindex file
import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

import org.apache.commons.lang3.time.DateFormatUtils;
// any Pair with getFirst/getSecond will do; commons-math3's is assumed here
import org.apache.commons.math3.util.Pair;

/**
 * Test09ReadKafkaTimeIndex
 *
 * @author Jerry.X.He <970655147@qq.com>
 * @version 1.0
 * @date 2022-06-04 11:02
 */
public class Test09ReadKafkaTimeIndex {

    // Test09ReadKafkaTimeIndex
    public static void main(String[] args) throws Exception {
        String path = "/tmp/kafka-logs/test20220528-0/00000000000000000000.timeindex";
        File file = new File(path);
        InputStream is = new FileInputStream(path);
        DataInputStream dis = new DataInputStream(is);
        long availableBefore = is.available();
        List<Pair<Integer, Date>> offsetList = new ArrayList<>();
        // each entry is 12 bytes: max timestamp (long) + relative offset (int)
        while (dis.available() > 0) {
            Date timestamp1 = new Date(dis.readLong());
            int offset1 = dis.readInt();
            offsetList.add(new Pair<>(offset1, timestamp1));
        }
        long availableAfter = is.available();
        System.out.println(String.format(" availBefore : %s, availAfter : %s ", availableBefore, availableAfter));
        for (Pair<Integer, Date> pair : offsetList) {
            // HH for 24-hour time (the original hh printed 15:49 as 03:49)
            System.out.println(String.format("%s -> %s", pair.getFirst(), DateFormatUtils.format(pair.getSecond(), "yyyy-MM-dd HH:mm:ss")));
        }
    }

}
The output is as follows. The 12 available bytes match exactly one entry: an 8-byte timestamp plus a 4-byte relative offset.
 availBefore : 12, availAfter : 0
2 -> 2022-06-05 15:49:18
Fin