Java 客户端中的 RabbitMQ 通道和线程

Posted

技术标签:

【中文标题】Java 客户端中的 RabbitMQ 通道和线程【英文标题】:RabbitMQ channels and threads in Java client 【发布时间】:2014-12-08 08:48:33 【问题描述】:

我想快速确认一下我怀疑 RabbitMQ 文档的这一部分所说的内容:

对消费者的回调在与连接管理的线程不同的线程上分派。这意味着消费者可以安全地调用 Connection 或 Channel 上的阻塞方法,例如 queueDeclare、txCommit、basicCancel 或 basicPublish。

每个 Channel 都有自己的调度线程。对于每个通道一个消费者的最常见用例,这意味着消费者不会阻碍其他消费者。如果每个 Channel 有多个 Consumer,请注意长时间运行的 Consumer 可能会阻止向该 Channel 上的其他 Consumer 发送回调。

我有各种命令(消息)通过一个附加了 DefaultConsumer 的单个入站队列和通道进入。假设 DefaultConsumer 中有一个线程池可以让我直接从消费者回调方法运行应用程序逻辑,并且我不会阻止后续命令的处理是否正确?如果看起来有瓶颈,我可以给 RMQ 一个更大的线程池吗?

此外,偶尔会有一个basicPublish从其他线程到同一频道。我认为这确实阻碍了消费者?我想我应该在这样做时使用一个新频道?

【问题讨论】:

【参考方案1】:

您提到的线程池不是DefaultConsumer 的一部分,而是Connection 的一部分,在Channels 和DefaultConsumers 之间共享。它允许并行调用不同的消费者。见this part of the guide。

所以您会期望通过增加线程池的大小可以达到更高的并行度。然而,这并不是影响它的唯一因素。

有一个很大的警告:无论线程池中有多少线程,流经单个通道的传入消息都是串行处理的。这就是ConsumerWorkService 的实现方式。

因此,为了能够同时使用传入消息,您必须管理多个通道或将这些消息放入单独的线程池中。

发布不使用来自Connections 的线程池的线程,因此它们不会占用消费者。

更多详情您可以查看this post。

【讨论】:

@wheleph 上面帖子的链接已过时,新链接为wheleph.gitlab.io/2015/09/06/rabbitmq-async-consumption 我想知道如果我不需要自动确认但在单独的线程池中消费怎么办? ***.com/questions/45428614/…

以上是关于Java 客户端中的 RabbitMQ 通道和线程的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ - 关闭空闲/悬空通道

如何在 rabbitmq 中合并频道?

使用多线程和通道发布时,线程都被阻塞,rabbitmq

gRPC 中的通道/存根是线程安全的吗

RabbitMQ/JAVA 客户端测试(补:利用线程)

《RabbitMQ开发库的完整API文档》翻译