01 How Kafka Stores Records

Posted by 蓝风9

Preface

Recently I have been doing some Kafka-related environment setup, and a few problems came up while connecting clients to the servers.

That created a need to understand parts of the Kafka code.

Which in turn produced a series of notes analyzing specific points.

How Kafka stores its records is one of them.

The server here runs Kafka 2.4.1, the client 2.2.0.

Test case

Let's look at how this ProducerRecord ends up being stored.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

/**
 * Test06KafkaProducer
 *
 * @author Jerry.X.He <970655147@qq.com>
 * @version 1.0
 * @date 2022-05-28 10:14
 */
public class Test06KafkaProducer {

    // send a single "message1" record to the test topic
    public static void main(String[] args) {

        Properties properties = new Properties();
        properties.put("bootstrap.servers", "master:9092");
        properties.put("acks", "all");
        properties.put("retries", 0);
        properties.put("batch.size", 16384);
        properties.put("linger.ms", 1);
        properties.put("buffer.memory", 33554432);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        String topic = "test20220528";
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
        for (int i = 1; i <= 1; i++) {
            kafkaProducer.send(new ProducerRecord<>(topic, "message" + i));
            System.out.println("message" + i);
        }
        kafkaProducer.close();
    }
}

Client-side processing

When send is called, the record body is serialized into a ByteBuffer.

In this scenario the body occupies offsets 61 - 75 of the ByteBuffer.

# for the scenario here, the body actually contains the following (zigzag-varint fields, see the sketch below)
bodyLen = 14 : 1 byte
attribute = 0 : 1 byte
timestampDelta = 0 : 1 byte
offsetDelta = 0 : 1 byte
key = -1 : 1 byte (key length; -1 means null key)
valueLen = 8 : 1 byte
value = message1 : 8 byte
headerLen = 0 : 1 byte
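
These body fields are zigzag varints (the same encoding Kafka implements in org.apache.kafka.common.utils.ByteUtils), which is why every small value above fits in a single byte. A minimal standalone sketch of the encoding:

public class ZigZag {

    // zigzag maps signed ints to unsigned so small magnitudes stay small:
    // 0 -> 0, -1 -> 1, 1 -> 2, -2 -> 3, ...
    static int encode(int n) { return (n << 1) ^ (n >> 31); }
    static int decode(int encoded) { return (encoded >>> 1) ^ -(encoded & 1); }

    public static void main(String[] args) {
        System.out.println(encode(14));   // 28 -> this is why the log reader below can use `>> 1`
        System.out.println(encode(-1));   // 1  -> the "key = -1" above is stored as the byte 1
        System.out.println(decode(28));   // 14
    }
}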

After the body has been serialized into the ProducerBatch, the batch is added to the accumulator's batches, and the actual sending is handed off to the sender thread.

The sender thread takes the messages to send from the queue, writes the header information, and sends them.

# for the scenario here, the header actually contains the following
baseOffset = 0 : 8 byte
lengthOffset = 64 : 4 byte
partitionLeaderEpoch = -1 : 4 byte
magic = 2 : 1 byte
crc = 558737589 : 4 byte
attributes = 0 : 2 byte
lastOffsetDelta = 0 : 4 byte
firstTimestamp = 1654310895383 : 8 byte
maxTimestamp = 1654310895383 : 8 byte
producerId = -1 : 8 byte
epoch = -1 : 2 byte
sequence = -1 : 4 byte
numRecords = 1 : 4 byte
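
These 13 fixed-width fields add up to the v2 batch header size, which is why the body above starts at ByteBuffer offset 61. A quick sanity check of the sizes in this scenario (pure arithmetic, no Kafka APIs):

public class BatchSizeCheck {

    public static void main(String[] args) {
        // v2 RecordBatch header: the field widths listed above
        int headerSize = 8 + 4 + 4 + 1 + 4 + 2 + 4 + 8 + 8 + 8 + 2 + 4 + 4;
        // the 1-byte bodyLen varint plus the 14 bytes it counts
        int bodySize = 1 + 14;
        System.out.println(headerSize);              // 61
        System.out.println(headerSize + bodySize);   // 76, one single-record batch
    }
}

The 76 bytes per batch also matches the 228-byte log file (3 batches) and the index entry 2 -> 152 (2 x 76) we will see later.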

When MemoryRecordsBuilder.close is called, the buffer is turned into MemoryRecords.

Once the data for the current batch has been taken from the queue, a ProduceRequest is built and exchanged with the server.
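
MemoryRecordsBuilder and MemoryRecords are public classes in org.apache.kafka.common.record, so the serialization described above can be reproduced standalone. A minimal sketch (not the producer's internal call path, just the same builder driven by hand; the class name Test07MemoryRecordsBuilder is mine):

import java.nio.ByteBuffer;

import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.TimestampType;

public class Test07MemoryRecordsBuilder {

    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        // magic v2, no compression, baseOffset 0 -- mirrors what ProducerBatch sets up internally
        MemoryRecordsBuilder builder = MemoryRecords.builder(
                buffer, CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
        builder.append(System.currentTimeMillis(), null, "message1".getBytes());
        // build() closes the builder and writes the batch header in front of the body
        MemoryRecords records = builder.build();
        System.out.println("batch size : " + records.sizeInBytes());  // expect 76 here
    }
}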

Server-side processing

When the server receives the Produce request, it goes through handleProduceRequest.

partitionRecords holds the incoming data, grouped by TopicPartition just as the client sent it.

It then tries to append the data to the log; entriesPerPartition here contains the data of multiple TopicPartitions.

Broken down further, each TopicPartition corresponds to one Log.

The records for a given TopicPartition are appended to its log (roughly the chain ReplicaManager.appendRecords -> Partition.appendRecordsToLeader -> Log.appendAsLeader).

A Log in turn corresponds to multiple Segments, split by size according to log.segment.bytes.

At a sampling interval it also records baseOffset -> physicalOffset and baseOffset -> timestamp entries.

So besides the .log file that stores the data, there is also an .index file and a .timeindex file per segment, used to speed up lookups:

-rw-r--r--  1 jerry  wheel         8 Jun  5 15:49 00000000000000000000.index
-rw-r--r--  1 jerry  wheel       228 Jun  5 15:49 00000000000000000000.log
-rw-r--r--  1 jerry  wheel        12 Jun  5 15:49 00000000000000000000.timeindex
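
Both the segment size and the index sampling density are broker configs; for reference, the relevant server.properties entries (the values shown are the broker defaults):

# roll to a new segment once the active .log reaches this size
log.segment.bytes=1073741824
# add one .index / .timeindex entry roughly every this many bytes of appended data
log.index.interval.bytes=4096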

Another detail in the server-side handling is the assignment of the record's baseOffset: here the record's ByteBuffer starts at position 58, so the baseOffset field is stored at bytes 58 - 66, and in this scenario the broker sets baseOffset to 14.

The record delegates to its ByteBuffer to update baseOffset in place.
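
A minimal sketch of that in-place rewrite (position 58 and offset 14 are just the values observed in this scenario; the real code lives in DefaultRecordBatch and delegates to the buffer the same way):

import java.nio.ByteBuffer;

public class Test08AssignBaseOffset {

    public static void main(String[] args) {
        // stands in for the records section of the ProduceRequest buffer
        ByteBuffer buffer = ByteBuffer.allocate(256);
        int batchStart = 58;        // position of this batch inside the buffer (observed above)
        long assignedOffset = 14L;  // the segment's next offset (observed above)
        // baseOffset is the first 8 bytes of the batch, so assignment is one absolute putLong;
        // nothing else in the batch moves, which is why the broker can do it in place
        buffer.putLong(batchStart, assignedOffset);
        System.out.println("baseOffset : " + buffer.getLong(batchStart));
    }
}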

The three kinds of Kafka log files

There are three kinds of files: the .log file, the .index file, and the .timeindex file.

The .log file contains the actual data, i.e. the headerInfo and bodyInfo laid out above.

The .index file is a sampled set of baseOffset -> physicalOffset mappings, where physicalOffset is the byte position within the .log file at which the batch starting at baseOffset is stored.

The .timeindex file is a sampled set of baseOffset -> maxTimestamp mappings.
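
To make "used to speed up lookups" concrete, here is a minimal sketch (my own simplification, not the broker's OffsetIndex code) of how a sparse offset index is consumed: binary-search for the largest sampled offset <= the target, then scan the .log file forward from that byte position.

import java.util.Arrays;
import java.util.List;

public class Test10SparseIndexLookup {

    // one .index entry: relativeOffset -> byte position in the .log file
    static class IndexEntry {
        final int relativeOffset;
        final int position;
        IndexEntry(int relativeOffset, int position) {
            this.relativeOffset = relativeOffset;
            this.position = position;
        }
    }

    // returns the .log byte position to start scanning from for targetOffset
    static int lookup(List<IndexEntry> index, int targetOffset) {
        int lo = 0, hi = index.size() - 1, pos = 0;   // 0 = start of segment if no entry qualifies
        while (lo <= hi) {
            int mid = (lo + hi) >>> 1;
            if (index.get(mid).relativeOffset <= targetOffset) {
                pos = index.get(mid).position;        // best candidate so far
                lo = mid + 1;
            } else {
                hi = mid - 1;
            }
        }
        return pos;
    }

    public static void main(String[] args) {
        // the single entry read from the .index file below: offset 2 -> position 152
        List<IndexEntry> index = Arrays.asList(new IndexEntry(2, 152));
        System.out.println(lookup(index, 2));  // 152: scan the .log from byte 152
        System.out.println(lookup(index, 1));  // 0:   before the first sample, scan from the start
    }
}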

The log file

import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import lombok.Data;

/**
 * Test09ReadKafkaLog
 *
 * @author Jerry.X.He <970655147@qq.com>
 * @version 1.0
 * @date 2022-06-04 11:02
 */
public class Test09ReadKafkaLog {

    // Test09ReadKafkaLog
    public static void main(String[] args) throws Exception {

        String path = "/tmp/kafka-logs/test20220528-0/00000000000000000000.log";
//        String path = "/tmp/kafka-logs/test20220528-0/00000000000000000003.log";
        File file = new File(path);
        // every batch in this scenario is a single-record batch of 76 bytes
        int messageCount = (int) (file.length() / 76);
        InputStream is = new FileInputStream(path);
        DataInputStream dis = new DataInputStream(is);

        long availableBefore = is.available();
        List<RecordInfo> list = new ArrayList<>();
        for (int i = 0; i < messageCount; i++) {
            // batch header: 61 bytes of fixed-width fields
            long baseOffset = dis.readLong();
            int lengthOffset = dis.readInt();
            int partitionLeaderEpoch = dis.readInt();
            int magic = dis.readByte();
            int crc = dis.readInt();
            int attributeCount = dis.readShort();
            int lastOffsetDelta = dis.readInt();
            Date firstTimestamp = new Date(dis.readLong());
            Date maxTimestamp = new Date(dis.readLong());
            long producerId = dis.readLong();
            int epoch = dis.readShort();
            int sequence = dis.readInt();
            int numRecords = dis.readInt();

            // record body: zigzag varints; `>> 1` undoes the zigzag and is only
            // valid here because each value fits in a single varint byte
            int bodyLen = dis.readByte() >> 1;
            int attributeCountInBody = dis.readByte();
            int timestampDelta = dis.readByte() >> 1;
            int offsetDelta = dis.readByte() >> 1;
            int key = dis.readByte();   // raw key-length varint; zigzag(-1) = 1 means null key
            int valueLen = dis.readByte() >> 1;
            byte[] valueBytes = new byte[valueLen];
            dis.readFully(valueBytes);
            String value = new String(valueBytes);
            int headerLen = dis.readByte() >> 1;

            HeaderInfo headerInfo = new HeaderInfo(baseOffset, lengthOffset, partitionLeaderEpoch, magic, crc,
                    attributeCount, lastOffsetDelta, firstTimestamp, maxTimestamp, producerId, epoch, sequence, numRecords);
            BodyInfo bodyInfo = new BodyInfo(bodyLen, attributeCountInBody, timestampDelta, offsetDelta, key, valueLen, valueBytes, value, headerLen);
            list.add(new RecordInfo(headerInfo, bodyInfo));
        }
        long availableAfter = is.available();

        System.out.println(String.format(" availBefore : %s, availAfter : %s ", availableBefore, availableAfter));
        for (RecordInfo recordInfo : list) {
            System.out.println(JSON.toJSONString(recordInfo, SerializerFeature.WriteDateUseDateFormat));
        }
    }

    @Data
    private static class RecordInfo {
        public HeaderInfo headerInfo;
        public BodyInfo bodyInfo;

        public RecordInfo(HeaderInfo headerInfo, BodyInfo bodyInfo) {
            this.headerInfo = headerInfo;
            this.bodyInfo = bodyInfo;
        }
    }

    @Data
    private static class HeaderInfo {
        public long baseOffset;
        public int lengthOffset;
        public int partitionLeaderEpoch;
        public int magic;
        public int crc;
        public int attributeCount;
        public int lastOffsetDelta;
        public Date firstTimestamp;
        public Date maxTimestamp;
        public long producerId;
        public int epoch;
        public int sequence;
        public int numRecords;

        public HeaderInfo(long baseOffset, int lengthOffset, int partitionLeaderEpoch, int magic, int crc,
                          int attributeCount, int lastOffsetDelta, Date firstTimestamp, Date maxTimestamp,
                          long producerId, int epoch, int sequence, int numRecords) {
            this.baseOffset = baseOffset;
            this.lengthOffset = lengthOffset;
            this.partitionLeaderEpoch = partitionLeaderEpoch;
            this.magic = magic;
            this.crc = crc;
            this.attributeCount = attributeCount;
            this.lastOffsetDelta = lastOffsetDelta;
            this.firstTimestamp = firstTimestamp;
            this.maxTimestamp = maxTimestamp;
            this.producerId = producerId;
            this.epoch = epoch;
            this.sequence = sequence;
            this.numRecords = numRecords;
        }
    }

    @Data
    private static class BodyInfo {
        public int bodyLen;
        public int attributeCount;
        public int timestampDelta;
        public int offsetDelta;
        public int key;
        public int valueLen;
        public byte[] valueBytes;
        public String value;
        public int headerLen;

        public BodyInfo(int bodyLen, int attributeCount, int timestampDelta, int offsetDelta, int key,
                        int valueLen, byte[] valueBytes, String value, int headerLen) {
            this.bodyLen = bodyLen;
            this.attributeCount = attributeCount;
            this.timestampDelta = timestampDelta;
            this.offsetDelta = offsetDelta;
            this.key = key;
            this.valueLen = valueLen;
            this.valueBytes = valueBytes;
            this.value = value;
            this.headerLen = headerLen;
        }
    }
}


The output is as follows (228 bytes = 3 batches x 76 bytes):

 availBefore : 228, availAfter : 0 
"bodyInfo":"attributeCount":0,"bodyLen":14,"headerLen":0,"key":1,"offsetDelta":0,"timestampDelta":0,"value":"message1","valueBytes":"bWVzc2FnZTE=","valueLen":8,"headerInfo":"attributeCount":0,"baseOffset":0,"crc":458006489,"epoch":-1,"firstTimestamp":"2022-06-05 15:47:32","lastOffsetDelta":0,"lengthOffset":64,"magic":2,"maxTimestamp":"2022-06-05 15:47:32","numRecords":1,"partitionLeaderEpoch":0,"producerId":-1,"sequence":-1
"bodyInfo":"attributeCount":0,"bodyLen":14,"headerLen":0,"key":1,"offsetDelta":0,"timestampDelta":0,"value":"message1","valueBytes":"bWVzc2FnZTE=","valueLen":8,"headerInfo":"attributeCount":0,"baseOffset":1,"crc":-368922856,"epoch":-1,"firstTimestamp":"2022-06-05 15:49:05","lastOffsetDelta":0,"lengthOffset":64,"magic":2,"maxTimestamp":"2022-06-05 15:49:05","numRecords":1,"partitionLeaderEpoch":0,"producerId":-1,"sequence":-1
"bodyInfo":"attributeCount":0,"bodyLen":14,"headerLen":0,"key":1,"offsetDelta":0,"timestampDelta":0,"value":"message1","valueBytes":"bWVzc2FnZTE=","valueLen":8,"headerInfo":"attributeCount":0,"baseOffset":2,"crc":1402645798,"epoch":-1,"firstTimestamp":"2022-06-05 15:49:18","lastOffsetDelta":0,"lengthOffset":64,"magic":2,"maxTimestamp":"2022-06-05 15:49:18","numRecords":1,"partitionLeaderEpoch":0,"producerId":-1,"sequence":-1

The index file

import java.io.DataInputStream;
import java.io.FileInputStream;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;

/**
 * Test09ReadKafkaIndex
 *
 * @author Jerry.X.He <970655147@qq.com>
 * @version 1.0
 * @date 2022-06-04 11:02
 */
public class Test09ReadKafkaIndex {

    // each .index entry is 8 bytes : relativeOffset(int) -> position(int)
    public static void main(String[] args) throws Exception {

        String path = "/tmp/kafka-logs/test20220528-0/00000000000000000000.index";
        InputStream is = new FileInputStream(path);
        DataInputStream dis = new DataInputStream(is);

        long availableBefore = is.available();
        List<Pair<Integer, Integer>> offsetList = new ArrayList<>();
        while (dis.available() > 0) {
            int offset1 = dis.readInt();
            int position1 = dis.readInt();
            offsetList.add(new Pair<>(offset1, position1));
        }
        long availableAfter = is.available();

        System.out.println(String.format(" availBefore : %s, availAfter : %s ", availableBefore, availableAfter));
        for (Pair<Integer, Integer> pair : offsetList) {
            System.out.println(String.format("%s -> %s", pair.getFirst(), pair.getSecond()));
        }
    }

    // minimal 2-tuple helper, also used by the timeindex reader below
    public static class Pair<F, S> {
        private final F first;
        private final S second;

        public Pair(F first, S second) {
            this.first = first;
            this.second = second;
        }

        public F getFirst() { return first; }
        public S getSecond() { return second; }
    }
}


The output is as follows; the single entry means relative offset 2 maps to byte position 152 in the .log file (2 x 76 bytes):

 availBefore : 8, availAfter : 0 
2 -> 152

The timeindex file

import java.io.DataInputStream;
import java.io.FileInputStream;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

import org.apache.commons.lang3.time.DateFormatUtils;

/**
 * Test09ReadKafkaTimeIndex
 *
 * @author Jerry.X.He <970655147@qq.com>
 * @version 1.0
 * @date 2022-06-04 11:02
 */
public class Test09ReadKafkaTimeIndex {

    // each .timeindex entry is 12 bytes : maxTimestamp(long) -> relativeOffset(int)
    public static void main(String[] args) throws Exception {

        String path = "/tmp/kafka-logs/test20220528-0/00000000000000000000.timeindex";
        InputStream is = new FileInputStream(path);
        DataInputStream dis = new DataInputStream(is);

        long availableBefore = is.available();
        // Pair is the same helper defined in Test09ReadKafkaIndex above
        List<Pair<Integer, Date>> offsetList = new ArrayList<>();
        while (dis.available() > 0) {
            Date timestamp1 = new Date(dis.readLong());
            int offset1 = dis.readInt();
            offsetList.add(new Pair<>(offset1, timestamp1));
        }
        long availableAfter = is.available();

        System.out.println(String.format(" availBefore : %s, availAfter : %s ", availableBefore, availableAfter));
        for (Pair<Integer, Date> pair : offsetList) {
            // 'hh' is the 12-hour pattern, so 15:49:18 prints as 03:49:18
            System.out.println(String.format("%s -> %s", pair.getFirst(), DateFormatUtils.format(pair.getSecond(), "yyyy-MM-dd hh:mm:ss")));
        }
    }
}


The output is as follows (03:49:18 is 15:49:18 rendered by the 12-hour 'hh' pattern):

 availBefore : 12, availAfter : 0 
2 -> 2022-06-05 03:49:18

Done.
