在 ASP.Net Core 3 中使用 Rabbitmq 的 RPC

Posted

技术标签:

【中文标题】在 ASP.Net Core 3 中使用 Rabbitmq 的 RPC【英文标题】:RPC using Rabbitmq in ASP.Net Core 3 【发布时间】:2021-06-07 12:19:19 【问题描述】:

我想使用单个请求实现多个队列

RPC 服务器.cs

如何从服务器发送多条消息 例如。我创建了实例: `

  RPCServer rpcServer1 = new RPCServer();
         rpcServer1.PublishMessage("customerContactPersonsList","customerContactPersons");`

`

RPCServer rpcServer2 = new RPCServer();
     rpcServer2.PublishMessage("ProductInfoList", "projects");



  public void PublishMessage(string message, string rpcQueueName)
    
        var factory = new ConnectionFactory()  HostName = "localhost" ;
        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        
            channel.QueueDeclare(queue: rpcQueueName, durable: false,
              exclusive: true, autoDelete: false, arguments: null);
            channel.BasicQos(0, 1, false);
            var consumer = new EventingBasicConsumer(channel);
            channel.BasicConsume(queue: rpcQueueName,
              autoAck: false, consumer: consumer);
            Console.WriteLine(" [x] Awaiting RPC requests");

            consumer.Received += (model, ea) =>
                
                    string response = null;

                    var body = ea.Body.ToArray();
                    var props = ea.BasicProperties;
                    var replyProps = channel.CreateBasicProperties();
                    replyProps.CorrelationId = props.CorrelationId;

                    try
                    
                        Console.WriteLine(" [.] Response message is)", message);
                        response = message;
                    
                    catch (Exception e)
                    
                        Console.WriteLine(" [.] " + e.Message);
                        response = "There was a error";
                    
                    finally
                    
                        var responseBytes = Encoding.UTF8.GetBytes(message);
                        channel.BasicPublish(exchange:"topic", routingKey: props.ReplyTo, basicProperties: replyProps, body: responseBytes);
                        channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                    
                ;

                Console.WriteLine("Press [enter] to exit.");
                Console.ReadLine();
            

接收器.cs ** 接收端是通过队列检索响应的地方,它还调用客户端,客户端基本上发送请求和队列名称作为参数,以从声明相同队列的服务器端获取 json 数据,并且当两个名称队列匹配响应被发送到接收端。**所以例如。这里的项目作为参数是给定的队列名称,服务器端也提到了相同的名称。

var rpcClient = new RpcClient();
            var customerContactPersons = await rpcClient.CallAsync("", "customerContactPersons");
            var response = await rpcClient.CallAsync("","projects");

var factory = new ConnectionFactory()  HostName = "localhost" ;

        connection = factory.CreateConnection();
        channel = connection.CreateModel();
        replyQueueName = channel.QueueDeclare().QueueName;
        consumer = new EventingBasicConsumer(channel);
        consumer.Received += (model, ea) =>
        
            if (!callbackMapper.TryRemove(ea.BasicProperties.CorrelationId, out TaskCompletionSource<string> tcs))
                return;
            var body = ea.Body.ToArray();
            var response = Encoding.UTF8.GetString(body);
            tcs.TrySetResult(response);
        ;
    

    public Task<string> CallAsync(string message, string rpcQueueName, CancellationToken cancellationToken = default(CancellationToken))
    
        IBasicProperties props = channel.CreateBasicProperties();
        var correlationId = Guid.NewGuid().ToString();
        props.CorrelationId = correlationId;
        props.ReplyTo = replyQueueName;
        var messageBytes = Encoding.UTF8.GetBytes(message);
        var tcs = new TaskCompletionSource<string>();
        callbackMapper.TryAdd(correlationId, tcs);

        channel.BasicPublish(
            exchange: "",
            routingKey: rpcQueueName,
            basicProperties: props,
            body: messageBytes);

        channel.BasicConsume(
            consumer: consumer,
            queue: replyQueueName,
            autoAck: true);

        cancellationToken.Register(() => callbackMapper.TryRemove(correlationId, out var tmp));
        return tcs.Task;
    `

【问题讨论】:

【参考方案1】:

我通过为每个队列添加连接块并在最后 console.readline();保持连接打开以供使用的队列。

using (var connection = rpcServer.PublishMessage(customercontact, "customercontact_rpc_queue"))
                using (var connectionObject = rpcServer.PublishMessage(result, "project_rpc_queue"))
                using (var customersObject = rpcServer.PublishMessage(customersFromByD, "customer_rpc_queue"))
                
                    Console.WriteLine("Press [enter] to exit.");
                    Console.ReadLine();
                

【讨论】:

以上是关于在 ASP.Net Core 3 中使用 Rabbitmq 的 RPC的主要内容,如果未能解决你的问题,请参考以下文章

将 OpenID Connect 与 .NET Core 3.0 ASP.NET Core API 服务一起使用时出错

Linq 查询在 ASP.NET-Core 3.0 及更高版本中对数字等字符串进行排序

如何在 ASP.NET Core 3.1 中启用多重身份验证?

属性路由在 asp.net core 3.0 中无法正常工作

使用 ASP.NET Core Identity 3 的用户角色权限

在 ASP.NET Core 3 中使用 id_token