Kafka生产者消息发送流程
Posted 不识君的荒漠
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kafka生产者消息发送流程相关的知识,希望对你有一定的参考价值。
消息发送示例
使用kafka生产者发送一条消息的时候,示例代码,可能如下:
public static void main(String[] args) throws InterruptedException
String server = "localhost:9092";
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 安全认证配置,如果需要会这样配置,未启用ACL,忽略这几项配置
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
props.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256");
props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.scram.ScramLoginModule required username=\\"admin\\" password=\\"admin\\";");
KafkaProducer<Integer, String> producer = new KafkaProducer<>(props);
String topic = "test_topic";
String message = "hello, kafka";
producer.send(new ProducerRecord<>(topic, message), (metadata, exception) ->
log.info("metadata: ", metadata);
if (exception != null)
log.error("send exception: ", exception);
);
Thread.currentThread().join();
示例代码,仅供参考,下面分析一下消息是如何发送出去的。
发送流程分析
鉴于代码量太大,所以不会贴太多源码分析,尽量使用流程图和文字表达清楚。
基本流程
示例代码中,消息发送整体分两个步骤:
- 构造生产者实例
- 消息发送
下面对这两个过程分析
创建生产者
创建流程
示例中,创建的关键代码就这一行 :
KafkaProducer<Integer, String> producer = new KafkaProducer<>(props);
主要流程如流程图
- 调用KafkaProducer接口构造生产者实例
- 将配置的Properties转换为ProducerConfig,kafka生产者的配置属性有很多,如果我们没有配置的属性就会使用默认配置
- 初始化生产者实例的各个属性,如上面注释中图片里显示的这些属性
- 启动消息发送线程,完成
这里面有两个属性需要本文重点关注,与消息发送有直接关系(比如监控统计类属性,就不算直接关系,因为即使没有,消息也可以发送,就这个意思):
这两个属性:
// 缓存待发送的消息
this.accumulator = new RecordAccumulator(logContext,
config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
this.compressionType,
lingerMs(config),
retryBackoffMs,
deliveryTimeoutMs,
metrics,
PRODUCER_METRIC_GROUP_NAME,
time,
apiVersions,
transactionManager,
new BufferPool(this.totalMemorySize, config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), metrics, time, PRODUCER_METRIC_GROUP_NAME));
// 进行实际的消息发送运作
this.sender = newSender(logContext, kafkaClient, this.metadata);
消息发送
生产者-消费者模式发送消息
上面提到了producer有两个属性:accumulator和sender。
kafka producer发送消息并不是同步发送的,而类似于生产者-消费者模式的异步消息发送:
当produer调用send方法,发送消息的时候,只是先把消息缓存到一个队列,由该模式的消费者(另一个线程)来消费消息并执行真正的发送逻辑。这样主要是为了发送的时候尽量是批次的消息发送,而非单条单条消息的发送用来提升发送性能。
prdoucer实例就相当于该模式的生产者,accumulator是其中的缓存队列,sender便是消费者。
sender是在一个异步线程(ioThread)中执行主要逻辑,不停的从accumulator中获取准备发送的消息批次并通过网络发送到目标broker上,基本流程如下:
所以,到这里,也是主要分两步:
- producer调用send方法把消息放入accumulator
- sender从accumulator拿到消息发送到kafka broker
消息放入accumulator
我们发送消息可能这样写:
producer.send(new ProducerRecord<>(topic, message), (metadata, exception) ->
log.info("metadata: ", metadata);
if (exception != null)
log.error("send exception: ", exception);
);
看一下调用send方法后的执行流程:
消息实际是批量发送,从流程图里可以看到一个批次的消息满足条件才会唤醒sender准备发送,接下来看一下sender实际进行消息发送的流程。
消息批量异步发送
上文说明sender是在调用 new KafkaProducer()构造producer实例的时候,初始化的,代码如下:
this.sender = newSender(logContext, kafkaClient, this.metadata);
String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
this.ioThread.start();
是在一个异步线程(ioThread)内执行相关逻辑,是一个死循环:
@Override
public void run()
log.debug("Starting Kafka producer I/O thread.");
// main loop, runs until close is called
while (running)
try
// 调用Sender.runOnce()方法
runOnce();
catch (Exception e)
log.error("Uncaught error in kafka producer I/O thread: ", e);
log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");
// 下面执行关闭逻辑,代码忽略
在Sender.runOnce方法里不停的调用发送生产消息的请求:
long currentTimeMs = time.milliseconds();
long pollTimeout = sendProducerData(currentTimeMs);
client.poll(pollTimeout, currentTimeMs);
看这两行代码,是否会奇怪调用sendProducerData方法后,又调用下面这个client.poll方法?
我简单解释下:
- kafka的网络通信框架是自己基于java nio封装的一套实现,不是像rocketmq等用的netty那一套
- 调用sendProducerData方法只是把要发送的请求准备好,还未进行实际的网络传输
- 调用client.poll方法会进行实际io操作,将所有channel待发送数据通过socket发送出去
基本流程大概是这样:
结语
消息的发送过程基本就是这样,主要采用的就是生产者-消费者模式,异步批量发送的形式。
其中处理过程中还是有特别多的细节,不再全部展开来说了。
示例代码中,配置了ACL认证,kafka的认证机制是在连接连接建立的时候做的。如果是默认的话,就是没有认证且明文传输,关键的握手那一块是空实现,这个会放在其实篇单独说明。它不像rocketmq那样,rocketmq是请求的时候每次都会带上相关秘钥进行权限认证。
以上是关于Kafka生产者消息发送流程的主要内容,如果未能解决你的问题,请参考以下文章