在第二个教程中,我们学习了如何使用工作队列在多个工作人员之间分配耗时的任务。
但是如果我们需要在远程计算机上运行一个函数并等待结果呢?那么,这是一个不同的故事。这种模式通常被称为远程过程调用或RPC。
在本教程中,我们将使用RabbitMQ构建一个RPC系统:一个客户端和一个可扩展的RPC服务器。由于我们没有任何值得分发的耗时任务,我们将创建一个返回斐波那契数字的虚拟RPC服务。
客户端界面
为了说明如何使用RPC服务,我们将创建一个简单的客户端类。它将公开一个名为call的方法 ,它发送一个RPC请求并阻塞,直到收到答案:
var rpcClient = new RPCClient(); Console.WriteLine(" [x] Requesting fib(30)"); var response = rpcClient.Call("30"); Console.WriteLine(" [.] Got ‘{0}‘", response); rpcClient.Close();
有关RPC的说明
尽管RPC在计算中是相当常见的模式,但它经常受到批评。当程序员不知道函数调用是本地的还是慢的RPC时,就会出现问题。像这样的混乱会导致一个不可预知的系统,并增加调试的不必要的复杂性。而不是简化软件,滥用RPC会导致不可维护的意大利面代码。
考虑到以下建议:
- 确定哪个函数调用是本地的,哪个是远程的。
- 记录你的系统。清楚组件之间的依赖关系。
- 处理错误情况。当RPC服务器长时间关闭时,客户端应该如何反应?
有疑问时避免RPC。如果可以的话,你应该使用一个异步管道 - 而不是像RPC一样的阻塞,结果被异步地推到下一个计算阶段。
回调队列
一般来说,使用RPC over RabbitMQ很容易。客户端发送请求消息,服务器回复响应消息。为了收到回应,我们需要发送一个“回叫”队列地址与请求:
var corrId = Guid.NewGuid().ToString();
var props = channel.CreateBasicProperties();
props.ReplyTo = replyQueueName;
props.CorrelationId = corrId;
var messageBytes = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "",
routingKey: "rpc_queue",
basicProperties: props,
body: messageBytes); //然后从callback_queue读取响应消息的代码
消息属性
AMQP 0-9-1协议预定义了一组与消息一起的14个属性。大多数属性很少使用,但以下情况除外:
- deliveryMode:将消息标记为持久(值为2)或瞬态(任何其他值)。您可能会从第二个教程中记住这个属性。
- contentType:用于描述编码的MIME类型。例如对于经常使用的JSON编码,将此属性设置为application / json是一个好习惯。
- replyTo:通常用来命名一个回调队列。
- correlationId:用于将RPC响应与请求关联起来。
相关标识
在上面介绍的方法中,我们建议为每个RPC请求创建一个回调队列。这是非常低效的,但幸运的是有一个更好的方法 - 让我们创建一个单一的客户端回调队列。
这引发了一个新的问题,在该队列中收到回复,不清楚回复属于哪个请求。那是什么时候使用correlationId属性。我们将把它设置为每个请求的唯一值。稍后,当我们在回调队列中收到一条消息时,我们将查看这个属性,并基于这个属性,我们可以将响应与请求进行匹配。如果我们看到一个未知的 correlationId值,我们可以放心地丢弃这个消息 - 这不属于我们的请求。
您可能会问,为什么我们应该忽略回调队列中的未知消息,而不是失败?这是由于在服务器端的竞争条件的可能性。虽然不太可能,但在发送给我们答案之后,但在发送请求的确认消息之前,RPC服务器可能会死亡。如果发生这种情况,重新启动的RPC服务器将再次处理该请求。这就是为什么在客户端,我们必须优雅地处理重复的响应,理想情况下RPC应该是幂等的。
概要
我们的RPC将会像这样工作:
- 当客户端启动时,它创建一个匿名排他回调队列。
- 对于RPC请求,客户端会发送一个消息,其中包含两个属性: replyTo,它被设置为回调队列和correlationId,它被设置为每个请求的唯一值。
- 该请求被发送到一个rpc_queue队列。
- RPC worker(又名:服务器)正在等待该队列上的请求。当一个请求出现时,它执行这个工作,并使用replyTo字段中的队列将结果发送回客户端。
- 客户端在回调队列中等待数据。出现消息时,将检查correlationId属性。如果它匹配来自请求的值,则返回对应用程序的响应。
把它放在一起
斐波那契任务:
private static int fib(int n) { if (n == 0 || n == 1) return n; return fib(n - 1) + fib(n - 2); }
我们宣布我们的斐波那契函数。它只假定有效的正整数输入。(不要指望这个函数可以用于大数字,也可能是最慢的递归实现)。
我们的RPC服务器RPCServer.cs的代码如下所示:
using System; using RabbitMQ.Client; using RabbitMQ.Client.Events; using System.Text; class RPCServer { public static void Main() { var factory = new ConnectionFactory() { HostName = "localhost" }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.QueueDeclare(queue: "rpc_queue", durable: false, exclusive: false, autoDelete: false, arguments: null); channel.BasicQos(0, 1, false); var consumer = new EventingBasicConsumer(channel); channel.BasicConsume(queue: "rpc_queue", autoAck: false, consumer: consumer); Console.WriteLine(" [x] Awaiting RPC requests"); consumer.Received += (model, ea) => { string response = null; var body = ea.Body; var props = ea.BasicProperties; var replyProps = channel.CreateBasicProperties(); replyProps.CorrelationId = props.CorrelationId; try { var message = Encoding.UTF8.GetString(body); int n = int.Parse(message); Console.WriteLine(" [.] fib({0})", message); response = fib(n).ToString(); } catch (Exception e) { Console.WriteLine(" [.] " + e.Message); response = ""; } finally { var responseBytes = Encoding.UTF8.GetBytes(response); channel.BasicPublish(exchange: "", routingKey: props.ReplyTo, basicProperties: replyProps, body: responseBytes); channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); } }; Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } /// /// Assumes only valid positive integer input. /// Don‘t expect this one to work for big numbers, and it‘s /// probably the slowest recursive implementation possible. /// private static int fib(int n) { if (n == 0 || n == 1) { return n; } return fib(n - 1) + fib(n - 2); } }
服务器代码非常简单:
- 像往常一样,我们首先建立连接,通道和声明队列。
- 我们可能想要运行多个服务器进程。为了在多个服务器上平均分配负载,我们需要在channel.basicQos中设置 prefetchCount设置。
- 我们使用basicConsume来访问队列。然后我们注册一个交付处理程序,在这个处理程序中我们完成这项工作,然后将回复发送回去
我们的RPC客户端RPCClient.cs的代码:
using System; using System.Collections.Concurrent; using System.Text; using RabbitMQ.Client; using RabbitMQ.Client.Events; public class RpcClient { private readonly IConnection connection; private readonly IModel channel; private readonly string replyQueueName; private readonly EventingBasicConsumer consumer; private readonly BlockingCollection<string> respQueue = new BlockingCollection<string>(); private readonly IBasicProperties props; public RpcClient() { var factory = new ConnectionFactory() { HostName = "localhost" }; connection = factory.CreateConnection(); channel = connection.CreateModel(); replyQueueName = channel.QueueDeclare().QueueName; consumer = new EventingBasicConsumer(channel); props = channel.CreateBasicProperties(); var correlationId = Guid.NewGuid().ToString(); props.CorrelationId = correlationId; props.ReplyTo = replyQueueName; consumer.Received += (model, ea) => { var body = ea.Body; var response = Encoding.UTF8.GetString(body); if (ea.BasicProperties.CorrelationId == correlationId) { respQueue.Add(response); } }; } public string Call(string message) { var messageBytes = Encoding.UTF8.GetBytes(message); channel.BasicPublish( exchange: "", routingKey: "rpc_queue", basicProperties: props, body: messageBytes); channel.BasicConsume( consumer: consumer, queue: replyQueueName, autoAck: true); return respQueue.Take(); ; } public void Close() { connection.Close(); } } public class Rpc { public static void Main() { var rpcClient = new RpcClient(); Console.WriteLine(" [x] Requesting fib(30)"); var response = rpcClient.Call("30"); Console.WriteLine(" [.] Got ‘{0}‘", response); rpcClient.Close(); } }
客户端代码稍微涉及一些:
- 我们建立一个连接和通道,并为回复声明一个独占的“回叫”队列。
- 我们订阅“回调”队列,以便我们可以接收RPC响应。
- 我们的调用方法会产生实际的RPC请求。
- 在这里,我们首先生成一个唯一的correlationId 数字并保存它 - while循环将使用这个值来捕获适当的响应。
- 接下来,我们发布请求消息,具有两个属性: replyTo和correlationId。
- 在这一点上,我们可以坐下来等待,直到正确的答复到达。
- while循环做的非常简单,每一个响应消息都会检查correlationId 是否是我们正在寻找的。如果是这样,它保存了响应。
- 最后,我们将回复返回给用户。
提出客户要求:
var rpcClient = new RPCClient(); Console.WriteLine(" [x] Requesting fib(30)"); var response = rpcClient.Call("30"); Console.WriteLine(" [.] Got ‘{0}‘", response); rpcClient.Close();
现在是查看RPCClient.cs和RPCServer.cs的完整示例源代码(包括基本的异常处理)的 好时机。
照常设置(参见教程一):
我们的RPC服务已经准备就绪。我们可以启动服务器:
cd RPCServer dotnet run # => [x] Awaiting RPC requests
要申请一个斐波那契数字运行客户端:
cd RPCClient dotnet run # => [x] Requesting fib(30)
这里介绍的设计不是RPC服务的唯一可能的实现,但它有一些重要的优点:
- 如果RPC服务器速度太慢,可以通过运行另一个来扩展。尝试在新的控制台中运行第二个RPCServer。
- 在客户端,RPC需要发送和接收一条消息。不需要像queueDeclare这样的同步调用 。因此,RPC客户端只需要一次网络往返就可以获得一个RPC请求。
我们的代码仍然非常简单,并不试图解决更复杂(但重要)的问题,如:
- 如果没有服务器运行,客户应该如何应对?
- 客户端是否应该对RPC有某种超时?
- 如果服务器发生故障并引发异常,是否应将其转发给客户端?
- 在处理之前防止无效的传入消息(例如检查边界,类型)。
远程过程调用(RPC)
(使用.NET客户端)
先决条件
本教程假定RabbitMQ已安装并在标准端口(5672)上的本地主机上运行。如果您使用不同的主机,端口或凭据,连接设置将需要调整。
在哪里得到帮助
如果您在阅读本教程时遇到困难,可以 通过邮件列表与我们联系。
在第二个教程中,我们学习了如何使用工作队列在多个工作人员之间分配耗时的任务。
但是如果我们需要在远程计算机上运行一个函数并等待结果呢?那么,这是一个不同的故事。这种模式通常被称为远程过程调用或RPC。
在本教程中,我们将使用RabbitMQ构建一个RPC系统:一个客户端和一个可扩展的RPC服务器。由于我们没有任何值得分发的耗时任务,我们将创建一个返回斐波那契数字的虚拟RPC服务。
客户端界面
为了说明如何使用RPC服务,我们将创建一个简单的客户端类。它将公开一个名为call的方法 ,它发送一个RPC请求并阻塞,直到收到答案:
var rpcClient = new RPCClient();
Console.WriteLine(“[x] Requesting fib(30)”);
var response = rpcClient.Call(“30”);
Console.WriteLine(“[。]得到”{0}“,response);
rpcClient.Close();
有关RPC的说明
尽管RPC在计算中是相当常见的模式,但它经常受到批评。当程序员不知道函数调用是本地的还是慢的RPC时,就会出现问题。像这样的混乱会导致一个不可预知的系统,并增加调试的不必要的复杂性。而不是简化软件,滥用RPC会导致不可维护的意大利面代码。
考虑到以下建议:
- 确定哪个函数调用是本地的,哪个是远程的。
- 记录你的系统。清楚组件之间的依赖关系。
- 处理错误情况。当RPC服务器长时间关闭时,客户端应该如何反应?
有疑问时避免RPC。如果可以的话,你应该使用一个异步管道 - 而不是像RPC一样的阻塞,结果被异步地推到下一个计算阶段。
回调队列
一般来说,使用RPC over RabbitMQ很容易。客户端发送请求消息,服务器回复响应消息。为了收到回应,我们需要发送一个“回叫”队列地址与请求:
var corrId = Guid.NewGuid()。ToString();
var props = channel.CreateBasicProperties();
props.ReplyTo = replyQueueName;
props.CorrelationId = corrId; var messageBytes = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange:“”,
routingKey:“rpc_queue”,
basicProperties:props,
body:messageBytes); // ...然后从callback_queue读取响应消息的代码...
消息属性
AMQP 0-9-1协议预定义了一组与消息一起的14个属性。大多数属性很少使用,但以下情况除外:
- deliveryMode:将消息标记为持久(值为2)或瞬态(任何其他值)。您可能会从第二个教程中记住这个属性。
- contentType:用于描述编码的MIME类型。例如对于经常使用的JSON编码,将此属性设置为application / json是一个好习惯。
- replyTo:通常用来命名一个回调队列。
- correlationId:用于将RPC响应与请求关联起来。
相关标识
在上面介绍的方法中,我们建议为每个RPC请求创建一个回调队列。这是非常低效的,但幸运的是有一个更好的方法 - 让我们创建一个单一的客户端回调队列。
这引发了一个新的问题,在该队列中收到回复,不清楚回复属于哪个请求。那是什么时候使用correlationId属性。我们将把它设置为每个请求的唯一值。稍后,当我们在回调队列中收到一条消息时,我们将查看这个属性,并基于这个属性,我们可以将响应与请求进行匹配。如果我们看到一个未知的 correlationId值,我们可以放心地丢弃这个消息 - 这不属于我们的请求。
您可能会问,为什么我们应该忽略回调队列中的未知消息,而不是失败?这是由于在服务器端的竞争条件的可能性。虽然不太可能,但在发送给我们答案之后,但在发送请求的确认消息之前,RPC服务器可能会死亡。如果发生这种情况,重新启动的RPC服务器将再次处理该请求。这就是为什么在客户端,我们必须优雅地处理重复的响应,理想情况下RPC应该是幂等的。
概要
我们的RPC将会像这样工作:
- 当客户端启动时,它创建一个匿名排他回调队列。
- 对于RPC请求,客户端会发送一个消息,其中包含两个属性: replyTo,它被设置为回调队列和correlationId,它被设置为每个请求的唯一值。
- 该请求被发送到一个rpc_queue队列。
- RPC worker(又名:服务器)正在等待该队列上的请求。当一个请求出现时,它执行这个工作,并使用replyTo字段中的队列将结果发送回客户端。
- 客户端在回调队列中等待数据。出现消息时,将检查correlationId属性。如果它匹配来自请求的值,则返回对应用程序的响应。
把它放在一起
斐波那契任务:
private static int fib(int n)
{
if(n == 0 || n == 1) return n;
return fib(n - 1)+ fib(n - 2);
}
我们宣布我们的斐波那契函数。它只假定有效的正整数输入。(不要指望这个函数可以用于大数字,也可能是最慢的递归实现)。
我们的RPC服务器RPCServer.cs的代码如下所示:
使用系统;
使用 RabbitMQ.Client;
使用 RabbitMQ.Client.Events;
使用 System.Text; class RPCServer
{ public static void Main(
)
{
var factory = new ConnectionFactory(){HostName = “localhost” };
使用(VAR连接= factory.CreateConnection())
使用(VAR信道= connection.CreateModel())
{
channel.QueueDeclare(队列:“rpc_queue” ,耐用:假,
独家:假,自动删除:假,自变量:空);
channel.BasicQos(0,1,假);
var consumer = new EventingBasicConsumer(channel);
channel.BasicConsume(队列:“rpc_queue”,
autoAck:false,consumer:consumer);
Console.WriteLine(“[x]等待RPC请求”);
consumer.Received + =(model,ea)=>
{ string response = null ; var body = ea.Body;
var props = ea.BasicProperties;
var replyProps = channel.CreateBasicProperties();
replyProps.CorrelationId = props.CorrelationId; 尝试
{ var message = Encoding.UTF8.GetString(body);
int n = int .Parse(message);
Console.WriteLine(“[。] fib({0})”,message);
response = fib(n).ToString();
} 捕获(例外五)
{
Console.WriteLine(“[]” + e.Message);
response = “” ;
} 终于
{ VAR
responseBytes = Encoding.UTF8.GetBytes(response);
channel.BasicPublish(exchange:“”,routingKey:props.ReplyTo,
basicProperties:replyProps,body:responseBytes);
channel.BasicAck(deliveryTag:ea.DeliveryTag,
multiple:false);
}
};
Console.WriteLine(“按[enter]退出”);
到Console.ReadLine();
}
} ///
///只承担有效的正整数输入。
///不要指望这个函数可以用于大数字,而且
///可能是最慢的递归实现。
///
private static int fib(int n)
{
if(n == 0 || n == 1)
{ return n;
} return fib(n - 1)+ fib(n - 2);
}
}
服务器代码非常简单:
- 像往常一样,我们首先建立连接,通道和声明队列。
- 我们可能想要运行多个服务器进程。为了在多个服务器上平均分配负载,我们需要在channel.basicQos中设置 prefetchCount设置。
- 我们使用basicConsume来访问队列。然后我们注册一个交付处理程序,在这个处理程序中我们完成这项工作,然后将回复发送回去
我们的RPC客户端RPCClient.cs的代码: