Spring Boot 和异步 RabbitMQ RPC
Posted
技术标签:
【中文标题】Spring Boot 和异步 RabbitMQ RPC【英文标题】:Spring boot and asynchronous RabbitMQ RPC 【发布时间】:2021-08-25 22:23:26 【问题描述】:我正在尝试实现 RabbitMQ RPC 模式(请求/响应)。
这对我来说是全新的技术。所以我很难过。
这是一个网络应用,内置于 Spring Boot。
结构:
用户用一些信息填写表单并提交表单,该表单调用处理控制器,例如@/processUser
带有该信息的对象从表单发送到 RabbitMQ 队列
响应部分发生在其他spring项目服务中,获取请求,构建响应 并将其发送回去。
构建响应应该在给定的时间范围内对另一个 Spring 项目执行另一个用户,如果没有,则发回通用响应。
所以我假设响应代码,因为它需要在一个线程上等待请求的整个时间,我需要主线程来运行 Spring Boot 应用程序,应该在后台的单独线程上。因为我需要它是异步的。
这段代码按我的意愿“异步”工作,但我觉得有更好的方法,我只是不知道。我不知道这个匿名线程将如何处理使用网络应用程序的多个用户。它不需要完美,但可以接受:)
下面的代码还没有完成,没有做整个事情(发送对象,动态做出响应......)这只是测试阶段。
请求代码:
public String call(String message) throws Exception
final String corrID = UUID.randomUUID().toString();
String replayQueueName = channel.queueDeclare().getQueue();
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().correlationId(corrID).replyTo(replayQueueName).build();
channel.basicPublish("", requestQueueName,props,message.getBytes());
final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
String ctag = channel.basicConsume(replayQueueName, true, (consumerTag, delivery) ->
if (delivery.getProperties().getCorrelationId().equals(corrID))
response.offer(new String(delivery.getBody(), "UTF-8"));
, consumerTag ->
);
String result = response.take();
channel.basicCancel(ctag);
return result;
并且这个方法在那个处理控制器中被调用:
try(Connection connection = factory.newConnection())
channel = connection.createChannel();
System.out.println("Sending request...");
String response = call("Test_Message");
System.out.println(response);
catch (Exception e)
e.printStackTrace();
响应代码:
@Bean
public ConnectionFactory startFactory()
return new ConnectionFactory();
@Bean
public Connection startCon(ConnectionFactory factory) throws Exception
return factory.newConnection();
@Bean
public void reciver()
new Thread(new Runnable()
@Override
public void run()
try
Channel channel = connection.createChannel();
channel.queueDeclare(RPC_QUEUE_NAME,false,false,false,null);
channel.queuePurge(RPC_QUEUE_NAME);
channel.basicQos(1);
System.out.println("Awaiting rpc requests");
Object monitor = new Object();
DeliverCallback deliverCallback = (consumerTag, delivery) ->
AMQP.BasicProperties replayProps = new AMQP.BasicProperties.Builder()
.correlationId(delivery.getProperties().getCorrelationId())
.build();
String response = "RESPONSE_TESTING";
String message = new String(delivery.getBody(),"UTF-8");
System.out.println(message);
channel.basicPublish("",delivery.getProperties().getReplyTo(), replayProps, response.getBytes());
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
synchronized (monitor)
monitor.notify();
;
channel.basicConsume(RPC_QUEUE_NAME, false, deliverCallback, (consumerTag -> ));
while(true)
synchronized (monitor)
try
monitor.wait();
catch (InterruptedException e)
e.printStackTrace();
catch (Exception e)
e.printStackTrace();
).start();
【问题讨论】:
【参考方案1】:我不太确定您想在这里实现什么,但鉴于您提供的代码,您似乎正在尝试重新发明***。由于您使用的是spring-boot
,因此无需创建能够运行异步以使用消息的整个基础架构。
Spring 已将其作为开箱即用的spring-amqp-starter
的一部分提供。例如,在这种情况下,您不必手动轮询队列以获取响应,因为此操作由已声明的 RabbitListener
处理。
我建议您阅读文档以及各种示例,以便更好地了解如何使用 Rabbit。您可以查看here 和here。
【讨论】:
是的;并且问题中的代码并不是真正的异步,因为它正在阻塞队列中等待;您可以简单地使用RabbitTemplate.sendAndReceive()
或RabbitTemplate.convertSendAndReceive()
。不清楚为什么用 spring-rabbit 标记它,因为您直接使用 amqp-client 并绕过所有 Spring 优点。以上是关于Spring Boot 和异步 RabbitMQ RPC的主要内容,如果未能解决你的问题,请参考以下文章