RabbitMQ学习 (远程过程调用(RPC))

Posted missliu

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ学习 (远程过程调用(RPC))相关的知识,希望对你有一定的参考价值。

第二个教程中,我们学习了如何使用工作队列在多个工作人员之间分配耗时的任务。

但是如果我们需要在远程计算机上运行一个函数并等待结果呢?那么,这是一个不同的故事。这种模式通常被称为远程过程调用RPC

在本教程中,我们将使用RabbitMQ构建一个RPC系统:一个客户端和一个可扩展的RPC服务器。由于我们没有任何值得分发的耗时任务,我们将创建一个返回斐波那契数字的虚拟RPC服务。

 

客户端界面

为了说明如何使用RPC服务,我们将创建一个简单的客户端类。它将公开一个名为call的方法 ,它发送一个RPC请求并阻塞,直到收到答案:

var rpcClient = new RPCClient();

Console.WriteLine(" [x] Requesting fib(30)");
var response = rpcClient.Call("30");
Console.WriteLine(" [.] Got ‘{0}‘", response);

rpcClient.Close();

 

有关RPC的说明

尽管RPC在计算中是相当常见的模式,但它经常受到批评。程序员不知道函数调用是本地的还是慢的RPC时,就会出现问题。像这样的混乱会导致一个不可预知的系统,并增加调试的不必要的复杂性。而不是简化软件,滥用RPC会导致不可维护的意大利面代码。

考虑到以下建议:

  • 确定哪个函数调用是本地的,哪个是远程的。
  • 记录你的系统。清楚组件之间的依赖关系。
  • 处理错误情况。当RPC服务器长时间关闭时,客户端应该如何反应?

有疑问时避免RPC。如果可以的话,你应该使用一个异步管道 - 而不是像RPC一样的阻塞,结果被异步地推到下一个计算阶段。

回调队列

一般来说,使用RPC over RabbitMQ很容易。客户端发送请求消息,服务器回复响应消息。为了收到回应,我们需要发送一个“回叫”队列地址与请求:

var corrId = Guid.NewGuid().ToString();
var props = channel.CreateBasicProperties();
props.ReplyTo = replyQueueName;
props.CorrelationId = corrId;

var messageBytes = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "",
                     routingKey: "rpc_queue",
                     basicProperties: props,
                     body: messageBytes); //然后从callback_queue读取响应消息的代码

 

 

消息属性

AMQP 0-9-1协议预定义了一组与消息一起的14个属性。大多数属性很少使用,但以下情况除外:

  • deliveryMode:将消息标记为持久(值为2)或瞬态(任何其他值)。您可能会从第二个教程中记住这个属性
  • contentType:用于描述编码的MIME类型。例如对于经常使用的JSON编码,将此属性设置为application / json是一个好习惯
  • replyTo:通常用来命名一个回调队列。
  • correlationId:用于将RPC响应与请求关联起来。

相关标识

在上面介绍的方法中,我们建议为每个RPC请求创建一个回调队列。这是非常低效的,但幸运的是有一个更好的方法 - 让我们创建一个单一的客户端回调队列。

这引发了一个新的问题,在该队列中收到回复,不清楚回复属于哪个请求。那是什么时候使用correlationId属性。我们将把它设置为每个请求的唯一值。稍后,当我们在回调队列中收到一条消息时,我们将查看这个属性,并基于这个属性,我们可以将响应与请求进行匹配。如果我们看到一个未知的 correlationId值,我们可以放心地丢弃这个消息 - 这不属于我们的请求。

您可能会问,为什么我们应该忽略回调队列中的未知消息,而不是失败?这是由于在服务器端的竞争条件的可能性。虽然不太可能,但在发送给我们答案之后,但在发送请求的确认消息之前,RPC服务器可能会死亡。如果发生这种情况,重新启动的RPC服务器将再次处理该请求。这就是为什么在客户端,我们必须优雅地处理重复的响应,理想情况下RPC应该是幂等的。

概要

技术分享图片

我们的RPC将会像这样工作:

  • 当客户端启动时,它创建一个匿名排他回调队列。
  • 对于RPC请求,客户端会发送一个消息,其中包含两个属性: replyTo,它被设置为回调队列和correlationId,它被设置为每个请求的唯一值。
  • 该请求被发送到一个rpc_queue队列。
  • RPC worker(又名:服务器)正在等待该队列上的请求。当一个请求出现时,它执行这个工作,并使用replyTo字段中的队列将结果发送回客户端
  • 客户端在回调队列中等待数据。出现消息时,将检查correlationId属性。如果它匹配来自请求的值,则返回对应用程序的响应。

把它放在一起

斐波那契任务:

private static int fib(int n)
{
    if (n == 0 || n == 1) return n;
    return fib(n - 1) + fib(n - 2);
}

 

我们宣布我们的斐波那契函数。它只假定有效的正整数输入。(不要指望这个函数可以用于大数字,也可能是最慢的递归实现)。

我们的RPC服务器RPCServer.cs的代码如下所示:

 

using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;

class RPCServer
{
    public static void Main()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            channel.QueueDeclare(queue: "rpc_queue", durable: false,
              exclusive: false, autoDelete: false, arguments: null);
            channel.BasicQos(0, 1, false);
            var consumer = new EventingBasicConsumer(channel);
            channel.BasicConsume(queue: "rpc_queue",
              autoAck: false, consumer: consumer);
            Console.WriteLine(" [x] Awaiting RPC requests");

            consumer.Received += (model, ea) =>
            {
                string response = null;

                var body = ea.Body;
                var props = ea.BasicProperties;
                var replyProps = channel.CreateBasicProperties();
                replyProps.CorrelationId = props.CorrelationId;

                try
                {
                    var message = Encoding.UTF8.GetString(body);
                    int n = int.Parse(message);
                    Console.WriteLine(" [.] fib({0})", message);
                    response = fib(n).ToString();
                }
                catch (Exception e)
                {
                    Console.WriteLine(" [.] " + e.Message);
                    response = "";
                }
                finally
                {
                    var responseBytes = Encoding.UTF8.GetBytes(response);
                    channel.BasicPublish(exchange: "", routingKey: props.ReplyTo,
                      basicProperties: replyProps, body: responseBytes);
                    channel.BasicAck(deliveryTag: ea.DeliveryTag,
                      multiple: false);
                }
            };

            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }

    /// 

    /// Assumes only valid positive integer input.
    /// Don‘t expect this one to work for big numbers, and it‘s
    /// probably the slowest recursive implementation possible.
    /// 

    private static int fib(int n)
    {
        if (n == 0 || n == 1)
        {
            return n;
        }

        return fib(n - 1) + fib(n - 2);
    }
}

服务器代码非常简单:

  • 像往常一样,我们首先建立连接,通道和声明队列。
  • 我们可能想要运行多个服务器进程。为了在多个服务器上平均分配负载,我们需要在channel.basicQos中设置 prefetchCount设置。
  • 我们使用basicConsume来访问队列。然后我们注册一个交付处理程序,在这个处理程序中我们完成这项工作,然后将回复发送回去

我们的RPC客户端RPCClient.cs的代码

 

using System;
using System.Collections.Concurrent;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

public class RpcClient
{
    private readonly IConnection connection;
    private readonly IModel channel;
    private readonly string replyQueueName;
    private readonly EventingBasicConsumer consumer;
    private readonly BlockingCollection<string> respQueue = new BlockingCollection<string>();
    private readonly IBasicProperties props;

public RpcClient()
{
        var factory = new ConnectionFactory() { HostName = "localhost" };

        connection = factory.CreateConnection();
        channel = connection.CreateModel();
        replyQueueName = channel.QueueDeclare().QueueName;
        consumer = new EventingBasicConsumer(channel);

        props = channel.CreateBasicProperties();
        var correlationId = Guid.NewGuid().ToString();
        props.CorrelationId = correlationId;
        props.ReplyTo = replyQueueName;

        consumer.Received += (model, ea) =>
        {
            var body = ea.Body;
            var response = Encoding.UTF8.GetString(body);
            if (ea.BasicProperties.CorrelationId == correlationId)
            {
                respQueue.Add(response);
            }
        };
    }

    public string Call(string message)
    {
        var messageBytes = Encoding.UTF8.GetBytes(message);
        channel.BasicPublish(
            exchange: "",
            routingKey: "rpc_queue",
            basicProperties: props,
            body: messageBytes);

        channel.BasicConsume(
            consumer: consumer,
            queue: replyQueueName,
            autoAck: true);

        return respQueue.Take(); ;
    }

    public void Close()
    {
        connection.Close();
    }
}

public class Rpc
{
    public static void Main()
    {
        var rpcClient = new RpcClient();

        Console.WriteLine(" [x] Requesting fib(30)");
        var response = rpcClient.Call("30");

        Console.WriteLine(" [.] Got ‘{0}‘", response);
        rpcClient.Close();
    }
}

 

 

客户端代码稍微涉及一些:

  • 我们建立一个连接和通道,并为回复声明一个独占的“回叫”队列。
  • 我们订阅“回调”队列,以便我们可以接收RPC响应。
  • 我们的调用方法会产生实际的RPC请求。
  • 在这里,我们首先生成一个唯一的correlationId 数字并保存它 - while循环将使用这个值来捕获适当的响应。
  • 接下来,我们发布请求消息,具有两个属性: replyTocorrelationId
  • 在这一点上,我们可以坐下来等待,直到正确的答复到达。
  • while循环做的非常简单,每一个响应消息都会检查correlationId 是否是我们正在寻找的。如果是这样,它保存了响应。
  • 最后,我们将回复返回给用户。

提出客户要求:

 

 

var rpcClient = new RPCClient();

Console.WriteLine(" [x] Requesting fib(30)");
var response = rpcClient.Call("30");
Console.WriteLine(" [.] Got ‘{0}‘", response);

rpcClient.Close();

 

 

现在是查看RPCClient.csRPCServer.cs的完整示例源代码(包括基本的异常处理)的 时机

照常设置(参见教程一):

我们的RPC服务已经准备就绪。我们可以启动服务器:

 

 

cd RPCServer
dotnet run
# => [x] Awaiting RPC requests

 

  

要申请一个斐波那契数字运行客户端:

cd RPCClient
dotnet run
# => [x] Requesting fib(30)

  

这里介绍的设计不是RPC服务的唯一可能的实现,但它有一些重要的优点:

  • 如果RPC服务器速度太慢,可以通过运行另一个来扩展。尝试在新的控制台中运行第二个RPCServer
  • 在客户端,RPC需要发送和接收一条消息。不需要像queueDeclare这样的同步调用 。因此,RPC客户端只需要一次网络往返就可以获得一个RPC请求。

我们的代码仍然非常简单,并不试图解决更复杂(但重要)的问题,如:

  • 如果没有服务器运行,客户应该如何应对?
  • 客户端是否应该对RPC有某种超时?
  • 如果服务器发生故障并引发异常,是否应将其转发给客户端?
  • 在处理之前防止无效的传入消息(例如检查边界,类型)。

 

 

 

 

 

 

 




远程过程调用(RPC)

(使用.NET客户端)

 

先决条件

本教程假定RabbitMQ已安装在标准端口(5672上的本地主机运行如果您使用不同的主机,端口或凭据,连接设置将需要调整。

在哪里得到帮助

如果您在阅读本教程时遇到困难,可以 通过邮件列表与我们联系。

 

第二个教程中,我们学习了如何使用工作队列在多个工作人员之间分配耗时的任务。

但是如果我们需要在远程计算机上运行一个函数并等待结果呢?那么,这是一个不同的故事。这种模式通常被称为远程过程调用RPC

在本教程中,我们将使用RabbitMQ构建一个RPC系统:一个客户端和一个可扩展的RPC服务器。由于我们没有任何值得分发的耗时任务,我们将创建一个返回斐波那契数字的虚拟RPC服务。

客户端界面

为了说明如何使用RPC服务,我们将创建一个简单的客户端类。它将公开一个名为call的方法 ,它发送一个RPC请求并阻塞,直到收到答案:

var rpcClient = new RPCClient(); 

Console.WriteLine(“[x] Requesting fib(30)”);
var response = rpcClient.Call(“30”); 
Console.WriteLine(“[。]得到”{0}“,response); 

rpcClient.Close();

有关RPC的说明

尽管RPC在计算中是相当常见的模式,但它经常受到批评。当程序员不知道函数调用是本地的还是慢的RPC时,就会出现问题。像这样的混乱会导致一个不可预知的系统,并增加调试的不必要的复杂性。而不是简化软件,滥用RPC会导致不可维护的意大利面代码。

考虑到以下建议:

  • 确定哪个函数调用是本地的,哪个是远程的。
  • 记录你的系统。清楚组件之间的依赖关系。
  • 处理错误情况。当RPC服务器长时间关闭时,客户端应该如何反应?

有疑问时避免RPC。如果可以的话,你应该使用一个异步管道 - 而不是像RPC一样的阻塞,结果被异步地推到下一个计算阶段。

回调队列

一般来说,使用RPC over RabbitMQ很容易。客户端发送请求消息,服务器回复响应消息。为了收到回应,我们需要发送一个“回叫”队列地址与请求:

var corrId = Guid.NewGuid()。ToString();
var props = channel.CreateBasicProperties(); 
props.ReplyTo = replyQueueName; 
props.CorrelationId = corrId; var messageBytes = Encoding.UTF8.GetBytes(message); 
channel.BasicPublish(exchange:“”
                     routingKey:“rpc_queue”
                     basicProperties:props,
                     body:messageBytes); // ...然后从callback_queue读取响应消息的代码...




消息属性

AMQP 0-9-1协议预定义了一组与消息一起的14个属性。大多数属性很少使用,但以下情况除外:

  • deliveryMode:将消息标记为持久(值为2)或瞬态(任何其他值)。您可能会从第二个教程中记住这个属性
  • contentType:用于描述编码的MIME类型。例如对于经常使用的JSON编码,将此属性设置为application / json是一个好习惯
  • replyTo:通常用来命名一个回调队列。
  • correlationId:用于将RPC响应与请求关联起来。

相关标识

在上面介绍的方法中,我们建议为每个RPC请求创建一个回调队列。这是非常低效的,但幸运的是有一个更好的方法 - 让我们创建一个单一的客户端回调队列。

这引发了一个新的问题,在该队列中收到回复,不清楚回复属于哪个请求。那是什么时候使用correlationId属性。我们将把它设置为每个请求的唯一值。稍后,当我们在回调队列中收到一条消息时,我们将查看这个属性,并基于这个属性,我们可以将响应与请求进行匹配。如果我们看到一个未知的 correlationId值,我们可以放心地丢弃这个消息 - 这不属于我们的请求。

您可能会问,为什么我们应该忽略回调队列中的未知消息,而不是失败?这是由于在服务器端的竞争条件的可能性。虽然不太可能,但在发送给我们答案之后,但在发送请求的确认消息之前,RPC服务器可能会死亡。如果发生这种情况,重新启动的RPC服务器将再次处理该请求。这就是为什么在客户端,我们必须优雅地处理重复的响应,理想情况下RPC应该是幂等的。

概要

技术分享图片

我们的RPC将会像这样工作:

  • 当客户端启动时,它创建一个匿名排他回调队列。
  • 对于RPC请求,客户端会发送一个消息,其中包含两个属性: replyTo,它被设置为回调队列和correlationId,它被设置为每个请求的唯一值。
  • 该请求被发送到一个rpc_queue队列。
  • RPC worker(又名:服务器)正在等待该队列上的请求。当一个请求出现时,它执行这个工作,并使用replyTo字段中的队列将结果发送回客户端
  • 客户端在回调队列中等待数据。出现消息时,将检查correlationId属性。如果它匹配来自请求的值,则返回对应用程序的响应。

把它放在一起

斐波那契任务:

private  static  int  fibint n {
     if(n == 0 || n == 1 return n;
    return fib(n  - 1)+ fib(n  - 2); 
}

我们宣布我们的斐波那契函数。它只假定有效的正整数输入。(不要指望这个函数可以用于大数字,也可能是最慢的递归实现)。

我们的RPC服务器RPCServer.cs的代码如下所示:

使用系统;
使用 RabbitMQ.Client;
使用 RabbitMQ.Client.Events;
使用 System.Text; class RPCServer 
{ public static void Main

 
        {
         var factory = new ConnectionFactory(){HostName = “localhost” };
        使用VAR连接= factory.CreateConnection())
         使用VAR信道= connection.CreateModel())
        { 
            channel.QueueDeclare(队列:“rpc_queue” 耐用:
              独家:,自动删除:,自变量:); 
            channel.BasicQos(01);
            var consumer = new EventingBasicConsumer(channel); 
            channel.BasicConsume(队列:“rpc_queue”
              autoAck:false,consumer:consumer); 
            Console.WriteLine(“[x]等待RPC请求”); 

            consumer.Received + =(model,ea)=> 
            { string response = null ; var body = ea.Body;
                var props = ea.BasicProperties;
                var replyProps = channel.CreateBasicProperties();
                

                
                replyProps.CorrelationId = props.CorrelationId; 尝试 
                { var message = Encoding.UTF8.GetString(body);
                    int n = int .Parse(message); 
                    Console.WriteLine(“[。] fib({0})”,message); 
                    response = fib(n).ToString(); 
                } 捕获(例外五)
                { 
                    Console.WriteLine(“[]” + e.Message); 
                    response = “” ; 
                } 终于 
                { VAR

                
                    
                
                
                    responseBytes = Encoding.UTF8.GetBytes(response); 
                    channel.BasicPublish(exchange:“”,routingKey:props.ReplyTo,
                      basicProperties:replyProps,body:responseBytes); 
                    channel.BasicAck(deliveryTag:ea.DeliveryTag,
                      multiple:false); 
                } 
            }; 

            Console.WriteLine(“按[enter]退出”); 
            到Console.ReadLine(); 
        } 
    } ///

    
    ///只承担有效的正整数输入。
    ///不要指望这个函数可以用于大数字,而且
    ///可能是最慢的递归实现。
    /// 
    private  static  int  fibint n {
         if(n == 0 || n == 1
        { return n; 
        } return fib(n  - 1)+ fib(n  - 2); 
    }
}
            

        

服务器代码非常简单:

  • 像往常一样,我们首先建立连接,通道和声明队列。
  • 我们可能想要运行多个服务器进程。为了在多个服务器上平均分配负载,我们需要在channel.basicQos中设置 prefetchCount设置。
  • 我们使用basicConsume来访问队列。然后我们注册一个交付处理程序,在这个处理程序中我们完成这项工作,然后将回复发送回去

我们的RPC客户端RPCClient.cs的代码

以上是关于RabbitMQ学习 (远程过程调用(RPC))的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ 远程过程调用(RPC)

rabbitMQ_rpc

RabbitMQ九:远程过程调用RPC

#yyds干货盘点#RabbitMQ示例6:远程过程调用RPC

RabbitMQ中文文档PHP版本--远程过程调用(RPC)

RPC使用rabbitmq实现