Flask + RabbitMQ + SocketIO - 转发消息

Posted

技术标签:

【中文标题】Flask + RabbitMQ + SocketIO - 转发消息【英文标题】:Flask + RabbitMQ + SocketIO - forwarding messages 【发布时间】:2015-06-10 05:54:11 【问题描述】:

我在通过 SocketIO 从 RabbitMQ 向用户发送消息时遇到问题。

我有 SocketIO integration 的 Flask 应用程序。 当前用户流好像

问题是我无法设置通过 SocketIO 将消息转发到浏览器的 RabbitMQ 侦听器。每次我得到不同的错误。主要是连接已关闭,或者我在应用程序上下文之外工作。

我尝试了很多方法,这是我的最后一个。

# callback 
def mq_listen(uid):
    rabbit = RabbitMQ()
    def cb(ch, method, properties, body, mq=rabbit):
        to_return = [0]  # mutable
        message = Message.load(body)
        to_return[0] = message.get_message()

        emit('report_part', "data": to_return[0])

    rabbit.listen('results', callback=cb, id=uid)

# this is the page, which user reach
@blueprint.route('/report_result/<uid>', methods=['GET'])
def report_result(uid):

    thread = threading.Thread(target=mq_listen, args=(uid,))
    thread.start()

     return render_template("property/report_result.html", socket_id=uid)

rabbit.listen 方法的抽象如下:

def listen(self, queue_name, callback=None, id=None):
    if callback is not None:
        callback_function = callback
    else:
        callback_function = self.__callback
    if id is None:
        self.channel.queue_declare(queue=queue_name, durable=True)
        self.channel.basic_qos(prefetch_count=1)
        self.consumer_tag = self.channel.basic_consume(callback_function, queue=queue_name)
        self.channel.start_consuming()
    else:
        self.channel.exchange_declare(exchange=queue_name, type='direct')
        result = self.channel.queue_declare(exclusive=True)
        exchange_name = result.method.queue
        self.channel.queue_bind(exchange=queue_name, queue=exchange_name, routing_key=id)
        self.channel.basic_consume(callback_function, queue=exchange_name, no_ack=True)
        self.channel.start_consuming()

导致

RuntimeError: working outside of request context

如果有任何提示或使用示例,我将很高兴。

非常感谢

【问题讨论】:

我没有直接回答你所包含的内容(虽然图表很好) - 但我之前遇到过“请求上下文之外”的事情,这通常是一个问题(1) 尝试在烧瓶 API 之外调用烧瓶使用函数,这意味着烧瓶部分必须被排除在函数之外,或者 (2) 需要使用从烧瓶中提取的数据设置回调,以便烧瓶当回调稍后触发或(3)其他一些调用链丢弃烧瓶上下文并且未能传递所需的数据时将需要。不确定这是否会有所帮助。 :-/ 我在您的示例中看不到它,但是您是否创建了应用程序上下文? flask.pocoo.org/docs/0.12/appcontext/…?当您在请求之外运行时,您需要推送上下文。 【参考方案1】:

我遇到了类似的问题,归根结底是因为当您发出请求时,flask 会将请求上下文传递给客户端。但解决方案不是添加 app.app_context()。那是 hackey 并且肯定会出现错误,因为您不是本机发送请求上下文。

我的解决方案是创建一个重定向,以便像这样维护请求上下文:

def sendToRedisFeed(eventPerson, type):
    eventPerson['type'] = type
    requests.get('http://localhost:5012/zmq-redirect', json=eventPerson)

这是我的重定向函数,所以每当有一个事件我想推送到我的 PubSub 时,它都会通过这个函数,然后推送到那个 localhost 端点。

from flask_sse import sse

app.register_blueprint(sse, url_prefix='/stream')

@app.route('/zmq-redirect', methods=['GET'])
def send_message():
    try:
        sse.publish(request.get_json(), type='greeting')
        return Response('Sent!', mimetype="text/event-stream")

    except Exception as e:
        print (e)
        pass

现在,每当一个事件被推送到我的 /zmq-redirect 端点时,它都会被重定向并通过 SSE 发布。

现在最后,只是为了总结一切,客户端:

var source = new EventSource("/stream");

source.addEventListener(
  "greeting",
  function(event) 
    console.log(event)
  
)

【讨论】:

【参考方案2】:

错误消息表明这是一个 Flask 问题。在处理请求时,Flask 会设置一个上下文,但是因为您使用的是线程,所以这个上下文会丢失。到需要时,它不再可用,因此 Flask 给出“在请求上下文之外工作”错误。

解决此问题的常用方法是手动提供上下文。文档中有一个关于此的部分:http://flask.pocoo.org/docs/1.0/appcontext/#manually-push-a-context

您的代码没有显示 socketio 部分。但我想知道使用类似 flask-socketio 的东西是否可以简化一些东西......(https://flask-socketio.readthedocs.io/en/latest/)。我会在后台打开 RabbitMQ 连接(最好是一次)并使用emit 函数将任何更新发送到连接的 SocketIO 客户端。

【讨论】:

以上是关于Flask + RabbitMQ + SocketIO - 转发消息的主要内容,如果未能解决你的问题,请参考以下文章

关于socket和flask的关系

Rabbitmq之socket descriptors

springboot集成RabbitMQ,Eclipse开发集成RabbitMq,IDEA集成RabbitMQ报错 socket close

flask celery 安装说明

rabbitmq

flask_socket.io _ 运行异步任务的问题