从 Java 客户端中的多个 RabbitMQ 交换中读取,没有轮询

Posted

技术标签:

【中文标题】从 Java 客户端中的多个 RabbitMQ 交换中读取,没有轮询【英文标题】:Read from multiple RabbitMQ exchanges in java client no polling 【发布时间】:2014-04-17 05:10:44 【问题描述】:

请解释如何配置 Java 客户端以从两个不同的 RabbitMQ 交换中读取而不进行轮询。我希望客户端在消息到达时唤醒,然后再次阻塞。

在我的小系统集成问题中,一个 RabbitMQ 交换使用各种路由键(我知道如何使用通配符来捕获它们)携带工作消息,而另一个交换携带控制消息(例如,“停止”)。所以我的客户必须听取来自两个地方的消息。这是一个相对低容量的系统问题,我不是在问负载分担或公平等问题。

当然,我可以运行一个线程来轮询每个交换、休眠、分派,直到永远。但我想避免投票。

不知何故,我想起了 Unix 的 select() 系统调用,当数据准备就绪时,它会在交给它的任何文件描述符上唤醒。 RabbitMQ 有类似的吗?

我目前的解决方案是一个适配器,它启动一个线程来阻塞每个输入交换;收到后,每个线程都会写入 java.util.concurrent 集合;我使用另一个线程来阻止该集合并在消息到达最终消费者时传递消息。它工作正常,但如果我能消除这种复杂性,那就太好了。

这些 SO 帖子围绕着这个问题展开,如果我在这些帖子中忽略了它,请随时在解决方案中摸摸我的鼻子:

对于java: RabbitMQ by Example: Multiple Threads, Channels and Queues

对于 C#: Reading from multiple queues, RabbitMQ

提前致谢。

【问题讨论】:

你看过教程吗? rabbitmq.com/tutorials/tutorial-three-java.html 您需要每个消费者一个线程,并在等待每条消息时使用 consumer.nextDelivery() 方法进行阻塞。 【参考方案1】:

感谢 robthewolf 的评论。是的,我已经阅读了教程,我知道我需要每个消费者一个线程。

事实证明,使用单个线程从多个交换中读取数据很简单,根本不需要轮询:获取一个新队列,并将其绑定到所有相关的交换。适用于主题和扇出。 用 SSCE 对此进行了测试,见下文。

我很遗憾 RabbitMQ javadoc 中缺乏细节,Channel#queueBind(String, String, String) 方法中的几个选择词会有很大帮助。

HTH

package rabbitExample;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

/**
 * Demonstrates reading messages from two exchanges via a single queue monitored
 * by a single thread.
 * 
 */
public class MultiExchangeReadTest implements Runnable 

private final String exch1 = "my.topic.exchange";
private final String exch2 = "my.fanout.exchange";
private final Channel channel;
private final QueueingConsumer consumer;

public MultiExchangeReadTest(final String mqHost) throws Exception 

    // Connect to server
    System.out.println("Connecting to host " + mqHost);
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost(mqHost);
    Connection connection = factory.newConnection();
    channel = connection.createChannel();

    // Declare exchanges; use defaults for durable etc.
    channel.exchangeDeclare(exch1, "topic");
    channel.exchangeDeclare(exch2, "fanout");

    // Get a new, unique queue name
    final String queue = channel.queueDeclare().getQueue();

    // Bind the queue to the exchanges; topic gets non-empty routing key
    channel.queueBind(queue, exch1, "my.key");
    channel.queueBind(queue, exch2, "");

    // Configure the channel to fetch one message at a time, auto-ACK
    channel.basicQos(1);
    consumer = new QueueingConsumer(channel);
    channel.basicConsume(queue, true, consumer);


public void run() 
    // Reads messages until interrupted
    try 
        while (true) 
            // Wait for a message
            System.out.println("Awaiting message");
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            // Show contents using default encoding scheme
            String body = new String(delivery.getBody());
            System.out.println("Message from exch "
                    + delivery.getEnvelope().getExchange() + ", key '"
                    + delivery.getEnvelope().getRoutingKey() + "':\n"
                    + body);
         // while
     catch (Exception ex) 
        ex.printStackTrace();
    


public static void main(String[] args) throws Exception 
    if (args.length != 1) 
        System.err
                .println("Usaage: MultiExchangeReadTest.main mq-host-name");
     else 
        MultiExchangeReadTest multiReader = new MultiExchangeReadTest(
                args[0]);
        multiReader.run();
    


【讨论】:

以上是关于从 Java 客户端中的多个 RabbitMQ 交换中读取,没有轮询的主要内容,如果未能解决你的问题,请参考以下文章

一个队列上的多个消费者 RabbitMQ - Java

Go RabbitMQ主题

java如何获取rabbitmq队列中消息数量

Java 客户端中的 RabbitMQ 通道和线程

java rabbitmq 客户端在失去连接后怎么自动恢复

RabbitMQ发布订阅模式