从rabbitMQ消费时,Flask消费者不执行回调
Posted
技术标签:
【中文标题】从rabbitMQ消费时,Flask消费者不执行回调【英文标题】:Flask consummer doesn't execute callback when consomming from rabbitMQ 【发布时间】:2021-11-26 01:34:47 【问题描述】:所以我有这个问题。我想同时使用Flask
和RabbitMQ
来做一个能够完成一些计算繁重任务的微服务。我基本上想要文档中的
Remote procedure call (RPC) 教程之类的东西,但需要 REST Api 开销。
到目前为止,我已经提供了该代码:
server.py
from flask import Flask
import sys
import os
import json
import pika
import uuid
import time
''' HEADERS = 'Content-type': 'audio/*', 'Accept': 'text/plain''''
class RPIclient(object):
def __init__(self):
self.connection = pika.BlockingConnection(
pika.ConnectionParameters(host='rabbitmq'))
self.channel = self.connection.channel()
self.channel.basic_qos(prefetch_count=1)
self.channel.exchange_declare(exchange='kaldi_expe', exchange_type='topic')
# Create all the queue and bind them to the corresponding routing key
self.channel.queue_declare('request', durable=True)
result = self.channel.queue_declare('answer', durable=True)
self.channel.queue_bind(exchange='kaldi_expe', queue='request', routing_key='kaldi_expe.web.request')
self.channel.queue_bind(exchange='kaldi_expe', queue='answer', routing_key='kaldi_expe.kaldi.answer')
self.callback_queue = result.method.queue
self.channel.basic_consume(queue="answer", on_message_callback=self.on_response)
def on_response(self, ch, method, props, body):
print("from server, correlation id : " + str(props.correlation_id), file=sys.stderr)
self.response = body
ch.basic_ack(delivery_tag=method.delivery_tag)
def call(self, n):
print("Launched Call ")
self.response = None
self.corr_id = str(uuid.uuid4())
self.channel.basic_publish(
exchange='kaldi_expe',
routing_key='kaldi_expe.web.request',
properties=pika.BasicProperties(
correlation_id=self.corr_id,
),
body=str(n))
while self.response is None:
self.connection.process_data_events()
return int(2)
def flask_app():
app = Flask("__name__")
@app.route('/', methods=['GET'])
def server_is_up():
return 'server is up', 200
@app.route('/add-job/<cmd>')
def add(cmd):
app.config['RPIclient'].call(10)
return "Call RPI client",404
return app
if __name__ == '__main__':
print("Waiting for RabbitMq")
time.sleep(20)
rpiClient = RPIclient()
app = flask_app()
app.config['RPIclient'] = rpiClient
print("Rabbit MQ is connected, starting server", file=sys.stderr)
app.run(debug=True, threaded=False, host='0.0.0.0')
worker.py
import pika
import time
import sys
print(' [*] Waiting for RabbitMQ ...')
time.sleep(20)
print(' [*] Connecting to server ...')
channel = connection.channel()
print(' [*] Waiting for messages.')
def callback(ch, method, properties, body):
print(" [x] Received %s" % body)
print(" [x] Executing task ")
print("from worker, correlation id : " + str(properties.correlation_id))
ch.basic_publish(
exchange='kaldi_expe',
routing_key='kaldi_expe.kaldi.answer',
properties=pika.BasicProperties(correlation_id = properties.correlation_id),
body="response")
print(" [x] Done")
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='request', on_message_callback=callback)
channel.start_consuming()
可悲的是,当我发回一条消息(从工作人员到服务器)时,服务器似乎确实使用了消息,但从不执行回调(它显示消息已使用,但在rabbit mq 接口。另外,打印不显示)。
我很迷茫,因为消息似乎已被消费,但回调似乎没有被执行。你知道它可能来自哪里吗?
【问题讨论】:
worker.py 从哪里运行?它放在这个什么地方?谢谢 【参考方案1】:您确实将回调方法 on_response
附加到队列 @987654322@,但您从未告诉您的服务器开始使用队列。
看起来您在类初始化结束时缺少self.channel.start_consuming()
。
【讨论】:
以上是关于从rabbitMQ消费时,Flask消费者不执行回调的主要内容,如果未能解决你的问题,请参考以下文章