kafka生产者网络层总结
Posted set-cookie
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kafka生产者网络层总结相关的知识,希望对你有一定的参考价值。
1 层次结构
负责进行网络IO请求的是NetworkClient,主要层次结构如下
ClusterConnectionStates报存了每个节点的状态,以node为key,以node的状态为value;inFlightRequets中保存了每个节点已经发送的请求,但是还没有返回的请求,以node为key,以List<ClientRequest>为value。inFlightRequets从名字也可以看出,表示“正在空中飞”的请求。
2 如何保证每次只发送一个请求
sender线程启动后,如果RecordBatch中有消息,会将消息按照所在节点重新排列,每个节点会创建一个ClientRequest,用来发送,每个节点每次只能发送一个ClientRequest,如下
KafkaChannel#setSend(..)
public void setSend(Send send) {
if (this.send != null) // 如果已经有send,会抛出异常
throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress.");
this.send = send;
this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
}
那么kafka是如何保证避免setSend的时候KafkaChannel中已经有send了呢,这个关键就是在sender线程中会调用NetworkClient#ready(..),会将没有ready的节点去除掉,从而不会在该节点上setSend:
while (iter.hasNext()) {
Node node = iter.next();
if (!this.client.ready(node, now)) { // 关键
iter.remove();
notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
}
}
3 NetworkClient#ready(..)
NetworkClient#ready(..)检查节点是否准备好,从而决定是否可以将消息封装成ClientRequest放到KafkaChannel上。
public boolean ready(Node node, long now) {
if (node.isEmpty())
throw new IllegalArgumentException("Cannot connect to empty node " + node);
if (isReady(node, now)) // 关键
return true;
if (connectionStates.canConnect(node.idString(), now))
initiateConnect(node, now);
return false;
}
我们来分析下isReady
public boolean isReady(Node node, long now) {
return !metadataUpdater.isUpdateDue(now) && canSendRequest(node.idString());
}
isReady主要两个条件,一个是判断metadata是否到了更新的时候了,如果metadata需要更新,那么就不发送本次请求,也就是metadata更新优先级高。第二个是判断这个节点是否canSendRequest。
private boolean canSendRequest(String node) {
return connectionStates.isConnected(node) && selector.isChannelReady(node)
&& inFlightRequests.canSendMore(node); // 重点
}
inFlightRequests报保存的是“正在空中飞”的请求
public boolean canSendMore(String node) {
Deque<ClientRequest> queue = requests.get(node);
return queue == null || queue.isEmpty() ||
(queue.peekFirst().request().completed() && queue.size() < this.maxInFlightRequestsPerConnection);
}
满足以下几个条件,表示可以继续send
- queue是空,即该节点没有“正在空中飞”的request
- queue不为空。queue中排在最开头的request已经completed 并且queue的大小小于允许的最大值。如何理解呢?queue是一个双端队列,每次set的时候都会在queue的头部插入,所以queue中第一个就是正在发送的,或者说是KafkaChannel中的send。只要当send发送到网络中的时候才可以继续发送。这就保证了前面说的“如何保证每次只发送一个请求”。
4 参考
以上是关于kafka生产者网络层总结的主要内容,如果未能解决你的问题,请参考以下文章
Kafka从成神到升仙系列 五面试官问我 Kafka 生产者的网络架构,我直接开始从源码背起.......
kafka学习总结007 --- 生产者Java API实例
SpringCloud系列十一:SpringCloudStream(SpringCloudStream 简介创建消息生产者创建消息消费者自定义消息通道分组与持久化设置 RoutingKey)(代码片段