RabbitMQ 没有选择正确的消费者

Posted

技术标签:

【中文标题】RabbitMQ 没有选择正确的消费者【英文标题】:RabbitMQ doesn't choose a right consumer 【发布时间】:2018-08-06 11:34:11 【问题描述】:

我从这里http://www.rabbitmq.com/tutorials/tutorial-six-java.html 获取示例,从RPCClient 添加了一个更多的 RPC 调用,并添加了一些登录到标准输出。结果,当执行第二次调用时,rabbitmq 使用了具有错误相关 id 的消费者,这不是预期的行为。是错误还是我有什么问题?

RPC 服务器:

package com.foo.rabbitmq;

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class RPCServer 

  private static final String RPC_QUEUE_NAME = "sap-consume";

  private static int fib(int n) 
    if (n ==0) return 0;
    if (n == 1) return 1;
    return fib(n-1) + fib(n-2);
  

  public static void main(String[] argv) 
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    factory.setPort(5672);

    Connection connection = null;
    try 
      connection      = factory.newConnection();
      final Channel channel = connection.createChannel();

      channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);

      channel.basicQos(1);

      System.out.println(" [x] Awaiting RPC requests");

      Consumer consumer = new DefaultConsumer(channel) 
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException 
          AMQP.BasicProperties replyProps = new AMQP.BasicProperties
            .Builder()
            .correlationId(properties.getCorrelationId())
            .build();

          String response = "";

          try 
            String message = new String(body,"UTF-8");
            int n = Integer.parseInt(message);

            System.out.println(" [.] fib(" + message + ")");
            response += fib(n);
          
          catch (RuntimeException e)
            System.out.println(" [.] " + e.toString());
          
          finally 
            channel.basicPublish( "", properties.getReplyTo(), replyProps, response.getBytes("UTF-8"));
            channel.basicAck(envelope.getDeliveryTag(), false);
        // RabbitMq consumer worker thread notifies the RPC server owner thread
            synchronized(this) 
              this.notify();
            
          
        
      ;

      channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
      // Wait and be prepared to consume the message from RPC client.
      while (true) 
        synchronized(consumer) 
          try 
            consumer.wait();
           catch (InterruptedException e) 
            e.printStackTrace();
          
        
      
     catch (IOException | TimeoutException e) 
      e.printStackTrace();
    
    finally 
      if (connection != null)
        try 
          connection.close();
         catch (IOException _ignore) 
    
  

RPC客户端:

package com.bar.rabbitmq;

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;

public class RPCClient 

  private Connection connection;
  private Channel channel;
  private String requestQueueName = "sap-consume";
  private String replyQueueName;

  public RPCClient() throws IOException, TimeoutException 
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    factory.setPort(5672);

    connection = factory.newConnection();
    channel = connection.createChannel();

    replyQueueName = channel.queueDeclare().getQueue();
  

  public String call(String message) throws IOException, InterruptedException 
    final String corrId = UUID.randomUUID().toString();

    AMQP.BasicProperties props = new AMQP.BasicProperties
      .Builder()
      .correlationId(corrId)
      .replyTo(replyQueueName)
      .build();

    channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));

    final BlockingQueue<String> response = new ArrayBlockingQueue<String>(1);

    channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) 
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException 
        if (properties.getCorrelationId().equals(corrId)) 
          System.out.println("Correlation Id" + properties.getCorrelationId() + " corresponds to expected one.");
          response.offer(new String(body, "UTF-8"));
         else 
          System.out.println("Correlation Id" + properties.getCorrelationId() + " doesn't correspond to expected one " + corrId);
        
      
    );

    return response.take();
  

  public void close() throws IOException 
    connection.close();
  

  public static void main(String[] argv) 
    RPCClient rpc = null;
    String response = null;
    try 
      rpc = new RPCClient();

      System.out.println(" [x] Requesting fib(30)");
      response = rpc.call("30");
      System.out.println(" [.] Got '" + response + "'");
      System.out.println(" [x] Requesting fib(40)");
      response = rpc.call("40");
      System.out.println(" [.] Got '" + response + "'");
     catch (IOException | TimeoutException | InterruptedException e) 
      e.printStackTrace();
     finally 
      if (rpc != null) 
        try 
          rpc.close();
         catch (IOException _ignore) 
        
      
    
  

【问题讨论】:

【参考方案1】:

是的,您在教程代码中发现了一个错误。我已经打开了一个拉取请求来修复它,你也可以找到正在发生的事情的解释:

https://github.com/rabbitmq/rabbitmq-tutorials/pull/174


注意:RabbitMQ 团队会监控 the rabbitmq-users mailing list,并且有时只会回答 *** 上的问题。

【讨论】:

【参考方案2】:

这个例子很简单:它使用一个队列来回复。通过发送第二个请求,您为回复注册了一个新的消费者,但第一个请求的消费者仍在侦听,实际上窃取了第二个请求的响应。这就是为什么客户端似乎使用相同的关联 ID。

我们updated the client code 为每个请求使用独占的自动删除队列。该队列将被服务器自动删除,因为它的唯一消费者在收到响应后取消订阅。这有点复杂,但更接近真实世界的场景。

注意使用 RabbitMQ 处理回复队列的最佳方法是使用direct reply-to。这使用了比真实队列更轻的伪队列。我们没有在教程中提到直接回复以使其尽可能简单,但这是在生产中使用的首选功能。

【讨论】:

以上是关于RabbitMQ 没有选择正确的消费者的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ的六种工作模式

rabbitmq 注解配置使用(四)消息确认(消费)

RabbitMQ消息队列

RabbitMQ

RabbitMQ

一旦连接失败,Spring消费者没有连接到rabbitmq队列