在 BasicAcks 中关闭 RabbitMQ 通道

Posted

技术标签:

【中文标题】在 BasicAcks 中关闭 RabbitMQ 通道【英文标题】:Close RabbitMQ channel while in BasicAcks 【发布时间】:2020-01-20 13:55:07 【问题描述】:

我在一个连接下有一个 RabbitMQ 通道池。我正在尝试实现Publisher Confirms 功能。我以一种随着对频道的需求增加而创建新频道的方式构建了池。但现在,我也必须管理这些渠道的关闭。

我计划使用outstandingConfirms 列表实施类似RabbitMQ tutorials 的解决方案。

我遇到的问题是,当通道中的最后一条消息是ack-ednack-ed 时,我必须以某种方式关闭通道(当我高于池中对象的“软”阈值时)。

正如您在下面的代码中看到的,sender 参数实际上是通道本身,我想如果没有待处理的消息,我可以直接使用它来关闭通道。但是我面临这样一个事实,即通道不应该被多个线程同时使用。这个相同的频道将在池中可用,并且可以被应用程序拾取以供使用。

这些是事件订阅:

protected void OnBasicAcks(object sender, BasicAckEventArgs e)

    //sender = channel object
    //todo close channel after all pending messages (n)acked


protected void OnBasicNacks(object sender, BasicNackEventArgs e)

    //sender = channel object
    //todo close channel after all pending messages (n)acked

另外here 我可以读到回调处理程序中允许阻塞操作:

从 3.5.0 版开始,应用程序回调处理程序可以调用阻塞 操作(例如 IModel.QueueDeclare 或 IModel.BasicCancel)

但这是否也适用于通道本身的关闭?

简而言之,这是我的问题。我的问题是:

    是否可以直接使用通过sender参数传递的通道进行IModel.Close()之类的操作? 我只是想防止在频道或其他奇怪的东西上出现死锁。针对我的问题有什么建议或最佳实践吗? 还有一些完全不同的东西:我可以在OnBasicAcks 方法中创建一个新频道吗?

【问题讨论】:

【参考方案1】:

如果您使用的是最新的 RabbitMQ.Client(我认为是 v5.1),则所有回调都是使用 async 在 ThreadPool 上调用的,因此您通常可以在回调中调用这些方法。但是,我会避免劫持回调线程来执行任何长时间运行的任务,因为它们是由 RabbitMQ 客户端按通道管理的。

所以,具体回答一下。

    应该没问题,查看源代码,它不会锁定任何会阻止调用 close 的东西。请注意您的代码中会死锁的任何内容。 最好在您自己的代码中控制对会话的访问,这样您就不会从多个线程调用单个通道,期望任何事情以特定的顺序发生。 应该可以正常工作,因为您在 TPL 任务线程上,所以没有理由不应该工作。

如果您想了解有关客户端工作原理的更多详细信息,可以查看source,尤其是ModelBase.cs 文件,以及它为回调创建的异步工作器服务。

【讨论】:

以上是关于在 BasicAcks 中关闭 RabbitMQ 通道的主要内容,如果未能解决你的问题,请参考以下文章

在 windows 中关闭显示器

如何在线程中关闭 QWebSocket?

检查在java中关闭的游标

如何在导航控件片段中关闭导航 DrawerLayout onBackPressed

在 python 的运行脚本中关闭 GPU

在 WPF Frame 控件中关闭导航页面声音