在 BasicAcks 中关闭 RabbitMQ 通道
Posted
技术标签:
【中文标题】在 BasicAcks 中关闭 RabbitMQ 通道【英文标题】:Close RabbitMQ channel while in BasicAcks 【发布时间】:2020-01-20 13:55:07 【问题描述】:我在一个连接下有一个 RabbitMQ 通道池。我正在尝试实现Publisher Confirms
功能。我以一种随着对频道的需求增加而创建新频道的方式构建了池。但现在,我也必须管理这些渠道的关闭。
我计划使用outstandingConfirms
列表实施类似RabbitMQ tutorials 的解决方案。
我遇到的问题是,当通道中的最后一条消息是ack-ed
或nack-ed
时,我必须以某种方式关闭通道(当我高于池中对象的“软”阈值时)。
正如您在下面的代码中看到的,sender
参数实际上是通道本身,我想如果没有待处理的消息,我可以直接使用它来关闭通道。但是我面临这样一个事实,即通道不应该被多个线程同时使用。这个相同的频道将在池中可用,并且可以被应用程序拾取以供使用。
这些是事件订阅:
protected void OnBasicAcks(object sender, BasicAckEventArgs e)
//sender = channel object
//todo close channel after all pending messages (n)acked
protected void OnBasicNacks(object sender, BasicNackEventArgs e)
//sender = channel object
//todo close channel after all pending messages (n)acked
另外here 我可以读到回调处理程序中允许阻塞操作:
从 3.5.0 版开始,应用程序回调处理程序可以调用阻塞 操作(例如 IModel.QueueDeclare 或 IModel.BasicCancel)
但这是否也适用于通道本身的关闭?
简而言之,这是我的问题。我的问题是:
-
是否可以直接使用通过
sender
参数传递的通道进行IModel.Close()
之类的操作?
我只是想防止在频道或其他奇怪的东西上出现死锁。针对我的问题有什么建议或最佳实践吗?
还有一些完全不同的东西:我可以在OnBasicAcks
方法中创建一个新频道吗?
【问题讨论】:
【参考方案1】:如果您使用的是最新的 RabbitMQ.Client(我认为是 v5.1),则所有回调都是使用 async 在 ThreadPool 上调用的,因此您通常可以在回调中调用这些方法。但是,我会避免劫持回调线程来执行任何长时间运行的任务,因为它们是由 RabbitMQ 客户端按通道管理的。
所以,具体回答一下。
-
应该没问题,查看源代码,它不会锁定任何会阻止调用 close 的东西。请注意您的代码中会死锁的任何内容。
最好在您自己的代码中控制对会话的访问,这样您就不会从多个线程调用单个通道,期望任何事情以特定的顺序发生。
应该可以正常工作,因为您在 TPL 任务线程上,所以没有理由不应该工作。
如果您想了解有关客户端工作原理的更多详细信息,可以查看source,尤其是ModelBase.cs 文件,以及它为回调创建的异步工作器服务。
【讨论】:
以上是关于在 BasicAcks 中关闭 RabbitMQ 通道的主要内容,如果未能解决你的问题,请参考以下文章