Kafka发送消息流程

Posted 笨小孩撸代码

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kafka发送消息流程相关的知识,希望对你有一定的参考价值。

  1. ProducerInterceptor对消息进行拦截

  2. Serializer对消息的key和value进行序列化

  3. Partitioner为消息选择合适的Partition

  4. RecordAccumulator收集消息,实现批量发送

  5. sender从RecordAccumulator获取消息

  6. 构造一个ClientRequest,这里是KafkaClient

  7. 将ClientRequest交给NetworkClient,准备发送

  8. NetworkClient将请求放入KafkaChannel的缓存

  9. 执行网络IO,发送请求

  10. 收到相应,调用ClientRequest的回调函数

  11. 调用RecordBatch的回调函数,最终调用每个消息上注册的回调函数。

       消息发送的过程中,涉及到两个线程协同工作,主线程首先将业务数据封装成ProducerRecord对象,之后调用send()方法将消息放入RecordAccumulator(消息收集器,也可以理解为主线程与Sender线程直接的缓冲区)中暂存,Sender线程负责将消息信息构成请求,并最终执行网络I/O的线程,它从RecordAccumulator中取出消息并批量发送出去,需要注意的是,KafkaProducer是线程安全的,多个线程间可以共享使用同一个KafkaProducer对象

       KafkaProducer实现了Producer接口,在Producer接口中定义KafkaProducer对外提供的API

  1. send()方法,发送消息,实际是将消息放入RecordAccumulator暂存,等待发送

  2. flush()方法:刷新操作,等待RecordAccumulator中所有消息发送完成,在刷新完成之前会阻塞调用的线程

  3. partitionsFor()方法:在KafkaProducer中维护了一个Metadata对象用于存储kafka集群的元数据,Metadata中的元数据会定期更新,partitionsFor()方法负责从Metadata中获取指定Topic中的分区信息

  4. close()方法:关闭此Producer对象,主要操作是设置close标志,等待RecordAccumulator中的消息清空,关闭sender线程

  5. metrics()方法:用于记录统计信息,与消息发送的流程无关



以上是关于Kafka发送消息流程的主要内容,如果未能解决你的问题,请参考以下文章

源码分析 Kafka 消息发送流程(文末附流程图)

kafka生产者的发送消息的流程以及代码案例

Kafka生产者消息发送流程

Kafka工作流程

Kafka系列之(4)——Kafka Producer流程解析

Kafka JavaAPI