使用异步 Rpc 客户端的 Flask 服务器仅响应两个请求中的 1 个
Posted
技术标签:
【中文标题】使用异步 Rpc 客户端的 Flask 服务器仅响应两个请求中的 1 个【英文标题】:Flask server using asynchronous Rpc client only answer 1 request out of two 【发布时间】:2021-10-11 08:05:44 【问题描述】:我正在尝试在 Flask 服务器中实现异步 RPC 客户端。
这个想法是,每个请求都会产生一个带有 uuid 的线程,并且每个请求都将等待,直到 RpcClient
queue
属性对象中有正确的 uuid
响应。
问题是两个请求中的一个失败。我认为这可能是多线程的问题,但我不明白它来自哪里。 Bug can be seen here.
使用调试打印,似乎在_on_response
回调中收到了具有正确uuid 的消息,并正确更新了此实例中的queue
属性,但/rpc_call/<payload>
端点内的queue
属性没有t 同步(因此queue[uuid]
在RpcClient
回调中的值为response
,但在端点范围内仍为None
)。
我的代码:
from flask import Flask, jsonif
from gevent.pywsgi import WSGIServer
import sys
import os
import pika
import uuid
import time
import threading
class RpcClient(object):
"""Asynchronous Rpc client."""
internal_lock = threading.Lock()
queue =
def __init__(self):
self.connection = pika.BlockingConnection(
pika.ConnectionParameters(host='rabbitmq'))
self.channel = self.connection.channel()
self.channel.basic_qos(prefetch_count=1)
self.channel.exchange_declare(exchange='kaldi_expe', exchange_type='topic')
# Create all the queue and bind them to the corresponding routing key
self.channel.queue_declare('request', durable=True)
result = self.channel.queue_declare('answer', durable=True)
self.channel.queue_bind(exchange='kaldi_expe', queue='request', routing_key='kaldi_expe.web.request')
self.channel.queue_bind(exchange='kaldi_expe', queue='answer', routing_key='kaldi_expe.kaldi.answer')
self.callback_queue = result.method.queue
.
thread = threading.Thread(target=self._process_data_events)
thread.setDaemon(True)
thread.start()
def _process_data_events(self):
self.channel.basic_consume(self.callback_queue, self._on_response, auto_ack=True)
while True:
with self.internal_lock:
self.connection.process_data_events()
time.sleep(0.1)
def _on_response(self, ch, method, props, body):
"""On response we simply store the result in a local dictionary."""
self.queue[props.correlation_id] = body
def send_request(self, payload):
corr_id = str(uuid.uuid4())
self.queue[corr_id] = None
with self.internal_lock:
self.channel.basic_publish(exchange='kaldi_expe',
routing_key="kaldi_expe.web.request",
properties=pika.BasicProperties(
reply_to=self.callback_queue,
correlation_id=corr_id,
),
body=payload)
return corr_id
def flask_app():
app = Flask("kaldi")
@app.route('/', methods=['GET'])
def server_is_up():
return 'server is up', 200
@app.route('/rpc_call/<payload>')
def rpc_call(payload):
"""Simple Flask implementation for making asynchronous Rpc calls. """
corr_id = app.config['RPCclient'].send_request(payload)
while app.config['RPCclient'].queue[corr_id] is None:
#print("queue server: " + str(app.config['RPCclient'].queue))
time.sleep(0.1)
return app.config['RPCclient'].queue[corr_id]
if __name__ == '__main__':
while True:
try:
rpcClient = RpcClient()
app = flask_app()
app.config['RPCclient'] = rpcClient
print("Rabbit MQ is connected, starting server", file=sys.stderr)
app.run(debug=True, threaded=True, host='0.0.0.0')
except pika.exceptions.AMQPConnectionError as e:
print("Waiting for RabbitMq startup" + str(e), file=sys.stderr)
time.sleep(1)
except Exception as e:
worker.log.error(e)
exit(e)
【问题讨论】:
【参考方案1】:我找到了错误的来源:
app.run(debug=True, threaded=True, host='0.0.0.0')
这一行的debug=True
一开始就重启服务器。
然后从头开始重新启动整个脚本。因此,另一个 rpcClient 被初始化并从同一个队列中消费。问题是前一个线程也在运行。这会导致两个 rpcClient 从同一个线程进行消费,其中一个实际上是无用的。
【讨论】:
以上是关于使用异步 Rpc 客户端的 Flask 服务器仅响应两个请求中的 1 个的主要内容,如果未能解决你的问题,请参考以下文章