如何使用rabbitmq同步消费者

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何使用rabbitmq同步消费者相关的知识,希望对你有一定的参考价值。

我有以下问题:我有一个RabbitMQ集群,一个消息生产者和一个消费者集群(为了高可用性)。每当消费者收到消息时,它就会根据消息内容生成另一个进程并运行它。这个过程很长,需要30分钟。

我必须确保一次处理一个消息。但是,有一个消费者更多,所以如果队列中有2个消息,一个消费者获得一个消息,另一个消费者获得另一个消息,并且它们被并行处理。

供参考:每个消费者都驻留在不同的机器上。

在RabbitMQ的级别上是否有任何机制允许我等待消耗下一条消息,直到前一条消息被确认?或者我是否必须在服务器之间开发一些锁定机制?

答案

我只是提到了我认为能够满足您要求的rabbitmq文档中的几点。

  1. 第一次参考 - https://www.rabbitmq.com/consumer-priority.html

通常,连接到队列的活动消费者以循环方式从其接收消息。当使用消费者优先级时,如果存在具有相同高优先级的多个活动消费者,则循环传递消息。

我假设您的所有消费者具有相同的优先级,因此消息将均匀地分发给所有活跃的消费者。

  1. 第二次参考 - https://www.rabbitmq.com/consumer-prefetch.html

basic.qos方法,允许您在使用时限制通道(或连接)上未确认的消息数

正如您所提到的,您将在一台机器上拥有单一消费者,它更容易。

只需为每个消费者设置消费者预取限制1。因此,在要求确认之前,服务器将仅向消费者传递一条消息。完成邮件处理后发送基本确认。

Channel channel = ...;
Consumer consumer = ...;
channel.basicQos(1); // Per consumer limit

Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                System.out.println("received message");
                // process the message .. time consuming

                // after processing send the basic ack so that next message can be received from queue
                channel.basicAck(envelope.getDeliveryTag(), false);
        };

 channel.basicConsume("my-queue", false, consumer);

我希望这有帮助。

更新 -

只是添加更多描述 - 当您使用时

channel.basicQos(x);

Rabbitmq将在通道上的每个消费者的最大x个未打包消息上推送(如果在队列中可用,当然在遵守优先级和循环等之后)。这意味着频道上的每个消费者都不会有超过x个未经消息的消息,即消费者可以在任何给定时刻同时处理max x消息。一旦消费者发回ack,就可以将下一条消息推送给它。如果消费者感觉无法处理消息,他也可以发送nack。在这种情况下,消息将被重新排队,并且重新排队的消息可以根据优先级,循环等方式发送到队列中的任何消费者。

每个渠道可以有多个消费者。所以,当你使用

channel.basicQos(x, true);

限制x适用于整个渠道,而不是渠道上的单个/每个消费者。

在您的情况下,每个频道只有一个消费者。因此,频道限制实际上对您的案例没有任何影响。

更多更新 -

机器通过连接连接到RabbitMQ。连接可以有多个通道。一个频道可以有多个消费者。因此逻辑上可以有不同的机器连接到RabbitMQ并且有多个通道和消费者在同一个队列上进行侦听。您可以同时为通道(使用channel.basicQos(x, true))和通道内的消费者(使用channel.basicQos(x, false))设置QOS限制。限制0表示无限制。显然,这些限制适用于通道实例。驻留在不同通道实例(在同一台机器或不同机器上)的所有消费者都将拥有自己的限制(默认或通过QOS方法明确设置)。

另一答案

正如@vsoni解释的那样,我们可以使用基本的QOS,它允许每个消费者一次消费X消息,并且每个频道更精确。由于频道是“共享单个TCP连接的轻量级连接”(请参阅​​here),因此,根据我的知识,在两台不同的计算机上运行的两个消费者之间无法建立QOS。我的目标是不向第二个消费者发送消息,而第一个消费者仍在使用前一个消费者。我只是设法通过外部锁定实现它(为了开发目的,我在一台机器上制作了一个简单的文件锁和3个消费者)但是为了生产,我可能会使用DynamoDB locking。其他选项是使用Zookeeper,etcd或类似软件的分布式锁定。

以上是关于如何使用rabbitmq同步消费者的主要内容,如果未能解决你的问题,请参考以下文章

如何在 .Net 中使用不同类型的消费者使用 RabbitMq 消息?

如何保证RabbitMQ消息顺序消费

使用Java模拟消费者是如何消费rabbitMQ消息队列中的消息的

SpringCloud系列十一:SpringCloudStream(SpringCloudStream 简介创建消息生产者创建消息消费者自定义消息通道分组与持久化设置 RoutingKey)(代码片段

RabbitMQ学习和使用

Django配置Celery执行异步和同步任务(tasks))