使用 python flask 流式传输时查看进程未停止

Posted

技术标签:

【中文标题】使用 python flask 流式传输时查看进程未停止【英文标题】:View processes not stopped when streaming with python flask 【发布时间】:2014-05-15 18:47:21 【问题描述】:

我使用带有 gunicorn 的 gevent worker 的简单 Flask 应用程序来服务服务器发送的事件。

要流式传输内容,我使用:

response = Response(eventstream(), mimetype="text/event-stream")

从 redis 流式传输事件:

def eventstream():
    for message in pubsub.listen():
        # ...
        yield str(event)

部署于:

gunicorn -k gevent -b 127.0.0.1:50008 flaskapplication

但在使用了一段时间后,我打开了 50 个 redis 连接,即使没有人连接到服务器发送的事件流。

似乎视图没有终止,因为 gunicorn 是非阻塞的,而 pubsub.listen() 是阻塞的。

我该如何解决这个问题?我应该限制 gunicorn 可能产生的进程数量,还是应该在超时后烧毁视图?如果可能,它应该在不活动时停止视图/redis 连接,而不断开仍连接到 SSE 流的用户。

【问题讨论】:

【参考方案1】:

您可以运行 gunicorn-t <seconds> 来为您的工作人员指定一个超时时间,如果他们沉默几秒钟就会杀死他们,通常是 30 秒。我认为这应该适用于您的问题,但不完全确定。

据我所知,您似乎还可以重写您的工作人员以使用 gevent 中的 Timeout

这可能类似于以下内容:

from gevent import Timeout

def eventstream():
    pubsub = redis.pubsub()
    try:
        with Timeout(30) as timeout:
            pubsub.subscribe(channel)
            for message in pubsub.listen():
                # ...
                yield str(event)
    except Timeout, t:
        if t is not timeout:
            raise
        else:
            pubsub.unsubscribe(channel)

This example 有助于了解这应该如何工作。

【讨论】:

谢谢,这已经很有帮助了。但我正在寻找的答案是某种检测断开连接的方法。例如 gunicorn 可能会在断开连接或类似情况时引发异常。【参考方案2】:

使用来自 natdempk 解决方案的 Timeout 对象,最优雅的解决方案是发送心跳,以检测死连接:

while True:
    pubsub = redis.pubsub()
    try:
        with Timeout(30) as timeout:
            for message in pubsub.listen():
                # ...
                yield str(event)
                timeout.cancel()
                timeout.start()
    except Timeout, t:
        if t is not timeout:
            raise
        else:
            yield ":\n\n"  # heartbeat

注意需要再次调用redis.pubsub(),因为异常后redis连接丢失,会报错NoneType object has no attribute readline

【讨论】:

以上是关于使用 python flask 流式传输时查看进程未停止的主要内容,如果未能解决你的问题,请参考以下文章

当使用flask进行流式传输时,只有opencv的VIDEO错误。

在流式传输网络摄像头时定期拍照

使用 Jinja 进行 Flask 模板流式传输

是否可以将 python 子进程的输出实时流式传输到网页?

如何在 Flask Server 上的 Google Pubsub 订阅回调中屈服以进行流式传输

在 AS400 上通过 Java 流式传输运行时进程执行(cobol obj)的结果时出现 MalformedInputException