一个队列上的多个消费者 RabbitMQ - Java

Posted

技术标签:

【中文标题】一个队列上的多个消费者 RabbitMQ - Java【英文标题】:Multiple Consumer RabbitMQ on one Queue - Java 【发布时间】:2018-04-10 15:34:30 【问题描述】:

我是新手 RabbitMQ java 客户端。 我的问题:我创建了 10 个消费者并将它们添加到队列中。每个消费者使用 10 秒来处理我的流程。我检查了兔子的页面,我看到我的队列有 4000 条消息没有发送给客户端。我检查了日志客户端,结果是为一位消费者获取了一条消息,10 秒后我为一位消费者获取了一条消息,依此类推..我想同时为所有消费者获取 10 条消息(10 条消息-当时 10 条消费者进程) 请帮助我,我没有找到问题的解决方案。 非常感谢。

        while (!isRetry) 
        try 
            isRetry = true;
            connection = mConnectionFactory.newConnection(addresses.toArray(new Address[addresses.size()]));
            String queueName = "webhook_customer";
            String exchangeName = "webhook_exchange";
            String routingKey = "customer";
            System.out.println("step2");

            Channel channel = connection.createChannel();
            channel.exchangeDeclare(exchangeName, "topic", true);
            channel.queueDeclare(queueName, true, false, false, null);
            channel.queueBind(queueName, exchangeName, routingKey);
            channel.basicQos(1);
            for (int i = 0; i < numberWorker; i++) 
                Consumer consumer = new QueueingConsumer(channel) 
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope,
                                               AMQP.BasicProperties properties, byte[] body) throws IOException 
                        long startProcess = System.nanoTime();
                        JSONObject profile = null;
                        try 

                         catch (IOException ioe) 
                            handleLogError(profile, ioe.getMessage().toString());
                         catch (Exception e) 
                            handleLogError(profile, e.getMessage());
                         finally 
                            channel.basicAck(envelope.getDeliveryTag(), false);
                            long endProcess = System.nanoTime();
                            _logger.info("===========######### TIME PROCESS  + " + (endProcess - startProcess) + " Nano Seconds  ========#### " + (endProcess - startProcess) / 1000000 + " Milli Seconds");
                        
                    
                ;

                channel.basicConsume(queueName, false, consumer);
            
            System.out.printf("Start Listening message ...");
         catch (Exception e) 
            System.out.println("exception " + e.getMessage());
            isRetry = closeConnection(connection);
            e.printStackTrace();
         finally 
        
        if (!isRetry) 
            try 
                System.out.println("sleep waiting retry ...");
                Thread.sleep(30000);
             catch (InterruptedException e) 
                e.printStackTrace();
            
        
        //END
    

【问题讨论】:

您能否发布一个代码示例,说明您如何声明您的消费者。你是用channel.basicConsume还是其他方式? 我在帖子中更新了我的代码,请检查并帮助我。非常感谢 RabbitMQ by Example: Multiple Threads, Channels and Queues的可能重复 【参考方案1】:

我确实找到了解决方案。当消息进入并在其中处理时,我在消费者中使用新线程。我创建了多个频道以便同时发送多个消息。我使用线程池来控制线程

【讨论】:

【参考方案2】:

从您的代码示例看来,您可以使用QueueingConsumer 而不是DefaultConsumer。这将从 RabbitMQ 拉出更多消息给消费者并将它们排队,直到它们被处理。

然后,在您的 for (int i = 0; i &lt; 10; i++) 循环中,您使用相同的消费者实例消费了 10 次。相反,您应该创建 10 个消费者:

for (int i = 0; i < 10; i++) 
    Consumer consumer = new DefaultConsumer(channel) 
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException 
          channel.basicAck(envelope.getDeliveryTag(),false);
        
    ;

    channel.basicConsume(queueName, false, consumer);

理想情况下,创建另一个类并在循环中正确创建新实例而不是匿名实例。

注意:您的消费者应该在后台(单独的线程)执行他们的进程,否则他们会互相阻塞。虽然,您提供的信息并没有真正表明您将如何实际处理这些消息。

【讨论】:

我使用了 QueueingConsumer(它已弃用)并修改了我的代码但无法正常工作 RabbitMQ "默认情况下,RabbitMQ 会按顺序将每条消息发送给下一个不忙的消费者。平均每个消费者会收到相同数量的消息。这种分发消息的方式称为轮询. 与三个或更多工人一起尝试。但不工作,我不干净.. 这是因为你的消费者没有线程化。查看Consumer thread pool 部分rabbitmq.com/api-guide.html

以上是关于一个队列上的多个消费者 RabbitMQ - Java的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ 跨多个队列的多个消费者 - 消息延迟处理

RabbitMQ / AMQP:单个队列,同一消息的多个消费者?

rabbitMQ 点对点 一个队列可以多个消费者吗?

rabbitmq 生产者 消费者(多个线程消费同一个队列里面的任务。)

RabbitMQ发布订阅模式

RabbitMQ中有大批量的消息,此时多个消费者同时访问消息队列是怎样取里面的消息的?