RabbitMQ RPC 以异步方式?

Posted

技术标签:

【中文标题】RabbitMQ RPC 以异步方式?【英文标题】:RabbitMQ RPC in an async way? 【发布时间】:2020-09-04 07:08:38 【问题描述】:

我正在为一个使用 RabbitMQ 和 RPC 的客户开发一个项目,我不太了解 RabbitMQ,我正在努力在互联网上找到一些不错的示例。我需要实现一些异步操作,我会更好地解释自己..

在当前状态下,我有一个生产者发送 RPC 请求并等待消费者的答复,到目前为止一切正常,一切正常。我的问题是我不想等待答案,我仍然需要答案,但我不想在我的制片人那里等待。我将在此处发布我的生产者和消费者代码。

制作人

using System;
using System.Collections.Concurrent;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace RabbitProducer

    public class RpcClient
    
        private readonly IConnection connection;
        private readonly IModel channel;
        private readonly string replyQueueName;
        private readonly EventingBasicConsumer consumer;
        private readonly BlockingCollection<string> respQueue = new BlockingCollection<string>();
        private readonly IBasicProperties props;

        public RpcClient()
        
            ConnectionFactory factory = new ConnectionFactory()  HostName = "192.168.68.17" ;

            connection = factory.CreateConnection();
            channel = connection.CreateModel();
            channel.ConfirmSelect();
            replyQueueName = channel.QueueDeclare().QueueName;
            consumer = new EventingBasicConsumer(channel);

            props = channel.CreateBasicProperties();
            string correlationId = Guid.NewGuid().ToString();
            props.CorrelationId = correlationId;
            props.ReplyTo = replyQueueName;

            consumer.Received += (model, ea) =>
            
                var body = ea.Body.ToArray();
                string response = Encoding.UTF8.GetString(body);
                if (ea.BasicProperties.CorrelationId == correlationId)
                
                    respQueue.Add(response);
                
            ;
            channel.BasicAcks += (sender, ea) =>
            

            ;
            channel.BasicNacks += (sender, ea) =>
            

            ;
        

        public string Call(string message)
        
            var messageBytes = Encoding.UTF8.GetBytes(message);

            channel.BasicPublish(
                exchange: "",
                routingKey: "Ciccio",
                basicProperties: props,
                body: messageBytes);

            channel.BasicConsume(
                consumer: consumer,
                queue: replyQueueName,
                autoAck: true);

            return respQueue.Take();
        

        public void Close()
        
            connection.Close();
        
    
    class Program
    
        public static void Main()
        
            RpcClient rpcClient = new RpcClient();
            Random random = new Random();
            int a = random.Next(10, 50);
            Console.WriteLine("Ciccio");
            Console.WriteLine(a.ToString());
            string response = rpcClient.Call(a.ToString());

            Console.WriteLine(" [.] Got '0'", response);
            rpcClient.Close();
            Console.ReadLine();
        
    

消费者

using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
using System.Threading;
namespace RabbitConsumer

    class Program
    
        public static void Main()
        
            var factory = new ConnectionFactory()  HostName = "192.168.68.17" ;
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            

                channel.QueueDeclare(queue: "Ciccio", durable: false,
                  exclusive: false, autoDelete: false, arguments: null);
                channel.BasicQos(0, 1, false);
                var consumer = new EventingBasicConsumer(channel);
                channel.BasicConsume(queue: "Ciccio",
                  autoAck: false, consumer: consumer);
                Console.WriteLine("Ciccio");
                Console.WriteLine(" [x] Awaiting RPC requests");

                consumer.Received += (model, ea) =>
                
                    string response = null;
                    System.Threading.Thread.Sleep(5000);
                    var body = ea.Body.ToArray();
                    var props = ea.BasicProperties;
                    var replyProps = channel.CreateBasicProperties();
                    replyProps.CorrelationId = props.CorrelationId;

                    try
                    
                        var message = Encoding.UTF8.GetString(body);
                        int n = int.Parse(message);
                        Console.WriteLine(" [.] fib(0)", message);
                        response = fib(n).ToString();
                    
                    catch (Exception e)
                    
                        Console.WriteLine(" [.] " + e.Message);
                        response = "";
                    
                    finally
                    
                        var responseBytes = Encoding.UTF8.GetBytes(response);
                        channel.BasicPublish(exchange: "", routingKey: props.ReplyTo,
                          basicProperties: replyProps, body: responseBytes);
                        channel.BasicAck(deliveryTag: ea.DeliveryTag,
                          multiple: false);
                    
                ;

                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            
        


        private static int fib(int n)
        
            if (n == 0 || n == 1)
            
                return n;
            

            return fib(n - 1) + fib(n - 2);
        

    

【问题讨论】:

【参考方案1】:

好吧,显然我最近工作太多以至于忘记了正确使用任务......这就是你可以实现我想要的东西的方法。

异步生产者

using System;
using System.Collections.Concurrent;
using System.Text;
using System.Threading.Tasks;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace RabbitProducer

    public class RpcClient
    
        private readonly IConnection connection;
        private readonly IModel channel;
        private readonly string replyQueueName;
        private readonly EventingBasicConsumer consumer;
        private readonly BlockingCollection<string> respQueue = new BlockingCollection<string>();
        private readonly IBasicProperties props;

        public RpcClient()
        
            ConnectionFactory factory = new ConnectionFactory()  HostName = "192.168.68.17" ;

            connection = factory.CreateConnection();
            channel = connection.CreateModel();
            channel.ConfirmSelect();
            replyQueueName = channel.QueueDeclare().QueueName;
            consumer = new EventingBasicConsumer(channel);

            props = channel.CreateBasicProperties();
            string correlationId = Guid.NewGuid().ToString();
            props.CorrelationId = correlationId;
            props.ReplyTo = replyQueueName;

            consumer.Received += (model, ea) =>
            
                var body = ea.Body.ToArray();
                string response = Encoding.UTF8.GetString(body);
                if (ea.BasicProperties.CorrelationId == correlationId)
                
                    respQueue.Add(response);

                
            ;
            channel.BasicAcks += (sender, ea) =>
            

            ;
            channel.BasicNacks += (sender, ea) =>
            

            ;
        

        public string Call(string message)
        
            var messageBytes = Encoding.UTF8.GetBytes(message);

            channel.BasicPublish(
                exchange: "",
                routingKey: "Ciccio",
                basicProperties: props,
                body: messageBytes);

            channel.BasicConsume(
                consumer: consumer,
                queue: replyQueueName,
                autoAck: true);
            return respQueue.Take();            
        

        public void Close()
        
            connection.Close();
        
    
    class Program
    
        private static void Send(int n)
        
            Console.WriteLine("SEND " + n);
            Task taskA = new Task(() =>
            
                RpcClient rpcClient = new RpcClient();
                string resp = rpcClient.Call(n.ToString());

                Console.WriteLine(n + " [.] Got '0'", resp);
                rpcClient.Close();
            );
            taskA.Start();
        

        public static void Main()
        
            Random random = new Random();
            int a = random.Next(10, 50);
            Send(a);
            int b = random.Next(10, 50);
            Send(b);
            int c = random.Next(10, 50);
            Send(c);
            Console.ReadLine();
        
    

【讨论】:

以上是关于RabbitMQ RPC 以异步方式?的主要内容,如果未能解决你的问题,请参考以下文章

rabbit入门教程

使用 EventMachine 和 RabbitMQ 的 RPC

Python开发项目:RPC异步执行命令(RabbitMQ双向通信)

基于RabbitMQ RPC实现的主机异步管理

python项目开发:用RabbitMQ实现异步RPC

Spring Boot 和异步 RabbitMQ RPC