Kafka wire message format
Posted by allenwas3
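1. Serializing a message
A message has a key and a value.
Kafka ships serializers for the basic data types; custom business classes need their own serializer implementation.
A ProducerRecord is an object that holds the key, the value, and the headers. At this point the key and value are still objects.
KafkaProducer#doSend serializes the key and value into byte arrays,
then appends those byte arrays, together with the headers, to a ProducerBatch.
See the code in: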
org.apache.kafka.clients.producer.internals.ProducerBatch#recordsBuilder
org.apache.kafka.common.record.MemoryRecordsBuilder#appendStream
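The serialization step described above can be sketched without the Kafka client on the classpath. In this minimal sketch, `SimpleSerializer` is a hypothetical stand-in that only mirrors the shape of Kafka's `Serializer<T>#serialize(String topic, T data)`; the `Order` class and its byte layout are assumptions made up for illustration, not anything Kafka prescribes.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class KeyValueSerializationSketch {

    /** Hypothetical stand-in with the same shape as Kafka's Serializer<T>. */
    interface SimpleSerializer<T> {
        byte[] serialize(String topic, T data);
    }

    /** Example business class that needs a hand-written serializer. */
    static final class Order {
        final long id;
        final String item;

        Order(long id, String item) {
            this.id = id;
            this.item = item;
        }
    }

    /** Basic type: a String becomes its UTF-8 bytes, null stays null. */
    static final SimpleSerializer<String> STRING_SERIALIZER =
            (topic, data) -> data == null ? null : data.getBytes(StandardCharsets.UTF_8);

    /** Layout chosen for this sketch only: 8-byte id, 4-byte item length, UTF-8 item. */
    static final SimpleSerializer<Order> ORDER_SERIALIZER = (topic, order) -> {
        if (order == null) {
            return null;
        }
        byte[] item = order.item.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(Long.BYTES + Integer.BYTES + item.length);
        buf.putLong(order.id).putInt(item.length).put(item);
        return buf.array();
    };

    public static void main(String[] args) {
        // The key and value start out as objects and end up as byte arrays.
        byte[] key = STRING_SERIALIZER.serialize("orders", "order-42");
        byte[] value = ORDER_SERIALIZER.serialize("orders", new Order(42L, "book"));
        System.out.println("key bytes = " + key.length + ", value bytes = " + value.length);
    }
}
```

With the real client, the equivalent classes would implement `org.apache.kafka.common.serialization.Serializer` and be configured through `key.serializer` and `value.serializer`.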
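2. Kafka's TCP message
Struct and Schema are used to convert the data in a ProducerBatch into a TCP message in Kafka's format.
Taking a produce request (sending messages) as the example, the relevant methods are: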
org.apache.kafka.common.requests.AbstractRequest#toSend
org.apache.kafka.common.requests.AbstractRequest#serialize
org.apache.kafka.common.requests.AbstractRequestResponse#serialize
org.apache.kafka.common.requests.ProduceRequest#toStruct
org.apache.kafka.common.protocol.types.Schema#write
org.apache.kafka.common.requests.RequestHeader#toStruct
    public Struct toStruct() {
        Schema schema = schema(apiKey.id, apiVersion);
        Struct struct = new Struct(schema);
        struct.set(API_KEY_FIELD_NAME, apiKey.id);
        struct.set(API_VERSION_FIELD_NAME, apiVersion);

        // only v0 of the controlled shutdown request is missing the clientId
        if (struct.hasField(CLIENT_ID_FIELD_NAME))
            struct.set(CLIENT_ID_FIELD_NAME, clientId);
        struct.set(CORRELATION_ID_FIELD_NAME, correlationId);
        return struct;
    }
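To make the header bytes concrete, here is a minimal hand-rolled sketch of what the header Struct ends up writing. It assumes the request header layout from the Kafka protocol guide (api_key as INT16, api_version as INT16, correlation_id as INT32, client_id as a nullable string with an INT16 length prefix, -1 meaning null). The fields go out in schema order, which is why correlation_id precedes client_id even though toStruct sets them the other way around. The class and method names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class RequestHeaderSketch {

    /** Encodes the four header fields in schema order; the result is ready for reading. */
    static ByteBuffer encodeHeader(short apiKey, short apiVersion, int correlationId, String clientId) {
        byte[] id = clientId == null ? null : clientId.getBytes(StandardCharsets.UTF_8);
        int size = Short.BYTES + Short.BYTES + Integer.BYTES + Short.BYTES + (id == null ? 0 : id.length);

        ByteBuffer buf = ByteBuffer.allocate(size); // big-endian by default
        buf.putShort(apiKey);
        buf.putShort(apiVersion);
        buf.putInt(correlationId);
        if (id == null) {
            buf.putShort((short) -1);        // null string marker
        } else {
            buf.putShort((short) id.length); // length prefix, then the bytes
            buf.put(id);
        }
        buf.flip();
        return buf;
    }

    public static void main(String[] args) {
        // api_key 0 is Produce; version 3 and the client id are arbitrary example values.
        ByteBuffer header = encodeHeader((short) 0, (short) 3, 7, "demo");
        System.out.println("header size = " + header.remaining());
    }
}
```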
org.apache.kafka.common.requests.ProduceRequest#toStruct
    public Struct toStruct() {
        // Store it in a local variable to protect against concurrent updates
        Map<TopicPartition, MemoryRecords> partitionRecords = partitionRecordsOrFail();
        short version = version();
        Struct struct = new Struct(ApiKeys.PRODUCE.requestSchema(version));
        Map<String, Map<Integer, MemoryRecords>> recordsByTopic = CollectionUtils.groupDataByTopic(partitionRecords);
        struct.set(ACKS_KEY_NAME, acks);
        struct.set(TIMEOUT_KEY_NAME, timeout);
        struct.setIfExists(NULLABLE_TRANSACTIONAL_ID, transactionalId);

        List<Struct> topicDatas = new ArrayList<>(recordsByTopic.size());
        for (Map.Entry<String, Map<Integer, MemoryRecords>> topicEntry : recordsByTopic.entrySet()) {
            Struct topicData = struct.instance(TOPIC_DATA_KEY_NAME);
            topicData.set(TOPIC_NAME, topicEntry.getKey());
            List<Struct> partitionArray = new ArrayList<>();
            for (Map.Entry<Integer, MemoryRecords> partitionEntry : topicEntry.getValue().entrySet()) {
                MemoryRecords records = partitionEntry.getValue();
                Struct part = topicData.instance(PARTITION_DATA_KEY_NAME)
                        .set(PARTITION_ID, partitionEntry.getKey())
                        .set(RECORD_SET_KEY_NAME, records);
                partitionArray.add(part);
            }
            topicData.set(PARTITION_DATA_KEY_NAME, partitionArray.toArray());
            topicDatas.add(topicData);
        }
        struct.set(TOPIC_DATA_KEY_NAME, topicDatas.toArray());
        return struct;
    }
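The body is nested as topic, then partition, then record set, and the regrouping step that makes this possible is easy to miss. The sketch below shows one way such a regrouping could look. It is not Kafka's `CollectionUtils.groupDataByTopic` itself; `TopicPartitionKey` is a hypothetical stand-in for `TopicPartition`, and plain strings stand in for `MemoryRecords`.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;

final class GroupByTopicSketch {

    /** Hypothetical stand-in for Kafka's TopicPartition. */
    static final class TopicPartitionKey {
        final String topic;
        final int partition;

        TopicPartitionKey(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof TopicPartitionKey)) {
                return false;
            }
            TopicPartitionKey other = (TopicPartitionKey) o;
            return partition == other.partition && topic.equals(other.topic);
        }

        @Override
        public int hashCode() {
            return Objects.hash(topic, partition);
        }
    }

    /** Turns a flat (topic, partition) -> data map into topic -> (partition -> data). */
    static <T> Map<String, Map<Integer, T>> groupByTopic(Map<TopicPartitionKey, T> data) {
        Map<String, Map<Integer, T>> byTopic = new LinkedHashMap<>();
        for (Map.Entry<TopicPartitionKey, T> entry : data.entrySet()) {
            byTopic.computeIfAbsent(entry.getKey().topic, t -> new LinkedHashMap<>())
                    .put(entry.getKey().partition, entry.getValue());
        }
        return byTopic;
    }

    public static void main(String[] args) {
        Map<TopicPartitionKey, String> flat = new LinkedHashMap<>();
        flat.put(new TopicPartitionKey("orders", 0), "records-a");
        flat.put(new TopicPartitionKey("orders", 1), "records-b");
        flat.put(new TopicPartitionKey("payments", 0), "records-c");
        System.out.println(groupByTopic(flat).keySet());
    }
}
```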
Assembling the message
    public abstract class AbstractRequestResponse {
        /**
         * Visible for testing.
         */
        public static ByteBuffer serialize(Struct headerStruct, Struct bodyStruct) {
            ByteBuffer buffer = ByteBuffer.allocate(headerStruct.sizeOf() + bodyStruct.sizeOf());
            headerStruct.writeTo(buffer);
            bodyStruct.writeTo(buffer);
            buffer.rewind();
            return buffer;
        }
    }

    public class NetworkSend extends ByteBufferSend {

        public NetworkSend(String destination, ByteBuffer buffer) {
            super(destination, sizeDelimit(buffer));
        }

        private static ByteBuffer[] sizeDelimit(ByteBuffer buffer) {
            return new ByteBuffer[] {sizeBuffer(buffer.remaining()), buffer};
        }

        private static ByteBuffer sizeBuffer(int size) {
            ByteBuffer sizeBuffer = ByteBuffer.allocate(4);
            sizeBuffer.putInt(size);
            sizeBuffer.rewind();
            return sizeBuffer;
        }
    }
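From this we can infer the Kafka message format: a 4-byte length (a big-endian int holding the combined size of the header and body), followed by headerStruct, followed by bodyStruct.
The comments on NetworkSend and NetworkReceive say the same thing.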
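The inferred format can be checked with a small round trip. The sketch below is a simplified illustration using only `java.nio.ByteBuffer`: `frame` prepends the 4-byte size the way `sizeDelimit` does, and `readPayload` does the reverse on the receiving side. Both method names are hypothetical, and unlike the real NetworkReceive this version assumes the whole frame is already in the buffer.

```java
import java.nio.ByteBuffer;

final class SizeDelimitedFrameSketch {

    /** Builds [4-byte size][header][body]; the size counts header plus body only. */
    static ByteBuffer frame(ByteBuffer header, ByteBuffer body) {
        int payloadSize = header.remaining() + body.remaining();
        ByteBuffer out = ByteBuffer.allocate(Integer.BYTES + payloadSize);
        out.putInt(payloadSize);
        out.put(header.duplicate()); // duplicate() leaves the caller's position untouched
        out.put(body.duplicate());
        out.flip();
        return out;
    }

    /** Reads the size prefix, then exactly that many payload bytes. */
    static byte[] readPayload(ByteBuffer in) {
        int size = in.getInt();
        if (size < 0 || size > in.remaining()) {
            throw new IllegalStateException("incomplete or invalid frame, size = " + size);
        }
        byte[] payload = new byte[size];
        in.get(payload);
        return payload;
    }

    public static void main(String[] args) {
        // Placeholder bytes stand in for the serialized header and body structs.
        ByteBuffer header = ByteBuffer.wrap(new byte[] {1, 2});
        ByteBuffer body = ByteBuffer.wrap(new byte[] {3, 4, 5});

        ByteBuffer framed = frame(header, body);
        System.out.println("frame size = " + framed.remaining());
        System.out.println("payload size = " + readPayload(framed).length);
    }
}
```

A receiver on a real socket would first read 4 bytes, allocate a buffer of that size, and keep reading until it is full, since a single read may return only part of a frame.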