Kafka Producer Source Code Analysis
Posted by java架构师-太阳
Creating the KafkaProducer object: step into the constructor.
KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer,
              ProducerMetadata metadata, KafkaClient kafkaClient, ProducerInterceptors<K, V> interceptors, Time time) {
    try {
        this.producerConfig = config;                                   // user-supplied configuration
        this.time = time;                                               // clock used by the producer
        String transactionalId = config.getString("transactional.id"); // user-defined transactional id
        this.clientId = config.getString("client.id");                 // user-defined client id
        LogContext logContext;                                          // logging context
        if (transactionalId == null) {
            logContext = new LogContext(String.format("[Producer clientId=%s] ", this.clientId));
        } else {
            logContext = new LogContext(String.format("[Producer clientId=%s, transactionalId=%s] ", this.clientId, transactionalId));
        }
        this.log = logContext.logger(KafkaProducer.class);
        this.log.trace("Starting the Kafka producer");
        // metric tags keyed by client-id
        Map<String, String> metricTags = Collections.singletonMap("client-id", this.clientId);
        // metric configuration: number of samples, sample window, recording level
        MetricConfig metricConfig = (new MetricConfig()).samples(config.getInt("metrics.num.samples"))
                .timeWindow(config.getLong("metrics.sample.window.ms"), TimeUnit.MILLISECONDS)
                .recordLevel(RecordingLevel.forName(config.getString("metrics.recording.level")))
                .tags(metricTags);
        // metric reporters
        List<MetricsReporter> reporters = config.getConfiguredInstances("metric.reporters", MetricsReporter.class,
                Collections.singletonMap("client.id", this.clientId));
        JmxReporter jmxReporter = new JmxReporter();
        jmxReporter.configure(config.originals(Collections.singletonMap("client.id", this.clientId)));
        reporters.add(jmxReporter);
        MetricsContext metricsContext = new KafkaMetricsContext("kafka.producer", config.originalsWithPrefix("metrics.context."));
        this.metrics = new Metrics(metricConfig, reporters, time, metricsContext);
        // partitioner
        this.partitioner = (Partitioner) config.getConfiguredInstance("partitioner.class", Partitioner.class,
                Collections.singletonMap("client.id", this.clientId));
        long retryBackoffMs = config.getLong("retry.backoff.ms");
        // key serializer
        if (keySerializer == null) {
            this.keySerializer = (Serializer) config.getConfiguredInstance("key.serializer", Serializer.class);
            this.keySerializer.configure(config.originals(Collections.singletonMap("client.id", this.clientId)), true);
        } else {
            config.ignore("key.serializer");
            this.keySerializer = keySerializer;
        }
        // value serializer
        if (valueSerializer == null) {
            this.valueSerializer = (Serializer) config.getConfiguredInstance("value.serializer", Serializer.class);
            this.valueSerializer.configure(config.originals(Collections.singletonMap("client.id", this.clientId)), false);
        } else {
            config.ignore("value.serializer");
            this.valueSerializer = valueSerializer;
        }
        // interceptors
        List<ProducerInterceptor<K, V>> interceptorList = config.getConfiguredInstances("interceptor.classes",
                ProducerInterceptor.class, Collections.singletonMap("client.id", this.clientId));
        if (interceptors != null) {
            this.interceptors = interceptors;
        } else {
            this.interceptors = new ProducerInterceptors(interceptorList); // wrap the configured interceptor list
        }
        ClusterResourceListeners clusterResourceListeners = this.configureClusterResourceListeners(keySerializer, valueSerializer, interceptorList, reporters);
        // maximum size of a single request, in bytes
        this.maxRequestSize = config.getInt("max.request.size");
        // total buffer memory
        this.totalMemorySize = config.getLong("buffer.memory");
        // compression type
        this.compressionType = CompressionType.forName(config.getString("compression.type"));
        this.maxBlockTimeMs = config.getLong("max.block.ms");
        int deliveryTimeoutMs = configureDeliveryTimeout(config, this.log);
        this.apiVersions = new ApiVersions();
        // transaction manager
        this.transactionManager = this.configureTransactionState(config, logContext);
        // accumulator that collects the records to be sent
        this.accumulator = new RecordAccumulator(logContext, config.getInt("batch.size"), this.compressionType,
                lingerMs(config), retryBackoffMs, deliveryTimeoutMs, this.metrics, "producer-metrics", time,
                this.apiVersions, this.transactionManager,
                new BufferPool(this.totalMemorySize, config.getInt("batch.size"), this.metrics, time, "producer-metrics"));
        List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList("bootstrap.servers"), config.getString("client.dns.lookup"));
        if (metadata != null) { // cluster metadata
            this.metadata = metadata;
        } else {
            // cluster metadata is refreshed periodically; the default max age is 5 minutes
            this.metadata = new ProducerMetadata(retryBackoffMs, config.getLong("metadata.max.age.ms"),
                    config.getLong("metadata.max.idle.ms"), logContext, clusterResourceListeners, Time.SYSTEM);
            this.metadata.bootstrap(addresses);
        }
        this.errors = this.metrics.sensor("errors");
        // the sender sends requests over TCP through NetworkClient
        this.sender = this.newSender(logContext, kafkaClient, this.metadata);
        String ioThreadName = "kafka-producer-network-thread | " + this.clientId;
        // I/O thread: first argument is the thread name, second is the sender, third marks it as a daemon thread
        this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
        // start the send thread
        this.ioThread.start();
        config.logUnused();
        AppInfoParser.registerAppInfo("kafka.producer", this.clientId, this.metrics, time.milliseconds());
        this.log.debug("Kafka producer started");
    } catch (Throwable var22) {
        this.close(Duration.ofMillis(0L), true);
        throw new KafkaException("Failed to construct kafka producer", var22);
    }
}
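To relate the configuration keys above to the public API, here is a minimal sketch of constructing a producer. The broker address, serializer choices, and tuning values are illustrative assumptions, not taken from the source above.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerBootstrapExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);        // batch.size used by the RecordAccumulator
        props.put(ProducerConfig.LINGER_MS_CONFIG, 5);             // linger.ms
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432L); // buffer.memory backing the BufferPool

        // This call runs the constructor analyzed above: metrics, partitioner, serializers,
        // RecordAccumulator, metadata, the sender, and the I/O thread are all set up here.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // the producer is now ready to send records
        }
    }
}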
Sending messages
Both synchronous and asynchronous sends go through this method:
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    // run the interceptor chain first
    ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
    // then send the (possibly modified) record
    return this.doSend(interceptedRecord, callback);
}
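For reference, both calling styles end up in the send() method above. This fragment assumes a KafkaProducer<String, String> named producer (for example, the one created earlier) and a hypothetical topic name.

// Asynchronous send: the callback fires once the broker acknowledges the record (or the send fails).
producer.send(new ProducerRecord<>("demo-topic", "key", "value"), (metadata, exception) -> {
    if (exception != null) {
        exception.printStackTrace();
    } else {
        System.out.printf("sent to %s-%d at offset %d%n", metadata.topic(), metadata.partition(), metadata.offset());
    }
});

// Synchronous send: block on the returned Future until the broker responds.
// get() throws InterruptedException/ExecutionException; handle or declare them.
RecordMetadata meta = producer.send(new ProducerRecord<>("demo-topic", "key", "value")).get();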
Step into the interceptor chain's onSend():
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
    // the record, possibly rewritten by each interceptor in turn
    ProducerRecord<K, V> interceptRecord = record;
    // iterate over the configured interceptors
    Iterator var3 = this.interceptors.iterator();
    while (var3.hasNext()) {
        ProducerInterceptor interceptor = (ProducerInterceptor) var3.next();
        try {
            // let the interceptor process the record
            interceptRecord = interceptor.onSend(interceptRecord);
        } catch (Exception var6) {
            // interceptor exceptions are logged but not propagated
            if (record != null) {
                log.warn("Error executing interceptor onSend callback for topic: {}, partition: {}", new Object[]{record.topic(), record.partition(), var6});
            } else {
                log.warn("Error executing interceptor onSend callback", var6);
            }
        }
    }
    return interceptRecord;
}
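To see where onSend() hooks in from the user's side, here is a sketch of a custom interceptor. The class name and the prefixing behavior are invented for illustration; such a class would be registered through the interceptor.classes configuration.

import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

// Hypothetical interceptor that prefixes every value before it is serialized.
public class PrefixingInterceptor implements ProducerInterceptor<String, String> {
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        // Runs before partitioning and serialization; may return a new record.
        return new ProducerRecord<>(record.topic(), record.partition(), record.timestamp(),
                record.key(), "intercepted-" + record.value(), record.headers());
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        // Called when the broker acknowledges the record, or when the send fails.
    }

    @Override
    public void close() { }

    @Override
    public void configure(Map<String, ?> configs) { }
}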
Step into return this.doSend(interceptedRecord, callback):
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
    TopicPartition tp = null; // topic-partition the record will be appended to
    try {
        this.throwIfProducerClosed();
        long nowMs = this.time.milliseconds();
        KafkaProducer.ClusterAndWaitTime clusterAndWaitTime;
        try {
            // fetch the topic's metadata and make sure it is available
            clusterAndWaitTime = this.waitOnMetadata(record.topic(), record.partition(), nowMs, this.maxBlockTimeMs);
        } catch (KafkaException var22) {
            if (this.metadata.isClosed()) {
                throw new KafkaException("Producer closed while send in progress", var22);
            }
            throw var22;
        }
        nowMs += clusterAndWaitTime.waitedOnMetadataMs;
        long remainingWaitMs = Math.max(0L, this.maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
        Cluster cluster = clusterAndWaitTime.cluster;
        byte[] serializedKey;
        try {
            // serialize the key
            serializedKey = this.keySerializer.serialize(record.topic(), record.headers(), record.key());
        } catch (ClassCastException var21) {
            throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() + " to class " + this.producerConfig.getClass("key.serializer").getName() + " specified in key.serializer", var21);
        }
        byte[] serializedValue;
        try {
            // serialize the value
            serializedValue = this.valueSerializer.serialize(record.topic(), record.headers(), record.value());
        } catch (ClassCastException var20) {
            throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() + " to class " + this.producerConfig.getClass("value.serializer").getName() + " specified in value.serializer", var20);
        }
        // invoke the partitioner to pick a partition
        int partition = this.partition(record, serializedKey, serializedValue, cluster);
        // the topic and partition the record will be sent to
        tp = new TopicPartition(record.topic(), partition);
        // make the headers read-only
        this.setReadOnly(record.headers());
        Header[] headers = record.headers().toArray();
        // estimate the serialized size
        int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(this.apiVersions.maxUsableProduceMagic(), this.compressionType, serializedKey, serializedValue, headers);
        this.ensureValidRecordSize(serializedSize);
        long timestamp = record.timestamp() == null ? nowMs : record.timestamp();
        if (this.log.isTraceEnabled()) {
            this.log.trace("Attempting to append record {} with callback {} to topic {} partition {}", new Object[]{record, callback, record.topic(), partition});
        }
        // callback chain to run once the broker acknowledges the record
        Callback interceptCallback = new KafkaProducer.InterceptorCallback(callback, this.interceptors, tp);
        if (this.transactionManager != null && this.transactionManager.isTransactional()) {
            this.transactionManager.failIfNotReadyForSend();
        }
        // append the record to the record accumulator
        RecordAppendResult result = this.accumulator.append(tp, timestamp, serializedKey, serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);
        if (result.abortForNewBatch) {
            int prevPartition = partition;
            this.partitioner.onNewBatch(record.topic(), cluster, partition);
            partition = this.partition(record, serializedKey, serializedValue, cluster);
            tp = new TopicPartition(record.topic(), partition);
            if (this.log.isTraceEnabled()) {
                this.log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", new Object[]{record.topic(), partition, prevPartition});
            }
            interceptCallback = new KafkaProducer.InterceptorCallback(callback, this.interceptors, tp);
            result = this.accumulator.append(tp, timestamp, serializedKey, serializedValue, headers, interceptCallback, remainingWaitMs, false, nowMs);
        }
        // for transactional messages, register the partition with the transaction manager
        if (this.transactionManager != null && this.transactionManager.isTransactional()) {
            this.transactionManager.maybeAddPartitionToTransaction(tp);
        }
        // wake the sender thread if the batch for this partition is full,
        // or if a new batch had to be created (e.g. linger.ms expired)
        if (result.batchIsFull || result.newBatchCreated) {
            this.log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
            this.sender.wakeup();
        }
        // return the future for this record
        return result.future;
    } catch (ApiException var23) {
        this.log.debug("Exception occurred during message send:", var23);
        if (callback != null) {
            callback.onCompletion((RecordMetadata) null, var23);
        }
        this.errors.record();
        this.interceptors.onSendError(record, tp, var23);
        return new KafkaProducer.FutureFailure(var23);
    } catch (InterruptedException var24) {
        this.errors.record();
        this.interceptors.onSendError(record, tp, var24);
        throw new InterruptException(var24);
    } catch (KafkaException var25) {
        this.errors.record();
        this.interceptors.onSendError(record, tp, var25);
        throw var25;
    } catch (Exception var26) {
        this.interceptors.onSendError(record, tp, var26);
        throw var26;
    }
}
- The producer fetches the metadata of the target topic via the waitOnMetadata() method, first making sure that the topic is available.
- The producer serializes the record's key and value; the consumer performs the corresponding deserialization on its side.
- The partition value is determined in one of three ways (see the sketch after this list):
  - If a partition is specified explicitly, that value is used as the partition directly.
  - If no partition is specified but a key is present, the partition is the hash of the key modulo the topic's partition count.
  - If neither a partition nor a key is given, a random integer is generated on the first call (and incremented on each subsequent call); that value modulo the number of available partitions gives the partition value, i.e. the familiar round-robin algorithm.
  - The default partitioner is org.apache.kafka.clients.producer.internals.DefaultPartitioner.
- Writing to the accumulator: the record is first written into a buffer, and once a batch.size worth of data has accumulated, the sender thread is woken up to ship the RecordBatch. Writing into the buffer works as follows:
  - Get the queue for this topic-partition, creating an empty queue if none exists.
  - Append to the queue: take the most recently added RecordBatch; if it does not exist, or it exists but its remaining space cannot hold this record, return null; if the append succeeds, return the result and the write is complete.
  - Otherwise create a new RecordBatch whose initial size is max(batch.size, Records.LOG_OVERHEAD + Record.recordSize(key, value)), which guards against a single record being larger than batch.size.
  - Write the record into the new RecordBatch, add the RecordBatch to the queue, and return the result; the write is complete.
- Sending the RecordBatch: after a record has been written successfully, if a RecordBatch already meets the conditions for sending (typically the queue holds multiple batches, so the earliest ones are certainly ready), the sender thread is woken up. The sender processes RecordBatches in its run() method, which roughly does the following:
  - Collect the nodes that own RecordBatches ready to be sent.
  - If there is no connection to a node (initializing the connection if possible), the node cannot receive data for now, so it is temporarily removed.
  - Build the batches to send per node (keyed by node.id) from all sendable RecordBatches, removing each RecordBatch from its queue.
  - Remove RecordBatches whose sends have timed out because metadata was unavailable.
  - Send the RecordBatches.
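As a rough illustration of the three partitioning cases above, the sketch below mimics the decision order. It is a simplified stand-in, not the actual DefaultPartitioner code, which also applies sticky partitioning for keyless records in recent versions; the class and method names here are invented.

import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.common.utils.Utils;

// Simplified partition selection following the three cases described above.
public class SimplePartitionChooser {
    private final AtomicInteger counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());

    public int choosePartition(Integer explicitPartition, byte[] serializedKey, int numPartitions) {
        if (explicitPartition != null) {
            return explicitPartition;                              // case 1: partition given explicitly
        }
        if (serializedKey != null) {
            // case 2: hash of the key modulo the partition count (murmur2, as in the real partitioner)
            return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
        }
        // case 3: round-robin starting from a random integer
        return Utils.toPositive(counter.getAndIncrement()) % numPartitions;
    }
}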
Metadata update mechanism
- metadata.requestUpdate() sets the metadata's needUpdate flag to true (forcing an update) and returns the current version number; the version is used to tell whether the metadata update has completed.
- sender.wakeup() wakes the sender thread, which in turn wakes the NetworkClient to perform the update.
- metadata.awaitUpdate(version, remainingWaitMs) waits for the metadata update to finish.
- So each time the producer requests a metadata update, one of the following happens:
  - If the node can accept requests, the request is sent directly.
  - If the node is currently establishing a connection, return immediately.
  - If the node has no connection yet, initiate a connection to the broker.
- NetworkClient's poll() method decides whether the metadata needs updating; handleCompletedReceives() processes the metadata update, ultimately delegating to handleCompletedMetadataResponse() in DefaultMetadataUpdater.
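Putting the three calls above together, the waiting loop inside waitOnMetadata() can be pictured roughly as in the sketch below. This is a simplified picture, not the exact source: the helper method name is invented, and unknown-topic handling, partition-bounds checks, and closed-producer checks are omitted.

// Simplified picture of the requestUpdate / wakeup / awaitUpdate cycle described above.
private void awaitTopicMetadata(ProducerMetadata metadata, Sender sender, String topic,
                                Time time, long maxBlockTimeMs) throws InterruptedException {
    long begin = time.milliseconds();
    long remainingWaitMs = maxBlockTimeMs;
    // loop until the cluster metadata contains a partition count for the topic
    while (metadata.fetch().partitionCountForTopic(topic) == null) {
        int version = metadata.requestUpdate();         // force an update and remember the current version
        sender.wakeup();                                // wake the sender so NetworkClient fetches fresh metadata
        metadata.awaitUpdate(version, remainingWaitMs); // block until the version advances or the wait times out
        long elapsed = time.milliseconds() - begin;
        if (elapsed >= maxBlockTimeMs) {
            throw new org.apache.kafka.common.errors.TimeoutException(
                    "Topic " + topic + " not present in metadata after " + maxBlockTimeMs + " ms.");
        }
        remainingWaitMs = maxBlockTimeMs - elapsed;
    }
}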