Kafka发送消息流程
Posted 笨小孩撸代码
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kafka发送消息流程相关的知识,希望对你有一定的参考价值。
ProducerInterceptor对消息进行拦截
Serializer对消息的key和value进行序列化
Partitioner为消息选择合适的Partition
RecordAccumulator收集消息,实现批量发送
sender从RecordAccumulator获取消息
构造一个ClientRequest,这里是KafkaClient
将ClientRequest交给NetworkClient,准备发送
NetworkClient将请求放入KafkaChannel的缓存
执行网络IO,发送请求
收到相应,调用ClientRequest的回调函数
调用RecordBatch的回调函数,最终调用每个消息上注册的回调函数。
消息发送的过程中,涉及到两个线程协同工作,主线程首先将业务数据封装成ProducerRecord对象,之后调用send()方法将消息放入RecordAccumulator(消息收集器,也可以理解为主线程与Sender线程直接的缓冲区)中暂存,Sender线程负责将消息信息构成请求,并最终执行网络I/O的线程,它从RecordAccumulator中取出消息并批量发送出去,需要注意的是,KafkaProducer是线程安全的,多个线程间可以共享使用同一个KafkaProducer对象
KafkaProducer实现了Producer接口,在Producer接口中定义KafkaProducer对外提供的API
send()方法,发送消息,实际是将消息放入RecordAccumulator暂存,等待发送
flush()方法:刷新操作,等待RecordAccumulator中所有消息发送完成,在刷新完成之前会阻塞调用的线程
partitionsFor()方法:在KafkaProducer中维护了一个Metadata对象用于存储kafka集群的元数据,Metadata中的元数据会定期更新,partitionsFor()方法负责从Metadata中获取指定Topic中的分区信息
close()方法:关闭此Producer对象,主要操作是设置close标志,等待RecordAccumulator中的消息清空,关闭sender线程
metrics()方法:用于记录统计信息,与消息发送的流程无关
以上是关于Kafka发送消息流程的主要内容,如果未能解决你的问题,请参考以下文章