我可以使用自定义算法而不是循环使用 RabbitMQ 发送消息吗?
Posted
技术标签:
【中文标题】我可以使用自定义算法而不是循环使用 RabbitMQ 发送消息吗?【英文标题】:Can I dispatch messages with a custom algorithm instead of round robin using RabbitMQ? 【发布时间】:2017-08-17 12:30:22 【问题描述】:我正在使用 RabbitMQ 的循环功能在多个消费者之间发送消息,但一次只有一个消费者接收实际消息。
我的问题是我的消息代表任务,我希望在我的消费者上拥有本地会话(状态)。我事先知道哪些消息属于哪个会话,但我不知道使用我指定的算法将 RabbitMQ 分派给消费者的最佳方法是什么(或者有没有办法?)。
我不想编写自己的编排服务,因为它会成为瓶颈,我不希望我的生产者知道哪个消费者会接收他们的消息,因为我会失去使用 Rabbit 获得的解耦。
有没有办法让 RabbitMQ 根据预定义的算法/规则而不是循环法将我的消息发送给消费者?
澄清:我使用了几个用不同语言编写的微服务,每个服务都有自己的工作。我使用 protobuf 消息在它们之间进行通信。我给每条新消息一个UUID
。如果消费者收到一条消息,它可以从中创建一条响应消息(这可能不是正确的术语,因为生产者和消费者是分离的并且他们彼此不了解) 和 @987654322 @ 被复制到新消息中。这形成了一个数据转换管道,这个“进程”由UUID
(processId)标识。我的问题是,我可能有多个工人消费者,如果以前见过UUID
,我需要一名工人坚持。我有这个需求是因为
-
每个进程可能都有本地状态
进程完成后我要清理本地状态
一个微服务可能会收到同一个进程的多条消息,我需要区分哪个消息属于哪个进程
由于 RabbitMQ 使用循环法在工作人员之间分配任务,因此我不能强制我的进程坚持一个工作人员。我有几个警告:
生产者与消费者分离,因此不能选择直接消息传递 工作人员的数量不是恒定的(有一个负载平衡器可能会启动工作人员的新实例)如果有一个不涉及更改循环算法并且不违反我的约束的解决方法也可以!
【问题讨论】:
您能否详细说明是什么让消息属于特定消费者? 查看我的编辑以澄清这一点。 【参考方案1】:如果您不想使用编排服务,您可以尝试类似的拓扑:
为简单起见,我假设您的 processId
用作路由键(在现实世界中,您可能希望将其存储在标头中并改用 header
交换)。
传入的消息将被 Incoming Exchange(类型:direct)接受,它的alternative-exchange
属性设置为指向No Session Exchange
(扇出)。
这是 RabbitMQ 文档在“替代交换”中所说的:
有时需要让客户端处理交换无法路由的消息(即,因为没有绑定队列我们没有匹配的绑定)。
典型的例子是
检测客户端何时意外或恶意发布无法路由的消息 “or else”路由语义,其中一些消息被特殊处理,其余消息由通用处理程序处理RabbitMQ 的备用交换(“AE”)功能解决了这些用例。
(我们对这里的or else
用例特别感兴趣)
每个消费者都将创建自己的队列并将其绑定到Incoming Exchange,使用processId(s)
作为它目前知道的会话,作为绑定的路由键。 p>
这样它只会收到它感兴趣的会话的消息。
另外,所有的消费者都会绑定到共享的No Session Queue。
如果带有以前未知的processId
的消息传入,将不会在 Incoming Exchange 注册它的特定绑定,因此它将被重新路由到 No Session交换 => 无会话队列,并以通常(循环)方式分派给其中一个消费者。
然后,消费者将向 Incoming Exchange 注册一个新的绑定(即开始一个新的“会话”),这样它就可以使用这个processId
获取所有后续消息.
一旦“会话”结束,它必须删除相应的绑定(即关闭“会话”)。
【讨论】:
简直是天才!我知道备用交换,但我不知道它可以用于这个巧妙的技巧!以上是关于我可以使用自定义算法而不是循环使用 RabbitMQ 发送消息吗?的主要内容,如果未能解决你的问题,请参考以下文章
我可以使用 IP 地址作为名称服务器而不是创建自定义名称服务器 [关闭]
安装节点包时可以使用自定义目录名称而不是“node_modules”吗?