Pulsar 客户端线程平衡
Posted
技术标签:
【中文标题】Pulsar 客户端线程平衡【英文标题】:Pulsar client thread balance 【发布时间】:2021-12-21 15:54:37 【问题描述】:我正在尝试实现一个具有多个生产者的 Pulsar 客户端,它在线程之间分配负载,但无论在 ioThreads() 和 listenerThreads() 上传递的值如何,它总是使第一个线程超载(> 65% cpu而其他线程完全空闲)
我尝试了一些方法,包括每小时进行一次“动态重新平衡”(最后一种方法),但在过程中间关闭它肯定不是最好的方法
这是相关代码
...
// pulsar client
pulsarClient = PulsarClient.builder() //
.operationTimeout(config.getAppPulsarTimeout(), TimeUnit.SECONDS) //
.ioThreads(config.getAppPulsarClientThreads()) //
.listenerThreads(config.getAppPulsarClientThreads()) //
.serviceUrl(config.getPulsarServiceUrl()).build();
...
private createProducers()
String strConsumerTopic = this.config.getPulsarTopicInput();
List<Integer> protCasesList = this.config.getEventProtoCaseList();
for (Integer e : protCasesList)
String topicName = config.getPulsarTopicOutput().concat(String.valueOf(e));
LOG.info("Creating producer for topic: ", topicName);
Producer<byte[]> protobufProducer = pulsarClient.newProducer().topic(topicName).enableBatching(false)
.blockIfQueueFull(true).compressionType(CompressionType.NONE)
.sendTimeout(config.getPulsarSendTimeout(), TimeUnit.SECONDS)
.maxPendingMessages(config.getPulsarMaxPendingMessages()).create();
this.mapLink.put(strConsumerTopic.concat(String.valueOf(e)), protobufProducer);
public void closeProducers()
String strConsumerTopic = this.config.getPulsarTopicInput();
List<Integer> protCasesList = this.config.getEventProtoCaseList();
for (Integer e : protCasesList)
try
this.mapLink.get(strConsumerTopic.concat(String.valueOf(e))).close();
LOG.info(" producer correctly closed...",
this.mapLink.get(strConsumerTopic.concat(String.valueOf(e))).getProducerName());
catch (PulsarClientException e1)
LOG.error("Producer: not closed cause: ",
this.mapLink.get(strConsumerTopic.concat(String.valueOf(e))).getProducerName(),
e1.getMessage());
public void rebalancePulsarThreads(boolean firstRun)
ThreadMXBean threadHandler = ManagementFactory.getThreadMXBean();
ThreadInfo[] threadsInfo = threadHandler.getThreadInfo(threadHandler.getAllThreadIds());
for (ThreadInfo threadInfo : threadsInfo)
if (threadInfo.getThreadName().contains("pulsar-client-io"))
// enable cpu time for all threads
threadHandler.setThreadCpuTimeEnabled(true);
// get cpu time for this specific thread
long threadCPUTime = threadHandler.getThreadCpuTime(threadInfo.getThreadId());
int thresholdCPUTime = 65;
if (threadCPUTime > thresholdCPUTime)
LOG.warn("Pulsar client thread with CPU time greater than % - REBALANCING now", thresholdCPUTime);
try
closeProducers();
catch (Exception e)
if (!firstRun)
// producers will not be available in the first run
// therefore, the logging only happens when it is not the first run
LOG.warn("Unable to close Pulsar client threads on rebalancing: ", e.getMessage());
try
createPulsarProducers();
catch (Exception e)
LOG.warn("Unable to create Pulsar client threads on rebalancing: ", e.getMessage());
【问题讨论】:
【参考方案1】:根据您的描述,最可能的情况是您使用的所有主题都由一个代理提供服务。
如果确实如此,并且避免跨代理的主题负载平衡,它使用单个线程是正常的,因为所有这些生产者将共享一个池化 TCP 连接,并且每个连接都分配给 1 个 IO 线程(侦听器线程用于消费者监听器)。
如果你想强制更多线程,你可以增加“每个代理的最大 TCP 连接数”设置,以使用所有配置的 IO 线程。
例如:
PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.ioThreads(16)
.connectionsPerBroker(16)
.create();
【讨论】:
非常感谢@matteo!以上是关于Pulsar 客户端线程平衡的主要内容,如果未能解决你的问题,请参考以下文章
Pulsar 的消息存储机制和 Bookie 的 GC 机制原理