RabbitMQ 没有选择正确的消费者
Posted
技术标签:
【中文标题】RabbitMQ 没有选择正确的消费者【英文标题】:RabbitMQ doesn't choose a right consumer 【发布时间】:2018-08-06 11:34:11 【问题描述】:我从这里http://www.rabbitmq.com/tutorials/tutorial-six-java.html 获取示例,从RPCClient
添加了一个更多的 RPC 调用,并添加了一些登录到标准输出。结果,当执行第二次调用时,rabbitmq 使用了具有错误相关 id 的消费者,这不是预期的行为。是错误还是我有什么问题?
RPC 服务器:
package com.foo.rabbitmq;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class RPCServer
private static final String RPC_QUEUE_NAME = "sap-consume";
private static int fib(int n)
if (n ==0) return 0;
if (n == 1) return 1;
return fib(n-1) + fib(n-2);
public static void main(String[] argv)
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
Connection connection = null;
try
connection = factory.newConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
channel.basicQos(1);
System.out.println(" [x] Awaiting RPC requests");
Consumer consumer = new DefaultConsumer(channel)
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
AMQP.BasicProperties replyProps = new AMQP.BasicProperties
.Builder()
.correlationId(properties.getCorrelationId())
.build();
String response = "";
try
String message = new String(body,"UTF-8");
int n = Integer.parseInt(message);
System.out.println(" [.] fib(" + message + ")");
response += fib(n);
catch (RuntimeException e)
System.out.println(" [.] " + e.toString());
finally
channel.basicPublish( "", properties.getReplyTo(), replyProps, response.getBytes("UTF-8"));
channel.basicAck(envelope.getDeliveryTag(), false);
// RabbitMq consumer worker thread notifies the RPC server owner thread
synchronized(this)
this.notify();
;
channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
// Wait and be prepared to consume the message from RPC client.
while (true)
synchronized(consumer)
try
consumer.wait();
catch (InterruptedException e)
e.printStackTrace();
catch (IOException | TimeoutException e)
e.printStackTrace();
finally
if (connection != null)
try
connection.close();
catch (IOException _ignore)
RPC客户端:
package com.bar.rabbitmq;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;
public class RPCClient
private Connection connection;
private Channel channel;
private String requestQueueName = "sap-consume";
private String replyQueueName;
public RPCClient() throws IOException, TimeoutException
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
connection = factory.newConnection();
channel = connection.createChannel();
replyQueueName = channel.queueDeclare().getQueue();
public String call(String message) throws IOException, InterruptedException
final String corrId = UUID.randomUUID().toString();
AMQP.BasicProperties props = new AMQP.BasicProperties
.Builder()
.correlationId(corrId)
.replyTo(replyQueueName)
.build();
channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));
final BlockingQueue<String> response = new ArrayBlockingQueue<String>(1);
channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel)
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
if (properties.getCorrelationId().equals(corrId))
System.out.println("Correlation Id" + properties.getCorrelationId() + " corresponds to expected one.");
response.offer(new String(body, "UTF-8"));
else
System.out.println("Correlation Id" + properties.getCorrelationId() + " doesn't correspond to expected one " + corrId);
);
return response.take();
public void close() throws IOException
connection.close();
public static void main(String[] argv)
RPCClient rpc = null;
String response = null;
try
rpc = new RPCClient();
System.out.println(" [x] Requesting fib(30)");
response = rpc.call("30");
System.out.println(" [.] Got '" + response + "'");
System.out.println(" [x] Requesting fib(40)");
response = rpc.call("40");
System.out.println(" [.] Got '" + response + "'");
catch (IOException | TimeoutException | InterruptedException e)
e.printStackTrace();
finally
if (rpc != null)
try
rpc.close();
catch (IOException _ignore)
【问题讨论】:
【参考方案1】:是的,您在教程代码中发现了一个错误。我已经打开了一个拉取请求来修复它,你也可以找到正在发生的事情的解释:
https://github.com/rabbitmq/rabbitmq-tutorials/pull/174
注意:RabbitMQ 团队会监控 the rabbitmq-users
mailing list,并且有时只会回答 *** 上的问题。
【讨论】:
【参考方案2】:这个例子很简单:它使用一个队列来回复。通过发送第二个请求,您为回复注册了一个新的消费者,但第一个请求的消费者仍在侦听,实际上窃取了第二个请求的响应。这就是为什么客户端似乎使用相同的关联 ID。
我们updated the client code 为每个请求使用独占的自动删除队列。该队列将被服务器自动删除,因为它的唯一消费者在收到响应后取消订阅。这有点复杂,但更接近真实世界的场景。
注意使用 RabbitMQ 处理回复队列的最佳方法是使用direct reply-to。这使用了比真实队列更轻的伪队列。我们没有在教程中提到直接回复以使其尽可能简单,但这是在生产中使用的首选功能。
【讨论】:
以上是关于RabbitMQ 没有选择正确的消费者的主要内容,如果未能解决你的问题,请参考以下文章