使用异步 Rpc 客户端的 Flask 服务器仅响应两个请求中的 1 个

Posted

技术标签:

【中文标题】使用异步 Rpc 客户端的 Flask 服务器仅响应两个请求中的 1 个【英文标题】:Flask server using asynchronous Rpc client only answer 1 request out of two 【发布时间】:2021-10-11 08:05:44 【问题描述】:

我正在尝试在 Flask 服务器中实现异步 RPC 客户端。 这个想法是,每个请求都会产生一个带有 uuid 的线程,并且每个请求都将等待,直到 RpcClient queue 属性对象中有正确的 uuid 响应。

问题是两个请求中的一个失败。我认为这可能是多线程的问题,但我不明白它来自哪里。 Bug can be seen here.

使用调试打印,似乎在_on_response 回调中收到了具有正确uuid 的消息,并正确更新了此实例中的queue 属性,但/rpc_call/<payload> 端点内的queue 属性没有t 同步(因此queue[uuid]RpcClient 回调中的值为response,但在端点范围内仍为None)。

我的代码:

from flask import Flask, jsonif
from gevent.pywsgi import WSGIServer
import sys
import os
import pika
import uuid
import time
import threading

class RpcClient(object):
    """Asynchronous Rpc client."""
    internal_lock = threading.Lock()
    queue = 

    def __init__(self):

        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host='rabbitmq'))
        self.channel = self.connection.channel()
        self.channel.basic_qos(prefetch_count=1)
        self.channel.exchange_declare(exchange='kaldi_expe', exchange_type='topic')

        # Create all the queue and bind them to the corresponding routing key
        self.channel.queue_declare('request', durable=True)
        result = self.channel.queue_declare('answer', durable=True)

        self.channel.queue_bind(exchange='kaldi_expe', queue='request', routing_key='kaldi_expe.web.request')
        self.channel.queue_bind(exchange='kaldi_expe', queue='answer', routing_key='kaldi_expe.kaldi.answer')
        self.callback_queue = result.method.queue
.
        thread = threading.Thread(target=self._process_data_events)
        thread.setDaemon(True)
        thread.start()

    def _process_data_events(self):
self.channel.basic_consume(self.callback_queue, self._on_response, auto_ack=True)
        while True:
            with self.internal_lock:
                self.connection.process_data_events()
            time.sleep(0.1)

    def _on_response(self, ch, method, props, body):
        """On response we simply store the result in a local dictionary."""
        self.queue[props.correlation_id] = body


    def send_request(self, payload):
        corr_id = str(uuid.uuid4())
        self.queue[corr_id] = None
        with self.internal_lock:
            self.channel.basic_publish(exchange='kaldi_expe',
                                       routing_key="kaldi_expe.web.request",
                                       properties=pika.BasicProperties(
                                           reply_to=self.callback_queue,
                                           correlation_id=corr_id,
                                       ),
                                       body=payload)
        return corr_id


def flask_app():
    app = Flask("kaldi")

    @app.route('/', methods=['GET'])
    def server_is_up():
        return 'server is up', 200

    @app.route('/rpc_call/<payload>')
    def rpc_call(payload):
        """Simple Flask implementation for making asynchronous Rpc calls. """
        corr_id = app.config['RPCclient'].send_request(payload)

        while app.config['RPCclient'].queue[corr_id] is None:
            #print("queue server: " + str(app.config['RPCclient'].queue))
            time.sleep(0.1)

        return app.config['RPCclient'].queue[corr_id]

if __name__ == '__main__':
    while True:
        try:
            rpcClient = RpcClient()
            app = flask_app()
            app.config['RPCclient'] = rpcClient
            print("Rabbit MQ is connected, starting server", file=sys.stderr)
            app.run(debug=True, threaded=True, host='0.0.0.0')
        except pika.exceptions.AMQPConnectionError as e:
            print("Waiting for RabbitMq startup" + str(e), file=sys.stderr)
            time.sleep(1)
        except Exception as e:
            worker.log.error(e)
            exit(e)

【问题讨论】:

【参考方案1】:

我找到了错误的来源:

app.run(debug=True, threaded=True, host='0.0.0.0')这一行的debug=True一开始就重启服务器。

然后从头开始重新启动整个脚本。因此,另一个 rpcClient 被初始化并从同一个队列中消费。问题是前一个线程也在运行。这会导致两个 rpcClient 从同一个线程进行消费,其中一个实际上是无用的。

【讨论】:

以上是关于使用异步 Rpc 客户端的 Flask 服务器仅响应两个请求中的 1 个的主要内容,如果未能解决你的问题,请参考以下文章

微服务开源框架TARS的RPC源码解析 之 初识TARS C++服务端

基于zookeeper实现rpc注册中心

java中的高性能分布式异步RPC

在 Flask 中异步传输大文件

python 服务器和客户端的flask请求ID

nfs