3.6.7 RabbitMQ教程六 – RPC

Posted infinitecodes

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了3.6.7 RabbitMQ教程六 – RPC相关的知识,希望对你有一定的参考价值。

Remote procedure call (RPC)

What This Tutorial Focuses On

In the second tutorial we learned how to use Work Queues to distribute time-consuming tasks among multiple workers.

在第二篇教程中我们学到了如何使用Work Queues在多个工作端之间分配耗时任务。

But what if we need to run a function on a remote computer and wait for the result? Well, that‘s a different story. This pattern is commonly known as Remote Procedure Call or RPC.

但如果我们需要在一个远程计算机上运行一个函数并等待结果呢?嗯,这事儿就不一样了。这个模式通常被称为远程过程调用或RPC

In this tutorial we‘re going to use RabbitMQ to build an RPC system: a client and a scalable RPC server. As we don‘t have any time-consuming tasks that are worth distributing, we‘re going to create a dummy RPC service that returns Fibonacci numbers.

这篇教程,我们会使用RabbitMQ来构建一个RPC系统:一个客户端和一个可伸缩的RPC服务端。由于我们没有任何值得分配的耗时任务,所以我们会创建一个无脑的RPC服务,它返回Fibonacci数列。

Client interface

To illustrate how an RPC service could be used we‘re going to create a simple client class. It‘s going to expose a method named call which sends an RPC request and blocks until the answer is received:

为了演示一个RPC服务如何使用,我们会创建一个简单的客户端的类。它将公开一个名为call的方法,该方法发送一个RPC请求并阻塞直到收到应答

fibonacci_rpc = FibonacciRpcClient()
result = fibonacci_rpc.call(4)
print("fib(4) is %r" % result)

A note on RPC

Although RPC is a pretty common pattern in computing, it‘s often criticised. The problems arise when a programmer is not aware whether a function call is local or if it‘s a slow RPC. Confusions like that result in an unpredictable system and adds unnecessary complexity to debugging. Instead of simplifying software, misused RPC can result in unmaintainable spaghetti code.

尽管RPC是计算中非常常见的模式,但它经常为人所诟病。当程序员不知道一个函数调用是本地的还是一个慢的RPC服务时,问题就出现了。像这样的混乱导致了不可预测的系统,并增加了不必要的调试的复杂性。错误使用RPC不会简化软件,反而会导致无法维护的意大利面一般的(大概意思就是一团混乱,缠在一起了)代码

Bearing that in mind, consider the following advice:

  • Make sure it‘s obvious which function call is local and which is remote.
  • Document your system. Make the dependencies between components clear.
  • Handle error cases. How should the client react when the RPC server is down for a long time?

考虑到这一点,请考虑以下建议

  • 确保让哪个函数调用是本地的,哪个是远程的这一点看上去很明显
  • 记录您的系统。明确组件之间的依赖关系
  • 处理错误案例。当RPC服务器长时间关闭时,客户端应该如何反应?

When in doubt avoid RPC. If you can, you should use an asynchronous pipeline - instead of RPC-like blocking, results are asynchronously pushed to a next computation stage.

有疑问时避免使用RPC。如果可以的话,你应该使用一个异步管道 - 结果将异步推送到下一个计算阶段,而不是像RPC那样的阻塞。

Callback queue

In general doing RPC over RabbitMQ is easy. A client sends a request message and a server replies with a response message. In order to receive a response the client needs to send a ‘callback‘ queue address with the request. Let‘s try it:

一般来说,在RabbitMQ上执行RPC很容易。客户端发送请求消息,服务器用响应消息答复。为了接收响应,客户端需要随请求发送一个“callback”队列地址。我们来试试:

result = channel.queue_declare(queue=‘‘, exclusive=True)
callback_queue = result.method.queue

channel.basic_publish(exchange=‘‘,
                      routing_key=‘rpc_queue‘,
                      properties=pika.BasicProperties(
                            reply_to = callback_queue,
                            ),
                      body=request)

Message properties

The AMQP 0-9-1 protocol predefines a set of 14 properties that go with a message. Most of the properties are rarely used, with the exception of the following:

  • delivery_mode: Marks a message as persistent (with a value of 2) or transient (any other value). You may remember this property from the second tutorial.
  • content_type: Used to describe the mime-type of the encoding. For example for the often used JSON encoding it is a good practice to set this property to: application/json.
  • reply_to: Commonly used to name a callback queue.
  • correlation_id: Useful to correlate RPC responses with requests.

AMQP 0-9-1协议预定义了一个14个属性的集合,这个集合和消息一起发送。大多数的属性都用不到,以下这些是例外:

  • delivery_mode: 将消息标记为持久(值为2)或暂时(任何其他值)。你可能还记得第二个教程中的这个属性。
  • content_type: 用于描述编码的mime类型。比如经常使用的JSON编码,把这个属性设置为:application/json是一个很好的练习
  • reply_to:通常用来给一个callback队列命名
  • correlation_id: 有助于将RPC响应与请求关联起来
Correlation id

In the method presented above we suggest creating a callback queue for every RPC request. That‘s pretty inefficient, but fortunately there is a better way - let‘s create a single callback queue per client.

在上述方法中,我们建议为每个RPC请求创建callback队列。这相当低效,但幸运的是有更好的方法 - 让我们为每一个客户端创建一条单独的callback队列。

That raises a new issue, having received a response in that queue it‘s not clear to which request the response belongs. That‘s when the correlation_id property is used. We‘re going to set it to a unique value for every request. Later, when we receive a message in the callback queue we‘ll look at this property, and based on that we‘ll be able to match a response with a request. If we see an unknown correlation_id value, we may safely discard the message - it doesn‘t belong to our requests.

这又出现一个新问题,在队列中收到响应后,不清楚响应属于哪个请求。那是因为当correlation_id属性被使用时。我们会为每个请求设置一个唯一的值。稍后,当我们在callback队列中接收到一条消息时我们会查看这个属性,并基于此属性我们能够将一个回复与一个请求匹配。如果我们看到一个未知的correlation_id值,我们就能安全的丢弃该消息 - 它不属于我们的请求。

You may ask, why should we ignore unknown messages in the callback queue, rather than failing with an error? It‘s due to a possibility of a race condition on the server side. Although unlikely, it is possible that the RPC server will die just after sending us the answer, but before sending an acknowledgment message for the request. If that happens, the restarted RPC server will process the request again. That‘s why on the client we must handle the duplicate responses gracefully, and the RPC should ideally be idempotent.

你也许会问,为什么我们要忽略那些callback队列中的未知消息,而不是报错运行失败?这是因为服务器端可能存在竞争情况。虽然不太可能,但RPC服务端可能就在给我们发送应答之后,但在给请求发送一条确认消息之前就会死掉。如果这个发生了,重启的RPC服务端会再次处理该请求。这就是为什么在客户端我们必须优雅地处理重复的响应,而RPC从理想上来说应该是等幂的。

Summary

                                   技术图片

Our RPC will work like this:

  • When the Client starts up, it creates an anonymous exclusive callback queue.
  • For an RPC request, the Client sends a message with two properties: reply_to, which is set to the callback queue and correlation_id, which is set to a unique value for every request.
  • The request is sent to an rpc_queue queue.
  • The RPC worker (aka: server) is waiting for requests on that queue. When a request appears, it does the job and sends a message with the result back to the Client, using the queue from the reply_to field.
  • The client waits for data on the callback queue. When a message appears, it checks the correlation_id property. If it matches the value from the request it returns the response to the application.

我们的RPC会像这样工作:

  • 当客户端启动,它创建一个匿名的、独占的callback队列
  • 对一个RPC请求,客户端会发送一条带有两个属性的消息:reply_to,它被设置给callback队列和correlation_id,它为每个请求设置一个唯一的值
  • RPC工作端(服务端)等待队列里的每个请求。当一条请求出现时,它会干活并发回一条有结果的消息给客户端,通过使用reply_to域内的队列
  • 客户端等待callback队列中的数据。当一条消息出现时,它检查correlation_id属性。如果它与请求中的值匹配,则会将响应返回给应用程序。

Putting it all together

rpc_server.py

import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host=‘localhost‘))

channel = connection.channel()

channel.queue_declare(queue=‘rpc_queue‘)

def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)

def on_request(ch, method, props, body):
    n = int(body)

    print(" [.] fib(%s)" % n)
    response = fib(n)

    ch.basic_publish(exchange=‘‘,
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id =                                                          props.correlation_id),
                     body=str(response))
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue=‘rpc_queue‘, on_message_callback=on_request)

print(" [x] Awaiting RPC requests")
channel.start_consuming()

The server code is rather straightforward:

  • (4) As usual we start by establishing the connection and declaring the queue.
  • (11) We declare our fibonacci function. It assumes only valid positive integer input. (Don‘t expect this one to work for big numbers, it‘s probably the slowest recursive implementation possible).
  • (19) We declare a callback for basic_consume, the core of the RPC server. It‘s executed when the request is received. It does the work and sends the response back.
  • (32) We might want to run more than one server process. In order to spread the load equally over multiple servers we need to set the prefetch_count setting.

服务器代码相当简单:

  • (4)像往常一样,我们首先建立连接并声明队列
  • (11)我们声明Fibonacci函数。它假定只有有效的正整数输入。(不要指望它一个对大数有用,它可能是最慢的递归实现)
  • (19)我们为RPC服务器的核心basic_consume声明一个callback。它在收到请求时执行。它完成工作并将响应发送回。
  • (32)我们可能需要运行多个服务器进程。为了在多个服务器上平均分配负载,我们需要设置prefetch_count设定。

rpc_client.py

import pika
import uuid

class FibonacciRpcClient(object):

    def __init__(self):
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host=‘localhost‘))

        self.channel = self.connection.channel()

        result = self.channel.queue_declare(queue=‘‘, exclusive=True)
        self.callback_queue = result.method.queue

        self.channel.basic_consume(
            queue=self.callback_queue,
            on_message_callback=self.on_response,
            auto_ack=True)

    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(
            exchange=‘‘,
            routing_key=‘rpc_queue‘,
            properties=pika.BasicProperties(
                reply_to=self.callback_queue,
                correlation_id=self.corr_id,
            ),
            body=str(n))
        while self.response is None:
            self.connection.process_data_events()
        return int(self.response)


fibonacci_rpc = FibonacciRpcClient()

print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(30)
print(" [.] Got %r" % response)

The client code is slightly more involved:

  • (7) We establish a connection, channel and declare an exclusive ‘callback‘ queue for replies.
  • (16) We subscribe to the ‘callback‘ queue, so that we can receive RPC responses.
  • (18) The ‘on_response‘ callback executed on every response is doing a very simple job, for every response message it checks if the correlation_id is the one we‘re looking for. If so, it saves the response in self.response and breaks the consuming loop.
  • (23) Next, we define our main call method - it does the actual RPC request.
  • (24) In this method, first we generate a unique correlation_id number and save it - the ‘on_response‘ callback function will use this value to catch the appropriate response.
  • (25) Next, we publish the request message, with two properties: reply_to and correlation_id.
  • (32) At this point we can sit back and wait until the proper response arrives.
  • (33) And finally we return the response back to the user.

客户机代码稍微复杂一些:

  • (7)我们建立一个连接、通道并为回复声明一个独占的“callback”队列。
  • (16)我们订阅“callback”队列,以便接收RPC响应。
  • (18)对每个响应执行的“on_response”回调操作非常简单,它会检查每个响应消息的correlation_id是否是我们要查找的correlation_id。如果是,它会将响应保存在self.response中,并断开消费循环。
  • (23)接下来,我们定义主要的调用方法-它执行实际的RPC请求。
  • (24)在这个方法中,首先我们生成一个唯一的correlation_id号并保存它 - “on_response”回调函数将使用这个值来捕获适当的响应。
  • (25)接下来,我们发布请求消息,它有两个属性:reply_to和correlation_id。
  • (32)此时,我们可以坐下来并等待直到有合适的回应。
  • (33)最后,我们将响应返回给用户。

Our RPC service is now ready. We can start the server:

我们的rpc服务端现在就绪。我们可以启动服务端:

python rpc_server.py

To request a fibonacci number run the client:

运行客户端,请求fibonacci数列:

python rpc_client.py

The presented design is not the only possible implementation of a RPC service, but it has some important advantages:

  • If the RPC server is too slow, you can scale up by just running another one. Try running a second rpc_server.py in a new console.
  • On the client side, the RPC requires sending and receiving only one message. No synchronous calls like queue_declare are required. As a result the RPC client needs only one network round trip for a single RPC request.

已提出的设计不是RPC服务的唯一可能实现,但它有一些重要的优点:

  • 如果RPC服务器太慢,可以通过运行另一个服务器来扩展。尝试在新控制台中运行第二个rpc_server.py
  • 在客户端,RPC只需要发送和接收一条消息。不需要queue_declare之类的同步调用。因此,RPC客户机对于单个RPC请求只需要一次网络往返。

以上是关于3.6.7 RabbitMQ教程六 – RPC的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ入门教程:路由选择Routing

六.RabbitMQ消息队列的基础+实战

python第六十天-----RabbitMQ

RabbitMQ六种队列模式-简单队列模式

六、rancher搭建rabbitmq集群化部署

六RabbitMQ发布确认