Kafka wire message format

Posted by allenwas3


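1. Serializing a message

A message has a key and a value.

Kafka ships serializers for basic data types; for custom business classes you have to implement serialization yourself.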
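
As a rough illustration of what implementing serialization yourself can look like, the sketch below encodes a hypothetical User class into bytes. The SimpleSerializer interface is a stand-in defined here so the example compiles without the Kafka client on the classpath; it only mirrors the serialize(String topic, T data) shape of Kafka's org.apache.kafka.common.serialization.Serializer. User, its fields, and the byte layout are assumptions made for this example.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Stand-in for Kafka's Serializer<T>; defined locally so the sketch is self-contained.
interface SimpleSerializer<T> {
    byte[] serialize(String topic, T data);
}

// Hypothetical business class, not part of Kafka.
final class User {
    final int id;
    final String name;

    User(int id, String name) {
        this.id = id;
        this.name = name;
    }
}

// Assumed layout: 4-byte id, 4-byte name length, then the UTF-8 name bytes.
final class UserSerializer implements SimpleSerializer<User> {
    @Override
    public byte[] serialize(String topic, User data) {
        if (data == null) {
            return null; // a null value stays null
        }
        byte[] name = data.name.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + name.length);
        buffer.putInt(data.id);
        buffer.putInt(name.length);
        buffer.put(name);
        return buffer.array();
    }
}
```

With the real client, a class like this would implement Kafka's own Serializer interface and be registered through the key.serializer or value.serializer producer setting.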

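ProducerRecord is an object holding the key, value, and headers; at this point the key and value are still objects.

KafkaProducer#doSend serializes the key and value into byte arrays.

The byte arrays and headers are then appended to a ProducerBatch.

See the code: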

org.apache.kafka.clients.producer.internals.ProducerBatch#recordsBuilder
org.apache.kafka.common.record.MemoryRecordsBuilder#appendStream

 

2. Kafka's TCP message

Struct and Schema are used to convert the data in a ProducerBatch into a TCP message in Kafka's format.

Taking message sending as an example:

org.apache.kafka.common.requests.AbstractRequest#toSend
org.apache.kafka.common.requests.AbstractRequest#serialize
org.apache.kafka.common.requests.AbstractRequestResponse#serialize
org.apache.kafka.common.requests.ProduceRequest#toStruct
org.apache.kafka.common.protocol.types.Schema#write
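
The Struct/Schema idea behind this call chain can be sketched in a few lines: a schema is an ordered list of typed fields, a struct holds values by field name, and writing walks the schema in order. MiniStructSketch, Field, and Type below are hypothetical names for this illustration, not Kafka classes, and only two integer types are modeled.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

final class MiniStructSketch {
    enum Type { INT16, INT32 }

    // A schema is an ordered list of (name, type) pairs.
    static final class Field {
        final String name;
        final Type type;

        Field(String name, Type type) {
            this.name = name;
            this.type = type;
        }
    }

    private final Field[] schema;
    private final Map<String, Integer> values = new HashMap<>();

    MiniStructSketch(Field... schema) {
        this.schema = schema;
    }

    MiniStructSketch set(String name, int value) {
        values.put(name, value);
        return this;
    }

    // Total encoded size, computed from the schema alone.
    int sizeOf() {
        int size = 0;
        for (Field f : schema) {
            size += (f.type == Type.INT16) ? 2 : 4;
        }
        return size;
    }

    // Fields are written in schema order, regardless of the order set() was called in.
    void writeTo(ByteBuffer buffer) {
        for (Field f : schema) {
            Integer value = values.get(f.name);
            if (value == null) {
                throw new IllegalStateException("missing field " + f.name);
            }
            if (f.type == Type.INT16) {
                buffer.putShort(value.shortValue());
            } else {
                buffer.putInt(value);
            }
        }
    }
}
```

Because the schema fixes the byte order, the sequence of set calls in a toStruct method does not have to match the order of fields on the wire.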

org.apache.kafka.common.requests.RequestHeader#toStruct

public Struct toStruct() {
    Schema schema = schema(apiKey.id, apiVersion);
    Struct struct = new Struct(schema);
    struct.set(API_KEY_FIELD_NAME, apiKey.id);
    struct.set(API_VERSION_FIELD_NAME, apiVersion);

    // only v0 of the controlled shutdown request is missing the clientId
    if (struct.hasField(CLIENT_ID_FIELD_NAME))
        struct.set(CLIENT_ID_FIELD_NAME, clientId);
    struct.set(CORRELATION_ID_FIELD_NAME, correlationId);
    return struct;
}
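
To make the header layout concrete, here is a minimal sketch that writes the same four fields by hand with a ByteBuffer. It assumes the version 1 request header layout described in the Kafka protocol guide (INT16 api key, INT16 api version, INT32 correlation id, then the client id as a nullable string with an INT16 length prefix). RequestHeaderSketch and encode are names made up for this example, not Kafka classes.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class RequestHeaderSketch {
    // Encodes the header fields in the assumed wire order:
    // api_key, api_version, correlation_id, client_id.
    static ByteBuffer encode(short apiKey, short apiVersion, int correlationId, String clientId) {
        byte[] client = clientId == null ? null : clientId.getBytes(StandardCharsets.UTF_8);
        int size = 2 + 2 + 4 + 2 + (client == null ? 0 : client.length);
        ByteBuffer buffer = ByteBuffer.allocate(size);
        buffer.putShort(apiKey);
        buffer.putShort(apiVersion);
        buffer.putInt(correlationId);
        if (client == null) {
            buffer.putShort((short) -1); // a length of -1 marks a null string
        } else {
            buffer.putShort((short) client.length);
            buffer.put(client);
        }
        buffer.flip(); // make the buffer ready for reading
        return buffer;
    }
}
```

Note that the wire order here puts the correlation id before the client id, even though toStruct sets the client id first; the schema, not the call order, decides the layout.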

org.apache.kafka.common.requests.ProduceRequest#toStruct

public Struct toStruct() {
    // Store it in a local variable to protect against concurrent updates
    Map<TopicPartition, MemoryRecords> partitionRecords = partitionRecordsOrFail();
    short version = version();
    Struct struct = new Struct(ApiKeys.PRODUCE.requestSchema(version));
    Map<String, Map<Integer, MemoryRecords>> recordsByTopic = CollectionUtils.groupDataByTopic(partitionRecords);
    struct.set(ACKS_KEY_NAME, acks);
    struct.set(TIMEOUT_KEY_NAME, timeout);
    struct.setIfExists(NULLABLE_TRANSACTIONAL_ID, transactionalId);

    List<Struct> topicDatas = new ArrayList<>(recordsByTopic.size());
    for (Map.Entry<String, Map<Integer, MemoryRecords>> topicEntry : recordsByTopic.entrySet()) {
        Struct topicData = struct.instance(TOPIC_DATA_KEY_NAME);
        topicData.set(TOPIC_NAME, topicEntry.getKey());
        List<Struct> partitionArray = new ArrayList<>();
        for (Map.Entry<Integer, MemoryRecords> partitionEntry : topicEntry.getValue().entrySet()) {
            MemoryRecords records = partitionEntry.getValue();
            Struct part = topicData.instance(PARTITION_DATA_KEY_NAME)
                    .set(PARTITION_ID, partitionEntry.getKey())
                    .set(RECORD_SET_KEY_NAME, records);
            partitionArray.add(part);
        }
        topicData.set(PARTITION_DATA_KEY_NAME, partitionArray.toArray());
        topicDatas.add(topicData);
    }
    struct.set(TOPIC_DATA_KEY_NAME, topicDatas.toArray());
    return struct;
}
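
The nested topic and partition loops rely on the records first being regrouped from a flat (topic, partition) map into a map of topic to partitions. The sketch below shows one way such a grouping step could work; it is an illustration of the idea, not the body of CollectionUtils.groupDataByTopic. TopicPartitionKey is a hypothetical stand-in for Kafka's TopicPartition.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

final class GroupByTopicSketch {
    // Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
    static final class TopicPartitionKey {
        final String topic;
        final int partition;

        TopicPartitionKey(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof TopicPartitionKey)) {
                return false;
            }
            TopicPartitionKey other = (TopicPartitionKey) o;
            return partition == other.partition && topic.equals(other.topic);
        }

        @Override
        public int hashCode() {
            return Objects.hash(topic, partition);
        }
    }

    // Turns {(topic, partition) -> value} into {topic -> {partition -> value}}.
    static <V> Map<String, Map<Integer, V>> groupByTopic(Map<TopicPartitionKey, V> data) {
        Map<String, Map<Integer, V>> byTopic = new HashMap<>();
        for (Map.Entry<TopicPartitionKey, V> entry : data.entrySet()) {
            byTopic.computeIfAbsent(entry.getKey().topic, t -> new HashMap<>())
                   .put(entry.getKey().partition, entry.getValue());
        }
        return byTopic;
    }
}
```

Grouping this way lets the request carry each topic name once, followed by an array of its partitions.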

Assembling the message

public abstract class AbstractRequestResponse {
    /**
     * Visible for testing.
     */
    public static ByteBuffer serialize(Struct headerStruct, Struct bodyStruct) {
        ByteBuffer buffer = ByteBuffer.allocate(headerStruct.sizeOf() + bodyStruct.sizeOf());
        headerStruct.writeTo(buffer);
        bodyStruct.writeTo(buffer);
        buffer.rewind();
        return buffer;
    }
}


public class NetworkSend extends ByteBufferSend {

    public NetworkSend(String destination, ByteBuffer buffer) {
        super(destination, sizeDelimit(buffer));
    }

    private static ByteBuffer[] sizeDelimit(ByteBuffer buffer) {
        return new ByteBuffer[] {sizeBuffer(buffer.remaining()), buffer};
    }

    private static ByteBuffer sizeBuffer(int size) {
        ByteBuffer sizeBuffer = ByteBuffer.allocate(4);
        sizeBuffer.putInt(size);
        sizeBuffer.rewind();
        return sizeBuffer;
    }
}

From this we can infer the Kafka message format: a 4-byte length, followed by headerStruct and then bodyStruct.

The comments on NetworkSend and NetworkReceive say the same thing.
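
The length-prefixed framing can be exercised on its own. The sketch below frames a payload with a 4-byte big-endian size and reads it back, which is roughly the job a receiver has to do on the other side. FrameSketch, frame, and readFrame are hypothetical names for this example; this is not the NetworkReceive implementation.

```java
import java.nio.ByteBuffer;

final class FrameSketch {
    // Prefix the payload with its length as a 4-byte big-endian int.
    static ByteBuffer frame(ByteBuffer payload) {
        ByteBuffer framed = ByteBuffer.allocate(4 + payload.remaining());
        framed.putInt(payload.remaining());
        framed.put(payload.duplicate()); // duplicate so the caller's position is untouched
        framed.flip();
        return framed;
    }

    // Read one frame; returns null if the buffer does not yet hold a complete frame,
    // in which case the buffer position is left unchanged.
    static ByteBuffer readFrame(ByteBuffer in) {
        if (in.remaining() < 4) {
            return null;
        }
        in.mark();
        int size = in.getInt();
        if (size < 0) {
            throw new IllegalStateException("negative frame size " + size);
        }
        if (in.remaining() < size) {
            in.reset(); // wait for more bytes
            return null;
        }
        ByteBuffer payload = in.slice();
        payload.limit(size);
        in.position(in.position() + size);
        return payload;
    }
}
```

Reading the size first lets the receiver know exactly how many bytes to wait for before it starts parsing the header and body.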
