kafka 消费者拉取消息
Posted allenwas3
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kafka 消费者拉取消息相关的知识,希望对你有一定的参考价值。
本文只跟踪消费者拉取消息的流程。对于 Java 客户端，Kafka 的生产者和消费者复用同一个网络 I/O 类 NetworkClient。
入口在 KafkaConsumer#pollOnce 中，下面抽出其中的主要步骤：
// 构造 FetchRequest 请求,将请求对象放入 unsent 集合,等待发送 fetcher.sendFetches(); // 取出 unsent 中的请求,调用 NetworkClient#send,NetworkClinet#poll client.poll(pollTimeout, nowMs, new PollCondition() @Override public boolean shouldBlock() // since a fetch might be completed by the background thread, we need this poll condition // to ensure that we do not block unnecessarily in poll() return !fetcher.hasCompletedFetches(); ); // 返回数据给用户 return fetcher.fetchedRecords();
Fetcher#sendFetches
public synchronized int sendFetches() // 构造拉取消息请求。从哪个节点,哪个分区,什么位置拉取消息 Map<Node, FetchSessionHandler.FetchRequestData> fetchRequestMap = prepareFetchRequests(); for (Map.Entry<Node, FetchSessionHandler.FetchRequestData> entry : fetchRequestMap.entrySet()) final Node fetchTarget = entry.getKey(); final FetchSessionHandler.FetchRequestData data = entry.getValue(); //1. 借助 Builder 构造 FetchRequest 对象 final FetchRequest.Builder request = FetchRequest.Builder .forConsumer(this.maxWaitMs, this.minBytes, data.toSend()) .isolationLevel(isolationLevel) .setMaxBytes(this.maxBytes) .metadata(data.metadata()) .toForget(data.toForget()); if (log.isDebugEnabled()) log.debug("Sending to broker ", isolationLevel, data.toString(), fetchTarget); client.send(fetchTarget, request) //4. 给 RequestFutureCompletionHandler.future 添加 RequestFutureListener .addListener(new RequestFutureListener<ClientResponse>() @Override public void onSuccess(ClientResponse resp) synchronized (Fetcher.this) FetchResponse response = (FetchResponse) resp.responseBody(); FetchSessionHandler handler = sessionHandler(fetchTarget.id()); if (handler == null) log.error("Unable to find FetchSessionHandler for node . Ignoring fetch response.", fetchTarget.id()); return; if (!handler.handleResponse(response)) return; Set<TopicPartition> partitions = new HashSet<>(response.responseData().keySet()); FetchResponseMetricAggregator metricAggregator = new FetchResponseMetricAggregator(sensors, partitions); for (Map.Entry<TopicPartition, FetchResponse.PartitionData> entry : response.responseData().entrySet()) TopicPartition partition = entry.getKey(); long fetchOffset = data.sessionPartitions().get(partition).fetchOffset; FetchResponse.PartitionData fetchData = entry.getValue(); log.debug("Fetch at offset for partition returned fetch data ", isolationLevel, fetchOffset, partition, fetchData); // 10. 
把数据放入 completedFetches,最终返回给用户 completedFetches.add(new CompletedFetch(partition, fetchOffset, fetchData, metricAggregator, resp.requestHeader().apiVersion())); sensors.fetchLatency.record(resp.requestLatencyMs()); @Override public void onFailure(RuntimeException e) synchronized (Fetcher.this) FetchSessionHandler handler = sessionHandler(fetchTarget.id()); if (handler != null) handler.handleError(e); ); return fetchRequestMap.size();
ConsumerNetworkClient#send
public RequestFuture<ClientResponse> send(Node node, AbstractRequest.Builder<?> requestBuilder) long now = time.milliseconds(); //2. 使用 RequestFutureCompletionHandler 作为回调函数 RequestFutureCompletionHandler completionHandler = new RequestFutureCompletionHandler(); ClientRequest clientRequest = client.newClientRequest(node.idString(), requestBuilder, now, true, completionHandler); //3. 请求放入 unsent 集合 unsent.put(node, clientRequest); // wakeup the client in case it is blocking in poll so that we can send the queued request client.wakeup(); return completionHandler.future;
ConsumerNetworkClient#poll
// 5. 发送 unsent 中的请求,并没有产生网络 io trySend(now); // 真实的网络数据写和读 // 6. 发送请求 // 7. 接收响应 // 8. 触发 RequestFutureCompletionHandler 回调 client.poll(0, now); // 9. 触发 RequestFutureListener 中的回调 firePendingCompletedRequests();
NetworkClient#handleCompletedReceives
private void handleCompletedReceives(List<ClientResponse> responses, long now) for (NetworkReceive receive : this.selector.completedReceives()) String source = receive.source(); InFlightRequest req = inFlightRequests.completeNext(source); Struct responseStruct = parseStructMaybeUpdateThrottleTimeMetrics(receive.payload(), req.header, throttleTimeSensor, now); if (log.isTraceEnabled()) log.trace("Completed receive from node for with correlation id , received ", req.destination, req.header.apiKey(), req.header.correlationId(), responseStruct); AbstractResponse body = AbstractResponse.parseResponse(req.header.apiKey(), responseStruct); if (req.isInternalRequest && body instanceof MetadataResponse) metadataUpdater.handleCompletedMetadataResponse(req.header, now, (MetadataResponse) body); else if (req.isInternalRequest && body instanceof ApiVersionsResponse) handleApiVersionsResponse(responses, req, now, (ApiVersionsResponse) body); else // 此处给 responses 添加元素 // return new ClientResponse(header, callback, destination, createdTimeMs, timeMs, false, null, response); // 直接把请求的 callback 赋值给响应 // 生产者发送消息的 callback,是用户通过参数传入的 // 消费者拉取消息的 callback,是在 ConsumerNetworkClient#send 指定的,是 RequestFutureCompletionHandler responses.add(req.completed(body, now));
NetworkClient#completeResponses
private void completeResponses(List<ClientResponse> responses) for (ClientResponse response : responses) try // callback.onComplete(this); response.onComplete(); catch (Exception e) log.error("Uncaught error in request completion:", e);
RequestFutureCompletionHandler#onComplete
public void onComplete(ClientResponse response) this.response = response; pendingCompletion.add(this);
ConsumerNetworkClient#firePendingCompletedRequests
private void firePendingCompletedRequests() boolean completedRequestsFired = false; for (;;) RequestFutureCompletionHandler completionHandler = pendingCompletion.poll(); if (completionHandler == null) break; completionHandler.fireCompletion(); completedRequestsFired = true; // wakeup the client in case it is blocking in poll for this future‘s completion if (completedRequestsFired) client.wakeup();
ConsumerNetworkClient.RequestFutureCompletionHandler#fireCompletion
public void fireCompletion() if (e != null) future.raise(e); else if (response.wasDisconnected()) RequestHeader requestHeader = response.requestHeader(); int correlation = requestHeader.correlationId(); log.debug("Cancelled request with correlation id due to node being disconnected", requestHeader.apiKey(), requestHeader, correlation, response.destination()); future.raise(DisconnectException.INSTANCE); else if (response.versionMismatch() != null) future.raise(response.versionMismatch()); else future.complete(response);
RequestFuture#complete
public void complete(T value) try if (value instanceof RuntimeException) throw new IllegalArgumentException("The argument to complete can not be an instance of RuntimeException"); if (!result.compareAndSet(INCOMPLETE_SENTINEL, value)) throw new IllegalStateException("Invalid attempt to complete a request future which is already complete"); fireSuccess(); finally completedLatch.countDown(); private void fireSuccess() T value = value(); while (true) RequestFutureListener<T> listener = listeners.poll(); if (listener == null) break; // 终于调到 RequestFutureListener listener.onSuccess(value);
如果不考虑心跳线程，consumer 第一次 poll 时是拿不到数据的：此时请求才刚发出去，响应还没有回来。必须等到第二次 poll，同时处理网络读写事件时，才能读到拉取响应。
跟完整个流程之后，个人觉得调用链还是挺长的。另一点感受是：（不考虑心跳线程的话）全程只有一个线程，但每次走的分支都不一样。由此得到的启发是：单线程只要不阻塞等待，速度也可以很快。
以上是关于kafka 消费者拉取消息的主要内容,如果未能解决你的问题,请参考以下文章