Kafka生产者

Posted zlone

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kafka生产者相关的知识,希望对你有一定的参考价值。

前言

使用Kafka也有一段时间了, 主要还是作为生产者推送数据到其它业务方, 作为生产者编写代码并不多, 因为客户端内部做了太多的处理, 同时也不太容易达到性能瓶颈, 大多情况也只是改变一下发送消息的路由策略. 即便如此面对众多参数如果不理解一条消息从发送到Kafka集群中间经历什么, 那么设置这些参数的时候可能达不到最初的目的.

: 以下内容是学习<<图解 Kafka 之实战指南>>记录的笔记, 强烈建议大家读下原文, 可以到掘金搜索

主要架构

技术图片

  • 主线程
    主线程负责创建消息, 依次通过 生产者拦截器 -> 序列化器 -> 分区器 到达消息累加器RecordAccumulator.
    • 生产者拦截器: 可以类比于中间件, 可以对特定消息做处理, 比如过滤和统计等
    • 序列化器: 把消息对象转换为字节数组后发送Kafka集群
    • 分区器: 这里可以设置消息到哪个分区的规则
      • 指定该消息到达分区
      • 轮询分区, 适合于对消息次序不敏感, 消息量较大的情况, 这样消费者可以更好地均衡消费
      • 指定路由key, 相同的key的消息达到相同的分区, 保证消息在分区内的顺序
  • 消息收集器RecordAccumulator
    • 主要目的是缓存消息, 方便Sender线程批量发送, 较少网络传输消耗. 默认大小是32M, 可通过KafkaProducer参数buffer.memory 配置. 如果生产者速度过快超过发送的速度, 那么send方法会阻塞或异常, 超时时间由max.block.ms控制
    • 内部为每一个分区维护了一个双端队列, 写入消息到尾部, Sender从头部读. 队列内容是ProducerBatch
    • ProducerBatch不是单个的消息, 而是一批消息, 包含很多ProducerRecord(单个消息)
    • 当一条消息(ProducerRecord)进入RecordAccumulator, 首先需要找与分区对应的双端队列(没有就创建), 在从队列尾部获取一个ProducerBatch(如果没有则新建), 再看ProducerBatch还有没有空间写入, 如果没有空间, 需要看该消息是否 > batch.size, 如果不大于就按照batch.size的大小新建, 如果大于就按照实际大小新建
  • Sender线程
    Sender线程负责从RecordAccumulator获取消息发送到Kafka集群.
    1. <分区, Deque<ProducerBatch>> 转换为 <Node, List<ProducerBatch>>. Node表示kafka节点
    2. <Node, List<ProducerBatch>> 转换为 <Node, Request> 形式, 然后把Request发往各个Node
  • InFlightRequests
    • Sender线程发往Kafka之前会将消息保存到InFlightRequests中, 主要存储格式是Map<NodeId, Deque>, 主要缓存了已经发出去但是还没有收到响应的请求. 这个Deque的配置参数是max.in.flight.requests. per. connection(默认为5). 通过比较配置参数和实际Deque的大小来判断是否还能再向这个连接发送请求, 除非缓存的请求收到响应从队列移除
    • 通过InFlightRequests可以得到负载最低的Node, 叫做leastLoadedNode, 因为未确认的请求越多说明负载越高

以下公众号是我个人运营的, 初衷是可以适当备份自己的笔记, 之后想可能会帮到更多的人. 发文时间不固定, 尽可能每篇文章直奔主题, 不去占用大家宝贵的时间. 如觉得对你有用, 欢迎关注交流.


技术图片

以上是关于Kafka生产者的主要内容,如果未能解决你的问题,请参考以下文章

kafka源码分析 生产消息过程

kafka生产者的发送消息的流程以及代码案例

大数据(6f)图解Kafka生产者和消费者API

spring kafka中是不是有多个生产者的代码示例?

如何保证kafka生产者发送消息的可靠性

Java API中kafka生产者发送消息没有成功