吃透Kafka四:kafka producer源码详细分析
Posted 吃透Java
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了吃透Kafka四:kafka producer源码详细分析相关的知识,希望对你有一定的参考价值。
一,kafka producer基本使用
kafka生产则代码如下:
public static void main(String[] args) throws ExecutionException, InterruptedException
Properties conf = new Properties();
conf.setProperty(ProducerConfig.ACKS_CONFIG, "0");
conf.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
conf.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
conf.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
conf.setProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG, DefaultPartitioner.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(conf);
ProducerRecord<String, String> msg = new ProducerRecord<String, String>("topic", "key", "value");
Future<RecordMetadata> future = producer.send(msg);
RecordMetadata recordMetadata = future.get();
// Future<RecordMetadata> send = producer.send(msg, new Callback()
// @Override
// public void onCompletion(RecordMetadata metadata, Exception exception)
//
//
// );
从上面的 API 可以得知,用户在使用 KafkaProducer 发送消息时,首先需要将待发送的消息封装成 ProducerRecord,返回的是一个 Future 对象,典型的 Future 设计模式。在发送时也可以指定一个 Callable 接口用来执行消息发送的回调。
我们看一下ProducerRecord构造函数:
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers) ...
- topic:消息所属的主题。
- partition:可以直接指定msg发送到哪个分区,如果不指定partition而指定key,会使用key的hashCode与队列总数进行取模来选择分区,如果前面两者都未指定,则会轮询主题下的所有分区。
- timestamp:消息时间戳,根据 topic 的配置信息 message.timestamp.type 的值来赋予不同的值。CreateTime:发送客户端发送消息时的时间戳。LogAppendTime:消息在 broker 追加时的时间戳。
- key:消息键,如果指定该值,则会使用该值的 hashcode 与 队列数进行取模来选择分区。
- value:消息体。
- headers:该消息的额外属性对,与消息体分开存储,是一系列的 key-value 键值对。
二,kafka send源码分析
KafkaProducer 的 send 方法,并不会直接向 broker 发送消息,kafka 将消息发送异步化,即分解成两个步骤:
- send 方法的职责是将消息追加到内存中(分区的缓存队列中)。
- 由专门的 Send 线程异步将缓存中的消息批量发送到 Kafka Broker 中。
send方法
public class KafkaProducer<K, V> implements Producer<K, V>
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback)
TopicPartition tp = null;
try
// 获取 topic 的分区列表,如果本地没有该topic的分区信息,则需要向远端 broker 获取,
// 该方法会返回拉取元数据所耗费的时间。在消息发送时的最大等待时间时会扣除该部分损耗的时间。
ClusterAndWaitTime clusterAndWaitTime;
try
clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), nowMs, maxBlockTimeMs);
catch (KafkaException e)
if (metadata.isClosed())
throw new KafkaException("Producer closed while send in progress", e);
throw e;
Cluster cluster = clusterAndWaitTime.cluster;
// key value序列化,用于后面网络发送.
// 注意:参与序列化的只有key value
byte[] serializedKey;
try
serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
byte[] serializedValue;
try
serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
// 根据分区负载算法计算本次消息发送该发往的分区
int partition = partition(record, serializedKey, serializedValue, cluster);
tp = new TopicPartition(record.topic(), partition);
// 根据使用的版本号,按照消息协议来计算消息的长度,并是否超过指定长度,如果超过则抛出异常。
int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
compressionType, serializedKey, serializedValue, headers);
ensureValidRecordSize(serializedSize);
// 对传入的 Callable(回调函数) 加入到拦截器链中
Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
// 如果事务处理器不为空,执行事务管理相关的
if (transactionManager != null && transactionManager.isTransactional())
transactionManager.failIfNotReadyForSend();
// 将消息追加到缓存区,如果当前缓存区已写满或创建了一个新的缓存区,则唤醒 Sender(消息发送线程),
// 将缓存区中的消息发送到 broker 服务器,最终返回 future。
// 从这里也能得知,doSend 方法执行完成后,此时消息还不一定成功发送到 broker。
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);
// 由于目标partition的当前batch没有空间了,需要更换一个partition,再次尝试
if (result.abortForNewBatch)
int prevPartition = partition;
// 更换目标partition,同时也会更换StickyPartitionCache黏住的partition
partitioner.onNewBatch(record.topic(), cluster, prevPartition);
// 计算新的目标partition
partition = partition(record, serializedKey, serializedValue, cluster);
tp = new TopicPartition(record.topic(), partition);
interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
// 再次调用append()方法向RecordAccumulator写入message,如果该partition缓冲区中的batch也没有空间,
// 则创建新batch了,不会再次尝试了
result = accumulator.append(tp, timestamp, serializedKey,
serializedValue, headers, interceptCallback, remainingWaitMs, false, nowMs);
if (result.batchIsFull || result.newBatchCreated)
this.sender.wakeup();
return result.future;
以上send核心方法主要分为如下几步:
- 获取 topic 的分区列表,如果本地没有该topic的分区信息,则需要向远端 broker 获取。
- 根据分区负载算法计算本次消息发送该发往的分区。
- 将消息追加到缓存区,如果当前缓存区已写满或创建了一个新的缓存区,则唤醒 Sender(消息发送线程),将缓存区中的消息发送到 broker 服务器,最终返回 future。
三,waitOnMetadata获取元数据
在我们通过 KafkaProducer 发送 message 的时候,我们只明确指定了 message 要写入哪个 topic ,并没有明确指定要写入的 partition。
但是同一个 topic 的 partition 可能位于 kafka 的不同 broker 上,所以 producer 需要明确的知道该 topic 下所有 partition 的元信息(即所在 broker 的 IP、端口等信息),这样才能与 partition 所在 broker 建立网络连接并发送 message。
在 KafkaProducer 中,使用 Node、TopicPartition、PartitionInfo 三个类来记录 Kafka 集群元数据:
- Node 表示 kafka 集群中的一个节点,其中维护了节点的 host、ip、port 等基础信息。
- TopicPartition 用来抽象一个 topic 中的的一个 partition,其中维护 topic 的名称以及 partition 的编号信息。
- PartitionInfo 用来抽象一个 partition 的信息,其中:leader 字段记录了 leader replica 所在节点的 id;replica 字段记录了全部 replica 所在的节点信息;inSyncReplicas 字段记录了 ISR 集合中所有 replica 所在的节点信息。
kafka producer 会将上述三个维度的基础信息封装成 Cluster 对象使用,下面是 Cluster 包含的信息:
public final class Cluster
// 标识当前元数据信息是初始化的配置还是启动之后的
private final boolean isBootstrapConfigured;
// kafka集群中全部Node的集合
private final List<Node> nodes;
// topic信息,这里根据topic属性进行分类
private final Set<String> unauthorizedTopics;
private final Set<String> invalidTopics;
private final Set<String> internalTopics;
// kafka集群中controller所在的节点
private final Node controller;
// 可以根据TopicPartition来查询该partition的具体信息
private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition;
// 根据topic来查询其下partition具体信息数组
private final Map<String, List<PartitionInfo>> partitionsByTopic;
private final Map<String, List<PartitionInfo>> availablePartitionsByTopic;
// 根据nodeId来查询落到其上的partition具体信息数组
private final Map<Integer, List<PartitionInfo>> partitionsByNode;
// 根据nodeId来查询Node对象
private final Map<Integer, Node> nodesById;
// 集群的唯一标识
private final ClusterResource clusterResource;
// 维护了topic名称和唯一标识
private final Map<String, Uuid> topicIds;
再向上一层,Cluster 对象会被维护到 Metadata 中,Metadata 同时还维护了 Cluster 的版本号、过期时间、监听器等等信息,如下图所示:
public class Metadata implements Closeable
// 两次更新元数据最小时间差,默认是100ms,这是为了防止更新就操作过于频繁而造成网络阻塞和服务端压力
private final long refreshBackoffMs;
// 元数据失效时间,需要更新元数据的时间间隔,默认5分钟
private final long metadataExpireMs;
...
// 原数据缓存,更新的元数据都在MetadataCache中村粗
private MetadataCache cache = MetadataCache.empty();
...
接下来,我们来看 KafkaProducer.waitOnMetadata()方法是如何工作的:
public class KafkaProducer<K, V> implements Producer<K, V>
private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long nowMs, long maxWaitMs) throws InterruptedException
// 先去本地获取cluster信息,如果是第一次获取集群信息,那么应该只有我们本地配置的cluster node信息。
Cluster cluster = metadata.fetch();
// 省略部分源码...
do
metadata.add(topic, nowMs + elapsed);
int version = metadata.requestUpdateForTopic(topic);
// 唤醒Sender线程,由Sender线程去完成元数据的更新
sender.wakeup();
// 阻塞等待元数据更新,停止阻塞的条件是:更新后的updateVersion大于当前version,超时的话会直接抛出异常
try
metadata.awaitUpdate(version, remainingWaitMs);
// 获取更新最新的cluster信息
cluster = metadata.fetch();
elapsed = time.milliseconds() - nowMs;
// 获取当前topic下的分区数量
partitionsCount = cluster.partitionCountForTopic(topic);
while (partitionsCount == null || (partition != null && partition >= partitionsCount));
return new ClusterAndWaitTime(cluster, elapsed);
以上代码主要完成对sender线程的唤醒,阻塞等待sender线程去broker拉取最新的cluster元数据信息,并返回。
由于Sender线程内容太多,我们放到后面来章节来详细分析。
此时我们应该知道,发送msg之前我们要先获取cluster集群的元数据信息,然后再根据TopicPartition来获取具体要发送到哪个broker上,根据目标broker的ip,port来建立socket链接,然后发送msg。
四,partition 选择
在 waitOnMetadata() 方法拿到最新的集群元数据之后,下面就要开始确定待发送的 message 要发送到哪个 partition 了。
如果我们明确指定了目标 partition,则以用户指定的为准,但是一般情况下,业务并不会指定 message 需要写入到哪个 partition,此时就会通过 Partitioner 接口结合元数据计算出一个目标 partition。
public interface Partitioner extends Configurable, Closeable
// 根据传参获取一个分区
int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
void close();
default void onNewBatch(String topic, Cluster cluster, int prevPartition)
Partitioner接口实现类可以通过KafkaProducer配置项partitioner.class指定,默认值为DefaultPartitioner,其主要功能为:
- 如果 message 存在的 key 的话,则取 key 的 hash 值(使用的是 murmur2 这种高效率低碰撞的 Hash 算法),然后与 partition 总数取模,得到目标 partition 编号,这样可以保证同一 key 的 message 进入同一 partition。
- 如果 message 没有 key,则通过 StickyPartitionCache.partition() 方法计算目标 partition。
public class DefaultPartitioner implements Partitioner
private final StickyPartitionCache stickyPartitionCache = new StickyPartitionCache();
public void configure(Map<String, ?> configs)
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster)
return partition(topic, key, keyBytes, value, valueBytes, cluster, cluster.partitionsForTopic(topic).size());
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster,int numPartitions)
if (keyBytes == null)
return stickyPartitionCache.partition(topic, cluster);
// hash the keyBytes to choose a partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
public void close()
public void onNewBatch(String topic, Cluster cluster, int prevPartition)
stickyPartitionCache.nextPartition(topic, cluster, prevPartition);
这里解释一下 StickyPartitionCache 的功能,RecordAccumulator 是一个缓冲区,主线程发送的 message 会先进入 RecordAccumulator,然后 Sender 线程攒够了 message 的时候进行批量发送。
触发 Sender 线程批量发送堆积 message 的条件主要有两方面:
- message 的延迟时间到了,也就是说,我们的业务场景对 message 发送有延迟要求,message 不能一直在 producer 端缓存。我们可以通过 linger.ms 配置降低 message 的发送延迟。
- message 堆积的足够多,达到了一定阈值,才适合批量发送,这样有效负载较高。批量发送的 batch.size 默认值是 16KB。
StickyPartitionCache 主要实现的是"黏性选择",就是尽可能的先往一个 partition 发送 message,让发往这个 partition 的缓冲区快速填满,这样的话,就可以降低 message 的发送延迟。我们不用担心出现 partition 数据量不均衡的情况,因为只要业务运行时间足够长,message 还是会均匀的发送到每个 partition 上的。
下面来看 StickyPartitionCache 的实现,其中维护了一个 ConcurrentMap(indexCache 字段),key 是 topic,value 是当前黏住了哪个 partition。
public class StickyPartitionCache
private final ConcurrentMap<String, Integer> indexCache;
public StickyPartitionCache()
this.indexCache = new ConcurrentHashMap<>();
public int partition(String topic, Cluster cluster)
// 先从 indexCache 字段中获取黏住的 partition
Integer part = indexCache.get(topic);
// 如果没有,则调用 nextPartition() 方法向 indexCache 中写入一个.
if (part == null)
return nextPartition(topic, cluster, -1);
return part;
// 在 nextPartition() 方法中,会先获取目标 topic 中可用的 partition,并从中随机选择一个写入 indexCache
public int nextPartition(String topic, Cluster cluster, int prevPartition)
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
Integer oldPart = indexCache.get(topic);
Integer newPart = oldPart;
if (oldPart == null || oldPart == prevPartition)
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() < 1)
Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
newPart = random % partitions.size();
else if (availablePartitions.size() == 1)
newPart = availablePartitions.get(0).partition();
else
while (newPart == null || newPart.equals(oldPart))
int random = Utils.toPositive(ThreadLocalRandom.current(以上是关于吃透Kafka四:kafka producer源码详细分析的主要内容,如果未能解决你的问题,请参考以下文章
吃透Kafka五:Producer Metadata 更新机制