[Kafka] Kafka NetworkClient
Posted by 九师兄
Foreword: This article was compiled by the editors of 小常识网 (cha138.com). It mainly introduces Kafka NetworkClient, and we hope it is of some reference value to you.
1. Overview
Reposted from: "Kafka producer study notes 9": NetworkClient
I. InFlightRequests
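In the previous post we went through "Kafka producer study notes 8": NIO. Not much of the producer is left to cover; what remains is mainly NetworkClient. First, it is worth covering InFlightRequests. The InFlightRequests queue caches requests that have been sent but have not yet received a response. Internally it is backed by a Map<String, Deque<NetworkClient.InFlightRequest>>: the key is the node id, and the value is the deque of requests sent to that node.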
```java
/**
 * The set of requests which have been sent or are being sent but haven't yet received a response
 * (a wrapper around the client requests that have been sent, or are being sent, and have not yet received a response)
 */
final class InFlightRequests {
    // Maximum number of in-flight requests per connection
    private final int maxInFlightRequestsPerConnection;
    // Maps each node to its Deque of in-flight requests: the key is the node id, the value is the request queue
    private final Map<String, Deque<NetworkClient.InFlightRequest>> requests = new HashMap<>();
    /** Thread safe total number of in flight requests. */
    private final AtomicInteger inFlightRequestCount = new AtomicInteger(0);
    ...
}
```
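When a new request is added, it is pushed onto the head of the queue, while the request that actually gets completed is taken from the tail. This guarantees that requests enqueued earlier are handled first. Because of the canSendMore() restriction, although the queue can hold several requests, a new request can only be added once the previous one has been completely sent.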
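To make the head-in, tail-out order concrete, here is a minimal sketch of a per-node queue keyed by node id. It is not Kafka's class: PerNodeQueueSketch is a hypothetical name and requests are plain strings; only the ArrayDeque calls addFirst() and pollLast() correspond to what InFlightRequests does.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch, not Kafka's InFlightRequests: one Deque per node id,
// new requests enter at the head and the oldest request leaves from the tail.
class PerNodeQueueSketch {
    private final Map<String, Deque<String>> requests = new HashMap<>();

    // Like InFlightRequests.add(): push the new request onto the head of the node's queue
    void add(String node, String request) {
        requests.computeIfAbsent(node, k -> new ArrayDeque<>()).addFirst(request);
    }

    // Like InFlightRequests.completeNext(): remove the oldest request from the tail
    String completeNext(String node) {
        Deque<String> queue = requests.get(node);
        if (queue == null || queue.isEmpty())
            throw new IllegalStateException("No in-flight requests for node " + node);
        return queue.pollLast();
    }

    // Number of requests currently in flight for the node
    int inFlightCount(String node) {
        Deque<String> queue = requests.get(node);
        return queue == null ? 0 : queue.size();
    }

    public static void main(String[] args) {
        PerNodeQueueSketch q = new PerNodeQueueSketch();
        q.add("1", "req-A");
        q.add("1", "req-B");
        q.add("2", "req-C");
        System.out.println(q.completeNext("1"));  // req-A: oldest first
        System.out.println(q.completeNext("1"));  // req-B
        System.out.println(q.inFlightCount("2")); // 1
    }
}
```

Both ends of an ArrayDeque are O(1), so the choice of addFirst/pollLast over addLast/pollFirst is a convention; what matters is that adding and completing happen at opposite ends.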
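First, the sending restriction. NetworkClient calls the following method as one of the conditions for deciding whether it may send a request to a given node: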
InFlightRequests.java
```java
/**
 * Can we send more requests to this node?
 *
 * @param node Node in question
 * @return true iff we have no requests still being sent to the given node
 * (decides whether this connection can accept another request)
 */
public boolean canSendMore(String node) {
    Deque<NetworkClient.InFlightRequest> queue = requests.get(node);
    // Condition: the queue is missing or empty
    return queue == null || queue.isEmpty() ||
            // or the head request has been fully sent and not too many requests are queued up
            (queue.peekFirst().send.completed() && queue.size() < this.maxInFlightRequestsPerConnection);
}
```
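The first check is whether the queue is empty (or does not exist yet); if so, a request can always be sent. Next, queue.peekFirst().send.completed() returning true means that the request at the head of the queue (the most recently added one) has been completely sent. If the head request cannot be sent for a long time, possibly because of network problems, no further requests may be sent to this node. Moreover, the head request is the same request that the corresponding KafkaChannel.send field points to; to avoid overwriting a request that has not finished sending, KafkaChannel.send must not be pointed at a new request yet. Finally, queue.size() < this.maxInFlightRequestsPerConnection checks whether too many requests have piled up in InFlightRequests (for the producer, this limit comes from max.in.flight.requests.per.connection). If a node has accumulated many unanswered requests, the node is probably overloaded or its network connection has problems, and sending more requests is likely to lead to timeouts.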
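The canSendMore() rule can be tried out in isolation. In this sketch the hypothetical Req class and its sendCompleted flag stand in for InFlightRequest and send.completed(), and the limit parameter plays the role of maxInFlightRequestsPerConnection.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Sketch of the canSendMore() rule. Req is a hypothetical stand-in for InFlightRequest;
// sendCompleted plays the role of send.completed().
class CanSendMoreSketch {
    static final class Req {
        boolean sendCompleted; // true once every byte of this request has been written

        Req(boolean sendCompleted) {
            this.sendCompleted = sendCompleted;
        }
    }

    static boolean canSendMore(Deque<Req> queue, int maxInFlightPerConnection) {
        return queue == null || queue.isEmpty()
                // the newest request (head) must be fully written, and the queue must not be full
                || (queue.peekFirst().sendCompleted && queue.size() < maxInFlightPerConnection);
    }

    public static void main(String[] args) {
        Deque<Req> q = new ArrayDeque<>();
        System.out.println(canSendMore(q, 2)); // true: nothing in flight
        q.addFirst(new Req(false));
        System.out.println(canSendMore(q, 2)); // false: head request still being written
        q.peekFirst().sendCompleted = true;
        System.out.println(canSendMore(q, 2)); // true: head written and 1 < 2
        q.addFirst(new Req(true));
        System.out.println(canSendMore(q, 2)); // false: 2 requests already awaiting responses
    }
}
```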
Next, enqueueing, which is done by the add() method:
InFlightRequests.java
```java
/**
 * Add the given request to the queue for the connection it was directed to
 * (requests here have been sent, or are being sent, and have not yet received a response;
 * each new request is added at the head of the target node's queue)
 * Note: because of canSendMore(), a new request can only be added once the previous request has been
 * completely sent. This keeps requests from piling up on a node when the network is congested.
 */
public void add(NetworkClient.InFlightRequest request) {
    // The broker node this request is sent to
    String destination = request.destination;
    // Look up the Deque for the request's destination node in the requests map
    Deque<NetworkClient.InFlightRequest> reqs = this.requests.get(destination);
    if (reqs == null) { // no queue for this node yet
        // create an ArrayDeque for it
        reqs = new ArrayDeque<>();
        // and register the node -> queue mapping in the requests map
        this.requests.put(destination, reqs);
    }
    // Add the request at the head of the queue with addFirst()
    reqs.addFirst(request);
    inFlightRequestCount.incrementAndGet(); // increment the counter
}
```
Dequeueing happens once a response has been received: NetworkClient.handleCompletedReceives calls completeNext() to remove the request from the inFlightRequests queue.
InFlightRequests.java
```java
/**
 * Get the oldest request (the one that will be completed next) for the given node
 * (the longest-waiting in-flight request of this node, which is the next one to complete)
 */
public NetworkClient.InFlightRequest completeNext(String node) {
    // Get the node's request Deque and poll the element at its tail
    NetworkClient.InFlightRequest inFlightRequest = requestQueue(node).pollLast();
    inFlightRequestCount.decrementAndGet(); // decrement the counter
    return inFlightRequest;
}
```
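Compare this with add(): add() inserts at the head with addFirst(), so the element at the tail is the oldest one and should be handled first; therefore dequeueing uses pollLast() to remove the element that has been waiting longest. Also note the difference between peekFirst() and pollFirst(): peekFirst() returns the head element without removing it, while pollFirst() removes and returns it.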
```java
/**
 * Get the last request we sent to the given node (but don't remove it from the queue)
 * @param node The node id
 * (the Deque is double-ended, so both ends are cheap to access; this returns the most recently sent request)
 */
public NetworkClient.InFlightRequest lastSent(String node) {
    return requestQueue(node).peekFirst();
}

/**
 * Complete the last request that was sent to a particular node.
 * (removes and returns the newest request on this connection)
 * @param node The node the request was sent to
 * @return The request
 */
public NetworkClient.InFlightRequest completeLastSent(String node) {
    NetworkClient.InFlightRequest inFlightRequest = requestQueue(node).pollFirst();
    inFlightRequestCount.decrementAndGet();
    return inFlightRequest;
}
```
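InFlightRequest represents a request that is in flight and holds all the information the request had before it was sent. It can also produce the response object, ClientResponse: when a response arrives normally, completed() builds a ClientResponse from the response content; when the connection is suddenly lost, disconnected() builds a ClientResponse marked as disconnected.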
NetworkClient.InFlightRequest.java
```java
static class InFlightRequest {
    // Request header
    final RequestHeader header;
    // The broker node this request is sent to
    final String destination;
    // Callback
    final RequestCompletionHandler callback;
    // Whether the server is expected to return a response
    final boolean expectResponse;
    // Request body
    final AbstractRequest request;
    // Whether the request was initiated by NetworkClient itself (internal requests skip the ready check before sending)
    final boolean isInternalRequest; // used to flag requests which are initiated internally by NetworkClient
    // Serialized request data
    final Send send;
    // Time the request was sent
    final long sendTimeMs;
    // Creation time of the originating ClientRequest
    final long createdTimeMs;
    // Request timeout
    final long requestTimeoutMs;
    ...

    /**
     * Called when a response arrives: builds a ClientResponse from the response content
     */
    public ClientResponse completed(AbstractResponse response, long timeMs) {
        return new ClientResponse(header, callback, destination, createdTimeMs, timeMs,
                false, null, null, response);
    }

    /**
     * Called when the connection is suddenly lost: builds a ClientResponse marked as disconnected
     * @param timeMs
     * @param authenticationException
     * @return
     */
    public ClientResponse disconnected(long timeMs, AuthenticationException authenticationException) {
        return new ClientResponse(header, callback, destination, createdTimeMs, timeMs,
                true, null, authenticationException, null);
    }
}
```
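A stripped-down sketch of the two ways an in-flight request produces a response. SimpleInFlightRequest and SimpleResponse are hypothetical classes that keep only the destination, the creation time, a disconnected flag and a string body; the latency is computed from the two timestamps, which ClientResponse likewise receives (createdTimeMs and the receive time) in the constructor calls above.

```java
// Hypothetical sketch, not Kafka's classes: an in-flight request turns either a received
// body or a lost connection into a response object, like InFlightRequest.completed()
// and InFlightRequest.disconnected().
class InFlightResponseSketch {
    static final class SimpleResponse {
        final String destination;
        final long latencyMs;       // receive (or disconnect) time minus creation time
        final boolean disconnected; // true when built by disconnected()
        final String body;          // null when the connection was lost

        SimpleResponse(String destination, long latencyMs, boolean disconnected, String body) {
            this.destination = destination;
            this.latencyMs = latencyMs;
            this.disconnected = disconnected;
            this.body = body;
        }
    }

    static final class SimpleInFlightRequest {
        final String destination;
        final long createdTimeMs;

        SimpleInFlightRequest(String destination, long createdTimeMs) {
            this.destination = destination;
            this.createdTimeMs = createdTimeMs;
        }

        // A response arrived normally
        SimpleResponse completed(String body, long timeMs) {
            return new SimpleResponse(destination, timeMs - createdTimeMs, false, body);
        }

        // The connection dropped before a response arrived
        SimpleResponse disconnected(long timeMs) {
            return new SimpleResponse(destination, timeMs - createdTimeMs, true, null);
        }
    }

    public static void main(String[] args) {
        SimpleInFlightRequest request = new SimpleInFlightRequest("1", 1_000L);
        SimpleResponse ok = request.completed("ack", 1_030L);
        SimpleResponse lost = request.disconnected(1_050L);
        System.out.println(ok.disconnected + " " + ok.latencyMs + " " + ok.body);       // false 30 ack
        System.out.println(lost.disconnected + " " + lost.latencyMs + " " + lost.body); // true 50 null
    }
}
```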
II. NetworkClient
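All the components NetworkClient depends on have now been introduced, so let's look at NetworkClient itself. NetworkClient is a general-purpose network client: every request the Kafka clients send to the brokers goes through a NetworkClient, and both the Kafka producer and the Kafka consumer contain one. The client interacts with the server in two ways: sending requests and reading responses.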
2.1 Fields and methods
```java
public class NetworkClient implements KafkaClient {
    private final Logger log;
    /* the selector used to perform network i/o
     * (Kafka's own Selector, which performs the network I/O)
     * */
    private final Selectable selector;
    private final MetadataUpdater metadataUpdater;
    private final Random randOffset;
    /* the state of each node's connection
     * (keeps the connection state of every node)
     * */
    private final ClusterConnectionStates connectionStates;
    /* the set of requests currently being sent or awaiting a response */
    // Request queue: requests that are being sent or have been sent but have not yet received a response
    private final InFlightRequests inFlightRequests;
    /* the socket send buffer size in bytes */
    private final int socketSendBuffer;
    /* the socket receive size buffer in bytes */
    private final int socketReceiveBuffer;
    /* the client id used to identify this client in requests to the server */
    private final String clientId;
    /* the current correlation id to use when sending requests to servers */
    private int correlation;
    /* default timeout for individual requests to await acknowledgement from servers */
    private final int defaultRequestTimeoutMs;
    /* time in ms to wait before retrying to create connection to a server */
    private final long reconnectBackoffMs;
    private final ClientDnsLookup clientDnsLookup;
    private final Time time;
    /**
     * True if we should send an ApiVersionRequest when first connecting to a broker.
     * (Whether to negotiate API versions with the broker; true by default, needed on the first connection to a broker.)
     */
    private final boolean discoverBrokerVersions;
    private final ApiVersions apiVersions;
    /**
     * ApiVersions requests waiting to be sent: the key is the node id, the value is the Builder for the request
     */
    private final Map<String, ApiVersionsRequest.Builder> nodesNeedingApiVersionsFetch = new HashMap<>();
    ...
}
```
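The main NetworkClient methods, in the order they are used:
Call NetworkClient.ready() to connect to the server.
Call NetworkClient.poll() to complete the connection.
Call NetworkClient.newClientRequest() to create a ClientRequest.
Then call NetworkClient.send() to send the request.
Finally, call NetworkClient.poll() to process the response.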
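The call order above can be sketched with a heavily simplified client. MiniClient and FakeClient are hypothetical and only imitate the shape of ready(), send() and poll(); creating the ClientRequest (newClientRequest()) is folded into send() here, and the real NetworkClient.poll() returns a List<ClientResponse> rather than strings.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the ready -> poll -> send -> poll order. MiniClient and FakeClient
// are invented for illustration; they are not Kafka APIs.
class CallOrderSketch {
    interface MiniClient {
        boolean ready(String node, long now);              // starts connecting if needed
        void send(String node, String request, long now);  // only allowed once the node is ready
        List<String> poll(long timeoutMs, long now);        // does I/O, returns completed responses
    }

    // Fake client: the connection completes on the first poll after ready() was called,
    // and every sent request is answered on the following poll.
    static final class FakeClient implements MiniClient {
        private boolean connecting;
        private boolean connected;
        private final List<String> pending = new ArrayList<>();

        public boolean ready(String node, long now) {
            if (connected)
                return true;
            connecting = true; // like initiateConnect()
            return false;
        }

        public void send(String node, String request, long now) {
            if (!connected)
                throw new IllegalStateException("Attempt to send a request to node " + node + " which is not ready.");
            pending.add(request);
        }

        public List<String> poll(long timeoutMs, long now) {
            List<String> responses = new ArrayList<>();
            if (connecting) { // the connection finishes during this poll
                connected = true;
                connecting = false;
            }
            for (String request : pending)
                responses.add("response-to-" + request);
            pending.clear();
            return responses;
        }
    }

    // ready -> poll (until connected) -> send -> poll (until the response arrives)
    static String sendAndReceive(MiniClient client, String node, String request) {
        long now = 0;
        while (!client.ready(node, now))
            client.poll(100, ++now); // drive the connection forward
        client.send(node, request, now);
        while (true) {
            List<String> responses = client.poll(100, ++now);
            if (!responses.isEmpty())
                return responses.get(0);
        }
    }

    public static void main(String[] args) {
        System.out.println(sendAndReceive(new FakeClient(), "1", "metadata")); // response-to-metadata
    }
}
```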
[Figure: NetworkClient flow]
2.2 ready()
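Before NetworkClient sends any request, it must first establish a connection to the server. If a node is not ready, it is removed from readyNodes (the set of ready nodes computed by the producer's Sender), and no messages are sent to that node for now. Here is ready():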
NetworkClient.java
```java
public boolean ready(Node node, long now) {
    if (node.isEmpty())
        throw new IllegalArgumentException("Cannot connect to empty node " + node);

    // Already ready to send requests?
    if (isReady(node, now))
        return true;

    // isReady() returned false: initiate a connection if we are allowed to
    if (connectionStates.canConnect(node.idString(), now))
        // if we are interested in sending to a node and we don't have a connection to it, initiate one
        initiateConnect(node, now);

    return false;
}

/**
 * Check if the node with the given id is ready to send more requests.
 *
 * @param node The node
 * @param now The current time in ms
 * @return true if the node is ready
 */
@Override
public boolean isReady(Node node, long now) {
    // if we need to update our metadata now declare all requests unready to make metadata requests first
    // priority
    // (no requests may be sent while a metadata update is due, nor while the connection is not fully
    // established or too many requests are already in flight)
    return !metadataUpdater.isUpdateDue(now) && canSendRequest(node.idString(), now);
}

/**
 * Are we connected and ready and able to send more requests to the given connection?
 * (checks the connection state and whether too many requests are in flight)
 * @param node The node
 * @param now the current timestamp
 */
private boolean canSendRequest(String node, long now) {
    return connectionStates.isReady(node, now) && selector.isChannelReady(node) &&
            inFlightRequests.canSendMore(node);
}

/**
 * Initiate a connection to the given node
 * (opens a connection to the given node)
 */
private void initiateConnect(Node node, long now) {
    String nodeConnectionId = node.idString();
    try {
        // Mark the connection state as CONNECTING
        this.connectionStates.connecting(nodeConnectionId, now, node.host(), clientDnsLookup);
        InetAddress address = this.connectionStates.currentAddress(nodeConnectionId);
        log.debug("Initiating connection to node {} using address {}", node, address);
        // Ask the selector to connect asynchronously
        selector.connect(nodeConnectionId,
                new InetSocketAddress(address, node.port()),
                this.socketSendBuffer,
                this.socketReceiveBuffer);
    } catch (IOException e) {
        /* attempt failed, we'll try again after the backoff */
        connectionStates.disconnected(nodeConnectionId, now);
        /* maybe the problem is our metadata, update it */
        metadataUpdater.requestUpdate();
        log.warn("Error connecting to node {}", node, e);
    }
}
```
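NetworkClient.isReady() first checks whether a request can be sent to a node. The node counts as ready only if all three of the following conditions hold:
Metadata is neither being updated nor due for an update (!metadataUpdater.isUpdateDue(now)).
The connection has been established and is usable: connectionStates.isReady(node, now) and selector.isChannelReady(node) both return true.
InFlightRequests.canSendMore() returns true.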
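The decision ready() makes can be written as a pure function over booleans. This is a hypothetical sketch: the parameter names are invented, and each one stands for the NetworkClient call noted in its comment.

```java
// Hypothetical sketch of the decision inside NetworkClient.ready(); each boolean parameter
// stands for the call named in its comment.
class ReadyDecisionSketch {
    enum Outcome { READY, INITIATE_CONNECT, NOT_READY }

    static Outcome ready(boolean metadataUpdateDue, // metadataUpdater.isUpdateDue(now)
                         boolean connectionReady,   // connectionStates.isReady(node, now)
                         boolean channelReady,      // selector.isChannelReady(node)
                         boolean canSendMore,       // inFlightRequests.canSendMore(node)
                         boolean canConnect) {      // connectionStates.canConnect(node, now)
        boolean isReady = !metadataUpdateDue && connectionReady && channelReady && canSendMore;
        if (isReady)
            return Outcome.READY;
        // Not ready: start a new connection only if we are disconnected and the backoff has passed
        return canConnect ? Outcome.INITIATE_CONNECT : Outcome.NOT_READY;
    }

    public static void main(String[] args) {
        System.out.println(ready(false, true, true, true, false));  // READY
        System.out.println(ready(true, true, true, true, false));   // NOT_READY: metadata first
        System.out.println(ready(false, false, false, true, true)); // INITIATE_CONNECT
    }
}
```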
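If NetworkClient.isReady() returns false, ready() calls initiateConnect() to (re)connect, provided that ClusterConnectionStates.canConnect() returns true, i.e. the following two conditions hold: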
ClusterConnectionStates.java
```java
/**
 * Return true iff we can currently initiate a new connection. This will be the case if we are not
 * connected and haven't been connected for at least the minimum reconnection backoff period.
 * @param id the connection id to check
 * @param now the current time in ms
 * @return true if we can initiate a new connection
 */
public boolean canConnect(String id, long now) {
    NodeConnectionState state = nodeState.get(id);
    if (state == null)
        return true;
    else
        return state.state.isDisconnected() &&
                now - state.lastConnectAttemptMs >= state.reconnectBackoffMs;
}
```
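The node either has no connection state yet, or its connection is in the DISCONNECTED state; it must not be CONNECTING (or already connected).
To avoid network congestion, reconnects must not be too frequent: the time since the last connection attempt must be at least the reconnect backoff, given by reconnectBackoffMs.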
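A simplified sketch of the two canConnect() conditions. The class and its methods are hypothetical; it assumes a single fixed backoff, whereas the Kafka code above keeps a per-node state.reconnectBackoffMs, and it measures the backoff from the moment the connection was marked disconnected.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified, hypothetical sketch of the canConnect() rule with one fixed reconnect backoff.
class ReconnectBackoffSketch {
    enum State { CONNECTING, CONNECTED, DISCONNECTED }

    static final class NodeState {
        State state;
        long lastConnectAttemptMs;

        NodeState(State state, long lastConnectAttemptMs) {
            this.state = state;
            this.lastConnectAttemptMs = lastConnectAttemptMs;
        }
    }

    private final Map<String, NodeState> nodeState = new HashMap<>();
    private final long reconnectBackoffMs;

    ReconnectBackoffSketch(long reconnectBackoffMs) {
        this.reconnectBackoffMs = reconnectBackoffMs;
    }

    // true if the node was never seen, or it is DISCONNECTED and the backoff has elapsed
    boolean canConnect(String id, long now) {
        NodeState s = nodeState.get(id);
        if (s == null)
            return true;
        return s.state == State.DISCONNECTED && now - s.lastConnectAttemptMs >= reconnectBackoffMs;
    }

    // Record a new connection attempt
    void connecting(String id, long now) {
        nodeState.put(id, new NodeState(State.CONNECTING, now));
    }

    // Record that the connection failed or was lost; the backoff is measured from `now`
    void disconnected(String id, long now) {
        nodeState.put(id, new NodeState(State.DISCONNECTED, now));
    }

    public static void main(String[] args) {
        ReconnectBackoffSketch states = new ReconnectBackoffSketch(50);
        System.out.println(states.canConnect("1", 0));  // true: no state recorded yet
        states.connecting("1", 0);
        System.out.println(states.canConnect("1", 10)); // false: still CONNECTING
        states.disconnected("1", 20);
        System.out.println(states.canConnect("1", 40)); // false: only 20 ms since the failure
        System.out.println(states.canConnect("1", 70)); // true: the 50 ms backoff has elapsed
    }
}
```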
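Recall from the Selector discussion that a read operation reads one complete NetworkReceive, whereas a write may be split across several writes. In other words, a read completes a whole receive within a single pass over the SelectionKeys, while a write may need several passes to finish one complete send. This matters for ready(): if the Send object on a channel has only been partially written, the node will not be ready on the next check.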
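The following sketch shows what "partially written" means for a send, in the spirit of Kafka's ByteBufferSend: completed() only becomes true once every byte has left the buffer. SimpleSend and ThrottledChannel are hypothetical; the channel accepts at most a few bytes per write() to imitate a socket whose send buffer is nearly full.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;

class PartialSendSketch {
    // Hypothetical send: completed() is true only when no bytes remain in the buffer
    static final class SimpleSend {
        private final ByteBuffer buffer;

        SimpleSend(byte[] payload) {
            this.buffer = ByteBuffer.wrap(payload);
        }

        int writeTo(WritableByteChannel channel) {
            try {
                return channel.write(buffer);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }

        boolean completed() {
            return !buffer.hasRemaining();
        }
    }

    // Hypothetical channel that accepts at most maxBytesPerWrite bytes per write() call
    static final class ThrottledChannel implements WritableByteChannel {
        private final int maxBytesPerWrite;
        final ByteArrayOutputStream received = new ByteArrayOutputStream();

        ThrottledChannel(int maxBytesPerWrite) {
            this.maxBytesPerWrite = maxBytesPerWrite;
        }

        @Override
        public int write(ByteBuffer src) {
            int n = Math.min(maxBytesPerWrite, src.remaining());
            byte[] chunk = new byte[n];
            src.get(chunk);
            received.write(chunk, 0, n);
            return n;
        }

        @Override
        public boolean isOpen() {
            return true;
        }

        @Override
        public void close() {
        }
    }

    public static void main(String[] args) {
        SimpleSend send = new SimpleSend("hello kafka".getBytes(StandardCharsets.UTF_8));
        ThrottledChannel channel = new ThrottledChannel(4);
        int writes = 0;
        while (!send.completed()) { // each iteration stands in for one writable event in Selector.poll()
            send.writeTo(channel);
            writes++;
        }
        // prints "3 writes: hello kafka" (11 bytes written as 4 + 4 + 3)
        System.out.println(writes + " writes: " + new String(channel.received.toByteArray(), StandardCharsets.UTF_8));
    }
}
```

While such a send is still incomplete, canSendMore() sees an unfinished request at the head of the node's queue and refuses new requests.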
2.3 send()
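send() sets the request on the KafkaChannel.send field and, at the same time, adds it to the InFlightRequests queue to wait for the response.
First, the request itself: NetworkClient represents a request with the ClientRequest class.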
```java
/**
 * A request being sent to the server. This holds both the network send as well as the client-level metadata.
 * (ClientRequest is the client-side request that will be sent to the server, so it wraps a requestBuilder)
 */
public final class ClientRequest {
    // Destination node id
    private final String destination;
    // The requestBuilder lets each request type fill in its own request content
    private final AbstractRequest.Builder<?> requestBuilder;
    // Correlation id for the request header
    private final int correlationId;
    // Client id for the request header
    private final String clientId;
    // Creation time
    private final long createdTimeMs;
    // Whether a response is expected
    private final boolean expectResponse;
    // Request timeout
    private final int requestTimeoutMs;
    // Callback used to handle the response
    private final RequestCompletionHandler callback;
    ...
}
```
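Each ClientRequest carries a correlationId, and NetworkClient keeps a `correlation` counter for it (see the fields in 2.1). The hypothetical sketch below hands out ids from an incrementing counter and, when a response arrives, checks it against the oldest in-flight request on the connection; this works because a broker answers the requests on one connection in the order it received them. CorrelationSketch and SimpleClientRequest are illustrative names, and the sketch tracks a single connection only.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch: correlation ids from a counter, matched against the oldest
// in-flight request when a response arrives (single connection only).
class CorrelationSketch {
    static final class SimpleClientRequest {
        final String destination;
        final int correlationId;

        SimpleClientRequest(String destination, int correlationId) {
            this.destination = destination;
            this.correlationId = correlationId;
        }
    }

    private int correlation = 0;
    private final Deque<SimpleClientRequest> inFlight = new ArrayDeque<>();

    // Each new request takes the current counter value, then the counter advances
    SimpleClientRequest newClientRequest(String node) {
        return new SimpleClientRequest(node, correlation++);
    }

    // Newest request at the head, as in InFlightRequests.add()
    void send(SimpleClientRequest request) {
        inFlight.addFirst(request);
    }

    // Responses come back in send order, so the oldest request (tail) must match
    SimpleClientRequest onResponse(int responseCorrelationId) {
        SimpleClientRequest oldest = inFlight.pollLast();
        if (oldest == null || oldest.correlationId != responseCorrelationId)
            throw new IllegalStateException("Correlation id mismatch for response " + responseCorrelationId);
        return oldest;
    }

    public static void main(String[] args) {
        CorrelationSketch client = new CorrelationSketch();
        client.send(client.newClientRequest("1")); // correlation id 0
        client.send(client.newClientRequest("1")); // correlation id 1
        System.out.println(client.onResponse(0).correlationId); // 0
        System.out.println(client.onResponse(1).correlationId); // 1
    }
}
```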
NetworkClient.java
```java
/**
 * Queue up the given request for sending. Requests can only be sent out to ready nodes.
 * @param request The request
 * @param now The current timestamp
 * (Sends a request. Both the producer and the consumer call this method; ClientRequest is the
 * client-side request, created inside the producer or consumer.)
 */
@Override
public void send(ClientRequest request, long now) {
    doSend(request, false, now);
}

private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now) {
    String nodeId = clientRequest.destination();
    if (!isInternalRequest) {
        // If this request came from outside the NetworkClient, validate
        // that we can send data. If the request is internal, we trust
        // that internal code has done this validation. Validation
        // will be slightly different for some internal requests (for
        // example, ApiVersionsRequests can be sent prior to being in
        // READY state.)
        if (!canSendRequest(nodeId, now)) // check whether we can send a request to this node
            throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready.");
    }
    AbstractRequest.Builder<?> builder = clientRequest.requestBuilder();
    try {
        // Look up the API versions the node supports
        NodeApiVersions versionInfo = apiVersions.get(nodeId);
        short version;
        // Note: if versionInfo is null, we have no server version information. This would be
        // the case when sending the initial ApiVersionRequest which fetches the version
        // information itself. It is also the case when discoverBrokerVersions is set to false.
        if (versionInfo == null) {
            version = builder.latestAllowedVersion();
            if (discoverBrokerVersions && log.isTraceEnabled())
                log.trace("No version information found when sending {} with correlation id {} to node {}. " +
                        "Assuming version {}.", clientRequest.apiKey(), clientRequest.correlationId(), nodeId, version);
        } else {
            version = versionInfo.latestUsableVersion(clientRequest.apiKey(), builder.oldestAllowedVersion(),
                    builder.latestAllowedVersion());
        }
        // The call to build may also throw UnsupportedVersionException, if there are essential
        // fields that cannot be represented in the chosen version.
        // For a produce request, builder is a ProduceRequest.Builder and build() returns a ProduceRequest
        doSend(clientRequest, isInternalRequest, now, builder.build(version));
    } catch (UnsupportedVersionException unsupportedVersionException) {
        // If the version is not supported, skip sending the request over the wire.
        // Instead, simply add it to the local queue of aborted requests.
        // (unsupported version: do not send; build a ClientResponse and add it to abortedSends)
        log.debug("Version mismatch when attempting to send {} with correlation id {} to {}", builder,
                clientRequest.correlationId(), clientRequest.destination(), unsupportedVersionException);
        ClientResponse clientResponse = new ClientResponse(clientRequest.makeHeader(builder.latestAllowedVersion()),
                clientRequest.callback(), clientRequest.destination(), now, now,
                false, unsupportedVersionException, null, null);
        abortedSends.add(clientResponse);
    }
}
```
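NodeApiVersions.latestUsableVersion() is not shown in this excerpt. As an assumption about the idea behind it, the hypothetical sketch below picks the highest version that lies inside both the client's [oldestAllowedVersion, latestAllowedVersion] range and the broker's advertised range, and fails when the ranges do not overlap; IllegalArgumentException stands in for UnsupportedVersionException. A null broker range models the versionInfo == null branch, which simply uses the client's latest allowed version.

```java
// Hypothetical sketch of request-version selection; not Kafka's implementation.
class VersionPickSketch {
    // Highest version inside both [clientOldest, clientLatest] and [brokerMin, brokerMax]
    static short latestUsableVersion(short clientOldest, short clientLatest, short brokerMin, short brokerMax) {
        short low = (short) Math.max(clientOldest, brokerMin);
        short high = (short) Math.min(clientLatest, brokerMax);
        if (low > high)
            throw new IllegalArgumentException("No usable version: client [" + clientOldest + ", " + clientLatest
                    + "], broker [" + brokerMin + ", " + brokerMax + "]");
        return high;
    }

    // brokerRange == null models "no ApiVersions information for this node" (versionInfo == null)
    static short chooseVersion(short clientOldest, short clientLatest, short[] brokerRange) {
        if (brokerRange == null)
            return clientLatest; // like builder.latestAllowedVersion()
        return latestUsableVersion(clientOldest, clientLatest, brokerRange[0], brokerRange[1]);
    }

    public static void main(String[] args) {
        System.out.println(chooseVersion((short) 3, (short) 8, null));               // 8
        System.out.println(chooseVersion((short) 3, (short) 8, new short[] {0, 7})); // 7
        try {
            chooseVersion((short) 8, (short) 9, new short[] {0, 7});
        } catch (IllegalArgumentException e) {
            System.out.println("version mismatch, request would be aborted");
        }
    }
}
```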
```java
private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now, AbstractRequest request) {
    // Target node
    String destination = clientRequest.destination();
    // Build the request header
    RequestHeader header = clientRequest.makeHeader(request.version());
    if (log.isDebugEnabled()) {
        int latestClientVersion = clientRequest.apiKey().latestVersion();
        if (header.apiVersion() == latestClientVersion) {
            log.trace("Sending {} {} with correlation id {} to node {}", clientRequest.apiKey(), request,
                    clientRequest.correlationId(), destination);
        } else {
            log.
            ...
```