简单的 Python 多线程网络服务器,带有 Asyncio 和在主函数中调用的事件

Posted

技术标签:

【中文标题】简单的 Python 多线程网络服务器,带有 Asyncio 和在主函数中调用的事件【英文标题】:Simple Python multi threaded webserver with Asyncio and events called in main function 【发布时间】:2020-06-24 01:46:42 【问题描述】:

我有一个简单的 Python 程序,我想做三件事:

提供 HTTP 文档 服务 Websockets 与 Websocket 数据交互

我正在尝试使用/grok asyncio。问题是我不知道如何访问从主事件循环中的函数获取的数据。

例如,在下面的代码中,我有两个线程。

一个线程是HTTP服务器线程,一个线程是Websocket服务器线程,还有主线程。

我要做的是在主线程中打印websocket接收线程中捕获的数据。

我知道如何做到这一点的唯一方法是使用队列在线程之间传递数据,此时我什至不知道使用asyncio 的优势是什么。

同样,将事件循环传递给serve_websocket 函数感觉很奇怪。

谁能解释一下如何构建这个以从 Websocket 函数中获取数据到主函数中?

似乎 / 我想要一种完全不使用 threading 库的方法,这似乎是可能的。在异步项目中,我希望以不同于调用它们的功能对 websocket 事件做出反应。

注意:我知道还有其他用于 websockets 和 http 服务 with asyncio 的库,但这是一个示例,可帮助我了解如何使用此范例构建项目。

谢谢

#!/usr/bin/env python

import json
import socketserver
import threading
import http.server
import asyncio
import time

import websockets

SERVER_ADDRESS = '127.0.0.1'
HTTP_PORT = 8087
WEBSOCKET_PORT = 5678


def serve_http():
    http_handler = http.server.SimpleHTTPRequestHandler
    with socketserver.TCPServer(("", HTTP_PORT), http_handler) as httpd:
        print(f'HTTP server listening on port HTTP_PORT')
        httpd.serve_forever()


def serve_websocket(server, event_loop):
    print(f'Websocket server listening on port WEBSOCKET_PORT')
    event_loop.run_until_complete(server)
    event_loop.run_forever()


async def ws_callback(websocket, path):
    while True:
        data = await websocket.recv()
        # How do I access parsed_data in the main function below
        parsed_data = json.loads(data)
        await websocket.send(data)


def main():
    event_loop = asyncio.get_event_loop()
    ws_server = websockets.serve(ws_callback, SERVER_ADDRESS, WEBSOCKET_PORT)

    threading.Thread(target=serve_http, daemon=True).start()
    threading.Thread(target=serve_websocket, args=(ws_server, event_loop), daemon=True).start()

    try:
        while True:
            # Keep alive - this is where I want to access the data from ws_callback
            # i.e.
            # print(data.values)
            time.sleep(.01)

    except KeyboardInterrupt:
        print('Exit called')


if __name__ == '__main__':
    main()

【问题讨论】:

【参考方案1】:

我相信你不应该在没有特殊需要的情况下混用asynciomultithreading。在您的情况下,请仅使用 asyncio 工具。

在这种情况下,您可以在协程之间共享数据,因为它们都使用 cooperative 多任务处理在同一个线程上运行。

您的代码可以改写为:

#!/usr/bin/env python

import json
import socketserver
import threading
import http.server
import asyncio
import time
import websockets

SERVER_ADDRESS = '127.0.0.1'
HTTP_PORT = 8087
WEBSOCKET_PORT = 5678
parsed_data = 


async def handle_http(reader, writer):
    data = await reader.read(100)
    message = data.decode()
    writer.write(data)
    await writer.drain()
    writer.close()


async def ws_callback(websocket, path):
    global parsed_data
    while True:
        data = await websocket.recv()
        # How do I access parsed_data in the main function below
        parsed_data = json.loads(data)
        await websocket.send(data)


async def main():
    ws_server = await websockets.serve(ws_callback, SERVER_ADDRESS, WEBSOCKET_PORT)
    print(f'Websocket server listening on port WEBSOCKET_PORT')
    http_server = await asyncio.start_server(
        handle_http, SERVER_ADDRESS, HTTP_PORT)
    print(f'HTTP server listening on port HTTP_PORT')

    try:
        while True:
            if parsed_data:
                print(parsed_data.values())
            await asyncio.sleep(0.1)
    except KeyboardInterrupt:
        print('Exit called')


if __name__ == '__main__':
    asyncio.run(main())

【讨论】:

这看起来更像是我想要的方向,但我应该澄清一下我也尽量避免使用全局变量。这真的是我想做的事情的要点。在不带全局变量的 main 函数中从其他协同程序中获取数据。 协程之间可以使用任何共享对象。例如。您可以将协程作为类方法并分享self。如果你想要消费者-生产者模式,你可以使用asyncio.Queue 我明白了。还有这个await asyncio.sleep(0.1) 部分。为什么值为 0.1?为什么不是 0.001?为什么完全有必要?理想情况下,检查解析数据之间不会有任何延迟。

以上是关于简单的 Python 多线程网络服务器,带有 Asyncio 和在主函数中调用的事件的主要内容,如果未能解决你的问题,请参考以下文章

Python实现简单多线程任务队列

Python 套接字socketserver网络编程

Python标准库08 多线程与同步 (threading包)

Python: Socket网络编程,多线程处理小Demo

带有 libevent 的多线程 HTTP 服务器

一个简单的多线程Python爬虫