Spring Boot 和异步 RabbitMQ RPC

Posted

技术标签:

【中文标题】Spring Boot 和异步 RabbitMQ RPC【英文标题】:Spring boot and asynchronous RabbitMQ RPC 【发布时间】:2021-08-25 22:23:26 【问题描述】:

我正在尝试实现 RabbitMQ RPC 模式(请求/响应)。

这对我来说是全新的技术。所以我很难过。

这是一个网络应用,内置于 Spring Boot。

结构:

用户用一些信息填写表单并提交表单,该表单调用处理控制器,例如@/processUser

带有该信息的对象从表单发送到 RabbitMQ 队列

响应部分发生在其他spring项目服务中,获取请求,构建响应 并将其发送回去。

构建响应应该在给定的时间范围内对另一个 Spring 项目执行另一个用户,如果没有,则发回通用响应。

所以我假设响应代码,因为它需要在一个线程上等待请求的整个时间,我需要主线程来运行 Spring Boot 应用程序,应该在后台的单独线程上。因为我需要它是异步的。

这段代码按我的意愿“异步”工作,但我觉得有更好的方法,我只是不知道。我不知道这个匿名线程将如何处理使用网络应用程序的多个用户。它不需要完美,但可以接受:)

下面的代码还没有完成,没有做整个事情(发送对象,动态做出响应......)这只是测试阶段。

请求代码:

public  String call(String message) throws Exception
          final String corrID = UUID.randomUUID().toString();

        String replayQueueName = channel.queueDeclare().getQueue();
        AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().correlationId(corrID).replyTo(replayQueueName).build();

        channel.basicPublish("", requestQueueName,props,message.getBytes());

        final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
        String ctag = channel.basicConsume(replayQueueName, true, (consumerTag, delivery) -> 
            if (delivery.getProperties().getCorrelationId().equals(corrID)) 
                response.offer(new String(delivery.getBody(), "UTF-8"));
            
        , consumerTag -> 
        );
        String result  = response.take();
        channel.basicCancel(ctag);
        return result;

并且这个方法在那个处理控制器中被调用:

try(Connection connection = factory.newConnection())
            channel = connection.createChannel();
            System.out.println("Sending request...");

            String response = call("Test_Message");
            System.out.println(response);

        catch (Exception e)
            e.printStackTrace();
        

响应代码:

@Bean
    public ConnectionFactory startFactory()
        return new ConnectionFactory();
    

    @Bean
    public Connection startCon(ConnectionFactory factory) throws Exception
        return  factory.newConnection();
    

    @Bean
    public void reciver()
        new Thread(new Runnable() 
            @Override
            public void run() 
                try

                    Channel channel = connection.createChannel();
                    channel.queueDeclare(RPC_QUEUE_NAME,false,false,false,null);
                    channel.queuePurge(RPC_QUEUE_NAME);

                    channel.basicQos(1);

                    System.out.println("Awaiting rpc requests");

                    Object monitor = new Object();
                    DeliverCallback deliverCallback = (consumerTag, delivery) ->
                        AMQP.BasicProperties replayProps = new AMQP.BasicProperties.Builder()
                                .correlationId(delivery.getProperties().getCorrelationId())
                                .build();
                        String response = "RESPONSE_TESTING";
                        String message = new String(delivery.getBody(),"UTF-8");
                        System.out.println(message);
                        channel.basicPublish("",delivery.getProperties().getReplyTo(), replayProps, response.getBytes());
                        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                        synchronized (monitor)
                            monitor.notify();
                        

                    ;
                    channel.basicConsume(RPC_QUEUE_NAME, false, deliverCallback, (consumerTag -> ));
                    while(true)
                        synchronized (monitor)
                            try
                                monitor.wait();
                            catch (InterruptedException e)
                                e.printStackTrace();
                            
                        
                    
                catch (Exception e)
                    e.printStackTrace();
                

            
        ).start();



    

【问题讨论】:

【参考方案1】:

我不太确定您想在这里实现什么,但鉴于您提供的代码,您似乎正在尝试重新发明***。由于您使用的是spring-boot,因此无需创建能够运行异步以使用消息的整个基础架构。

Spring 已将其作为开箱即用的spring-amqp-starter 的一部分提供。例如,在这种情况下,您不必手动轮询队列以获取响应,因为此操作由已声明的 RabbitListener 处理。

我建议您阅读文档以及各种示例,以便更好地了解如何使用 Rabbit。您可以查看here 和here。

【讨论】:

是的;并且问题中的代码并不是真正的异步,因为它正在阻塞队列中等待;您可以简单地使用RabbitTemplate.sendAndReceive()RabbitTemplate.convertSendAndReceive()。不清楚为什么用 spring-rabbit 标记它,因为您直接使用 amqp-client 并绕过所有 Spring 优点。

以上是关于Spring Boot 和异步 RabbitMQ RPC的主要内容,如果未能解决你的问题,请参考以下文章

Spring Boot系列——7步集成RabbitMQ

Spring Boot (25) RabbitMQ消息队列

spring boot rabbitMQ 的 hello world

Spring Boot之RabbitMQ

详解Spring Boot中的RabbitMQ

Spring Boot(十三)RabbitMQ安装与集成