kafka AdminClient 闲时关闭连接
Posted allenwas3
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kafka AdminClient 闲时关闭连接相关的知识,希望对你有一定的参考价值。
AdminClient 类提供了创建 topic,删除 topic 的 api。
在项目中创建了一个 AdminClient 对象,每次创建 topic 时,调用
org.apache.kafka.clients.admin.AdminClient#createTopics
如果长时间不使用这个对象,客户端与 broker 之间的连接会被关掉,相关的参数:
connections.max.idle.ms
这个最大空闲参数在 broker 和 客户端都可以配置,即 broker 和客户端都会关闭空闲太久的连接。
org.apache.kafka.common.network.Selector#maybeCloseOldestConnection
private void maybeCloseOldestConnection(long currentTimeNanos) { if (idleExpiryManager == null) return; Map.Entry<String, Long> expiredConnection = idleExpiryManager.pollExpiredConnection(currentTimeNanos); if (expiredConnection != null) { String connectionId = expiredConnection.getKey(); KafkaChannel channel = this.channels.get(connectionId); if (channel != null) { if (log.isTraceEnabled()) log.trace("About to close the idle connection from {} due to being idle for {} millis", connectionId, (currentTimeNanos - expiredConnection.getValue()) / 1000 / 1000); channel.state(ChannelState.EXPIRED); close(channel, CloseMode.GRACEFUL); } } }
org.apache.kafka.common.network.Selector.IdleExpiryManager#pollExpiredConnection
public Map.Entry<String, Long> pollExpiredConnection(long currentTimeNanos) { if (currentTimeNanos <= nextIdleCloseCheckTime) return null; if (lruConnections.isEmpty()) { nextIdleCloseCheckTime = currentTimeNanos + connectionsMaxIdleNanos; return null; } Map.Entry<String, Long> oldestConnectionEntry = lruConnections.entrySet().iterator().next(); Long connectionLastActiveTime = oldestConnectionEntry.getValue(); nextIdleCloseCheckTime = connectionLastActiveTime + connectionsMaxIdleNanos; if (currentTimeNanos > nextIdleCloseCheckTime) return oldestConnectionEntry; else return null; }
以上是关于kafka AdminClient 闲时关闭连接的主要内容,如果未能解决你的问题,请参考以下文章
Kafka 0.11客户端集群管理工具AdminClient