RabbitMQ - 消息传递顺序
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ - 消息传递顺序相关的知识,希望对你有一定的参考价值。
我需要为我的新项目选择一个新的队列代理。
这次我需要一个支持pub / sub的可伸缩队列,并且必须保持消息排序。
我读过亚历克西斯的评论:他写道:
“事实上,我们认为RabbitMQ比Kafka提供更强的订购”
我在rabbitmq docs中阅读了消息订购部分:
“消息可以使用AMQP方法返回队列,这些方法具有重新排队参数(basic.recover,basic.reject和basic.nack),或者由于在保留未确认的消息时关闭通道...使用2.7.0及更高版本如果队列有多个订阅者,个别消费者仍然可以无序地观察消息。这是由于其他订阅者可能重新排队消息的行为。从队列的角度来看,消息总是保存在发布顺序中。 “
如果我需要按订单处理消息,我只能使用带有独占队列的rabbitMQ给每个消费者吗?
RabbitMQ仍然被认为是有序消息排队的一个很好的解决方案吗?
那么,让我们仔细看看你上面描述的场景。我认为在问题的片段之前立即粘贴the documentation以提供上下文非常重要:
AMQP 0-9-1核心规范的第4.7节解释了保证订购的条件:在一个信道中发布的消息,通过一个交换机,一个队列和一个输出信道将按照它们发送的相同顺序接收。自2.7.0发布以来,RabbitMQ提供更强大的保障。
可以使用具有重新排队参数(basic.recover,basic.reject和basic.nack)的AMQP方法将消息返回到队列,或者由于在保留未确认的消息时关闭通道。任何这些情况都会导致消息在早于2.7.0的RabbitMQ版本的队列后面重新排队。从RabbitMQ版本2.7.0开始,消息始终以发布顺序保存在队列中,即使存在重新排队或通道关闭。 (重点补充)
因此,很明显,从2.7.0开始,RabbitMQ在消息排序方面对原始AMQP规范进行了相当大的改进。
对于多个(并行)消费者,无法保证处理顺序。 第三段(粘贴在问题中)继续给出免责声明,我将解释:“如果队列中有多个处理器,则不再保证消息将按顺序处理。”他们在这里所说的只是RabbitMQ无法违反数学定律。
考虑银行的一系列客户。这家银行以帮助客户进入银行的顺序而自豪。客户排队等候,并由3个可用柜员中的下一个提供服务。
今天早上,所有三名柜员同时出现,接下来的三位顾客接近了。突然之间,三名计票员中的第一名患者病情严重,无法完成服务第一位客户。到发生这种情况时,柜员2已完成客户2,柜员3已经开始为客户3服务。
现在,有两件事情可能发生。 (1)第一个在线的客户可以回到生产线的主管或(2)第一个客户可以抢先第三个客户,导致该柜员停止在第三个客户上工作并开始第一个工作。 RabbitMQ不支持这种类型的抢占逻辑,也不支持我所知道的任何其他消息代理。在任何一种情况下,第一个客户实际上并没有最终获得帮助 - 第二个客户确实如此,幸运地获得了一个好的,快速的出纳员。保证客户得到帮助的唯一方法是帮助一个柜员一次一个地帮助客户,这将导致银行的主要客户服务问题。
我希望这有助于说明您所询问的问题。鉴于您有多个消费者,不可能确保在每种可能的情况下按顺序处理消息。如果你有多个队列,多个独家消费者,不同的经纪人等等都没关系 - 没有办法保证先前消息是按照多个消费者的顺序回答的。但RabbitMQ将尽最大努力。
消息排序在Kafka中保留,但仅在分区内而不是全局内。如果您的数据需要全局排序和分区,这确实会让事情变得困难。但是,如果您只需要确保同一个用户的所有相同事件......最终都在同一个分区中,以便正确排序,您可以这样做。生产者负责他们写入的分区,因此如果您能够对数据进行逻辑分区,则这可能更为可取。
我认为这个问题有两个不相似的,消费订单和处理订单。
消息队列可以 - 在某种程度上 - 保证消息将按顺序消耗,但是,它们不能为您的处理顺序提供任何保证。
这里的主要区别在于消息处理时无法确定消息处理的某些方面,例如:
- 如上所述,消费者在处理时可能会失败,这里消息的消费顺序是正确的,但是,消费者未能正确处理它,这将使其返回队列,直到现在消费订单仍然完好但我们不知道我知道现在的处理顺序
- 如果通过“处理”我们意味着消息现在被丢弃并完成处理,那么考虑处理时间不是线性的情况,换句话说处理一个消息比另一个消息花费更长时间,所以如果消息3在处理中花费的时间比预期,然后消息4和5可能被消耗并在消息3之前完成处理
因此,即使您设法将消息返回到队列的前面(这违反了消费顺序),您仍然无法保证在下一条消息之前的所有消息都已完成处理。
如果您想确保处理订单,那么:
- 始终只有1个消费者实例
- 或者不要使用消息传递队列并使用同步阻塞方法进行处理,这可能听起来很糟糕,但在许多情况下,业务要求是完全有效的,有时甚至是关键的
有适当的方法可以保证RabbitMQ订阅中的消息顺序。
如果您使用多个消费者,他们将使用共享的ExecutorService
处理消息。另见ConnectionFactory.setSharedExecutor(...)
。你可以设置一个Executors.newSingleThreadExecutor()
。
如果将一个Consumer
与一个队列一起使用,则可以使用多个bindingKeys绑定此队列(它们可能具有通配符)。消息将按消息代理接收的顺序放入队列。
例如,您有一个发布者,它发布订单很重要的消息:
try (Connection connection2 = factory.newConnection();
Channel channel2 = connection.createChannel()) {
// publish messages alternating to two different topics
for (int i = 0; i < messageCount; i++) {
final String routingKey = i % 2 == 0 ? routingEven : routingOdd;
channel2.basicPublish(exchange, routingKey, null, ("Hello" + i).getBytes(UTF_8));
}
}
您现在可能希望以与发布它们相同的顺序从队列中的两个主题接收消息:
// declare a queue for the consumer
final String queueName = channel.queueDeclare().getQueue();
// we bind to queue with the two different routingKeys
final String routingEven = "even";
final String routingOdd = "odd";
channel.queueBind(queueName, exchange, routingEven);
channel.queueBind(queueName, exchange, routingOdd);
channel.basicConsume(queueName, true, new DefaultConsumer(channel) { ... });
Consumer
现在将按照发布的顺序接收消息,无论您使用不同的主题。
RabbitMQ文档中有一些很好的5分钟教程可能会有所帮助:https://www.rabbitmq.com/tutorials/tutorial-five-java.html
以上是关于RabbitMQ - 消息传递顺序的主要内容,如果未能解决你的问题,请参考以下文章
当 RabbitMQ 交换不存在时如何处理错误(并且消息通过消息传递网关接口发送)