从rabbitMQ消费时,Flask消费者不执行回调

Posted

技术标签:

【中文标题】从rabbitMQ消费时,Flask消费者不执行回调【英文标题】:Flask consummer doesn't execute callback when consomming from rabbitMQ 【发布时间】:2021-11-26 01:34:47 【问题描述】:

所以我有这个问题。我想同时使用FlaskRabbitMQ 来做一个能够完成一些计算繁重任务的微服务。我基本上想要文档中的 Remote procedure call (RPC) 教程之类的东西,但需要 REST Api 开销。

到目前为止,我已经提供了该代码:

server.py

from flask import Flask

import sys
import os
import json

import pika
import uuid
import time

''' HEADERS = 'Content-type': 'audio/*', 'Accept': 'text/plain''''

class RPIclient(object):
        def __init__(self):
            self.connection = pika.BlockingConnection(
                pika.ConnectionParameters(host='rabbitmq'))
            self.channel = self.connection.channel()
            self.channel.basic_qos(prefetch_count=1)
            self.channel.exchange_declare(exchange='kaldi_expe', exchange_type='topic')

            # Create all the queue and bind them to the corresponding routing key
            self.channel.queue_declare('request', durable=True)
            result = self.channel.queue_declare('answer', durable=True)

            self.channel.queue_bind(exchange='kaldi_expe', queue='request', routing_key='kaldi_expe.web.request')
            self.channel.queue_bind(exchange='kaldi_expe', queue='answer', routing_key='kaldi_expe.kaldi.answer')
            self.callback_queue = result.method.queue

            self.channel.basic_consume(queue="answer", on_message_callback=self.on_response)

        def on_response(self, ch, method, props, body):
            print("from server, correlation id : " + str(props.correlation_id), file=sys.stderr)
            self.response = body
            ch.basic_ack(delivery_tag=method.delivery_tag)

        def call(self, n):
            print("Launched Call ")
            self.response = None
            self.corr_id = str(uuid.uuid4())
            self.channel.basic_publish(
                exchange='kaldi_expe',
                routing_key='kaldi_expe.web.request',
                properties=pika.BasicProperties(
                    correlation_id=self.corr_id,
                ),
                body=str(n))
            while self.response is None:
                self.connection.process_data_events()
            return int(2)

def flask_app():
    app = Flask("__name__")

    @app.route('/', methods=['GET'])
    def server_is_up():
        return 'server is up', 200

    @app.route('/add-job/<cmd>')
    def add(cmd):
        app.config['RPIclient'].call(10)
        return "Call RPI client",404

    return app

if __name__ == '__main__':
    print("Waiting for RabbitMq")
    time.sleep(20)
    rpiClient = RPIclient()
    app = flask_app()
    app.config['RPIclient'] = rpiClient
    print("Rabbit MQ is connected, starting server", file=sys.stderr)
    app.run(debug=True, threaded=False, host='0.0.0.0')

worker.py

import pika
import time
import sys

print(' [*] Waiting for RabbitMQ ...')
time.sleep(20)

print(' [*] Connecting to server ...')
channel = connection.channel()

print(' [*] Waiting for messages.')
def callback(ch, method, properties, body):
    print(" [x] Received %s" % body)
    print(" [x] Executing task ")
    print("from worker, correlation id : " + str(properties.correlation_id))
    ch.basic_publish(
                exchange='kaldi_expe',
                routing_key='kaldi_expe.kaldi.answer',
                properties=pika.BasicProperties(correlation_id = properties.correlation_id),
                body="response")
    print(" [x] Done")

    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='request', on_message_callback=callback)

channel.start_consuming()

可悲的是,当我发回一条消息(从工作人员到服务器)时,服务器似乎确实使用了消息,但从不执行回调(它显示消息已使用,但在rabbit mq 接口。另外,打印不显示)。

我很迷茫,因为消息似乎已被消费,但回调似乎没有被执行。你知道它可能来自哪里吗?

【问题讨论】:

worker.py 从哪里运行?它放在这个什么地方?谢谢 【参考方案1】:

您确实将回调方法 on_response 附加到队列 @9​​87654322@,但您从未告诉您的服务器开始使用队列。

看起来您在类初始化结束时缺少self.channel.start_consuming()

【讨论】:

以上是关于从rabbitMQ消费时,Flask消费者不执行回调的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ 没有选择正确的消费者

rabbitmq(二)订阅模式\路由模式\topic

如何使用 gen-server 将来自 rabbitmq 消费者的响应发送到 Erlang 中的生产者

RabbitMQ / AMQP 中的消息组

源码解析: Spring RabbitMQ消费者

Rabbitmq消费失败死信队列