与 Flask 或其他 Python 网络框架并行生成和处理 Websocket 输出数据的后台线程

Posted

技术标签:

【中文标题】与 Flask 或其他 Python 网络框架并行生成和处理 Websocket 输出数据的后台线程【英文标题】:Background threads that generate and process data for Websocket output in parallel to Flask or other Python webframework 【发布时间】:2014-03-07 03:38:15 【问题描述】:

好的,我知道那是一口。我不知道如何处理这个问题。我想运行与 Flask 并行处理数据的线程,但我没有看到很多人这样做。这是一个个人网络应用程序,所以我不想走 Celery 和 RabbitMQ 的道路。我已经创建了一个连接到股票经纪人 API 并流式传输股票数据的模块。为简单起见,假设它以每秒 1 个数字的速率生成一个随机数。在数字生成器线程创建的每个新数据滴答声中,我都希望其他几个线程处理相同的数字。让我们称它们为数学线程。一旦他们完成了最新数字的处理,我希望将结果组合(成 JSON)并通过 Websocket 发送。我已经能够独立地使用 Flask-Sockets 通过 Websocket 发送数据。这是我想要完成的示例,其中每个框都可以被视为一个线程。

                     -------------
                     |  Number   |
                     | Generator | (rate of 1 number / second)
                     -------------
                           |  (same number sent to all 3 math threads)
                      -----+----- 
                     /     |     \
                    v      v      v
            ---------  ---------  ---------
            | math1 |  | math2 |  | math3 |
            ---------  ---------  ---------
                |          |          |  (results combined and sent over Websocket)
                v          v          v
    ---------------------------------------
    | Flask |       WebSocket Handler     |
    ---------------------------------------

这是简单的 websocket 代码。

app = Flask(__name__)
sockets = Sockets(app)

@sockets.route('/stream')
def stream_socket(ws):
    while True: # for now just sending endless stream of time + random number every second
        message = "time": int(time.time()*1000), "data": randrange(100)
        ws.send(json.dumps(message))
        time.sleep(1)

@app.route('/')
def test():
    return render_template('main.html')

if __name__ == '__main__':
    app.run(host='0.0.0.0',debug=True,threaded=True)

所以我有“数字生成器”模块(实际上是股票报价流媒体)和 websocket 连接独立工作。我只需要将它们与线程连接在一起,这就是我苦苦挣扎的地方。如果该数字生成器是一个简单的随机数生成器,并且数学线程也很简单(例如 2*x、sinc(x) 等),那么我很好奇是否有人能让我朝着正确的方向前进线程与 Flask 并行完成。也许是一些骨架代码。谢谢。

更新:我可以让单独的线程与 Flask 并行运行,如下所示。这在我运行“python test.py”时有效,但是为了让 Websockets 工作,我使用 gunicorn,例如“gunicorn -k flask_sockets.worker test:app”。然而,这似乎阻止了多线程工作。

UPDATE2:我能够使用 gevent 而不是 gunicorn 让 Websockets 和多线程工作。下面更新了代码。

from gevent import pywsgi
from geventwebsocket.handler import WebSocketHandler

class myThread(threading.Thread):

    def __init__(self, wait, msg):
        super(myThread, self).__init__()
        self.wait = wait
        self.msg = msg

    def run(self):
        for i in range(5):
            time.sleep(self.wait)
            print self.msg

app = Flask(__name__)
sockets = Sockets(app)

@sockets.route('/stream')
def stream_socket(ws):
    while True: # for now just sending endless stream of time + random number every second
        message = "time": int(time.time()*1000), "data": randrange(100)
        ws.send(json.dumps(message))
        time.sleep(1)

@app.route('/')
def test():
    return render_template('main.html')

if __name__ == '__main__':
    thread1 = myThread(2, "thread1")
    thread2 = myThread(3, "thread2")
    thread1.start()
    thread2.start()
    server = pywsgi.WSGIServer(('', 5000), app, handler_class=WebSocketHandler)
    server.serve_forever()
    #app.run(host='0.0.0.0',debug=True,threaded=True)

现在我已经能够在 Flask 通过 Websocket 独立发送数据的同时实现并行线程,我想我现在最大的问题是如何与 Websocket 装饰器共享并行工作线程的结果因此它可以发送由这些单独线程处理的数据。有没有办法将 @sockets.route 装饰器放在一个线程中?

【问题讨论】:

【参考方案1】:

好的,这样的事情似乎有效。单独的线程以每秒一个的速度生成数字并将它们放入队列中。 Websocket 处理程序在可用时从队列中获取数据,并将其发送到 Websocket。显然这只是一个简单的例子,但它让我朝着正确的方向前进。如果有人有任何建议,仍然会很好奇。

app = Flask(__name__)
sockets = Sockets(app)

myQueue = Queue.Queue(10)

class myThread(threading.Thread):

    def __init__(self, length):
        super(myThread, self).__init__()
        self.length = length

    def run(self):
        for i in range(self.length):
            time.sleep(1)
            myQueue.put("time": int(time.time()*1000), "data": randrange(100))


@sockets.route('/stream')
def stream_socket(ws):
    while True:
        message = myQueue.get()
        ws.send(json.dumps(message))


@app.route('/')
def test():
    return render_template('main.djhtml')

if __name__ == '__main__':

    thread1 = myThread(30)
    thread1.start()
    server = pywsgi.WSGIServer(('', 5000), app, handler_class=WebSocketHandler)
    server.serve_forever()

【讨论】:

以上是关于与 Flask 或其他 Python 网络框架并行生成和处理 Websocket 输出数据的后台线程的主要内容,如果未能解决你的问题,请参考以下文章

Python最火框架入门了吗?微型框架 Flask与Web框架Django示例!

第四篇 与Flask相关的插件(flask-sessionwtformsSQLAchemy)

Flask快速入门

Flask快速入门

一线Python运维澳洲5盘口出租搭建开发带你秒懂Flask框架

Web开发Python实现Web服务器(Flask测试后台框架模板)