如何在多个线程中接收 websocket

Posted

技术标签:

【中文标题】如何在多个线程中接收 websocket【英文标题】:How to receive websockets in multiple threads 【发布时间】:2021-05-31 12:13:34 【问题描述】:

我正在使用库websockets

当尝试从多线程接收 websocket 的函数中接收数据时,出现错误。我无法克服它。我需要你的帮助!

我的代码:

async def receive_ws():
    async with websockets.connect(uri, extra_headers=VALID_HEADERS) as websocket:
        while True:
            result = await websocket.recv()
            print(result)


async def streams():
    list_streams = []
    for i in range(0, 100):
        list_streams.append(str(i))
    for j in list_streams:
        await asyncio.to_thread(receive_ws)


asyncio.run(streams())

我的错误:

C:\Program Files\Python39\lib\asyncio\base_events.py:1891: RuntimeWarning: coroutine 'receive_ws' was never awaited
  handle = None  # Needed to break cycles when an exception occurs.
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
C:\Program Files\Python39\lib\concurrent\futures\thread.py:79: RuntimeWarning: coroutine 'receive_ws' was never awaited
  del work_item
RuntimeWarning: Enable tracemalloc to get the object allocation traceback

【问题讨论】:

而不是await asyncio.to_thread(receive_ws) 你不能只做await receive_ws() 我的目标 - 同时提供来自多个线程的 websockets 你真的需要多线程吗? 是的,我需要同时在多个线程中运行 那为什么要使用异步,通常你可以使用异步并避免使用线程,如果你只是从你的代码中看出来的网络绑定,那么你为什么需要线程? 【参考方案1】:

您可以使用 asyncio 来收集要运行的协同程序列表。下面是一个使用异步睡眠在请求返回之前夸大网络延迟或昂贵计算的示例。像线程一样,请求一个接一个地发送,不需要等到一个完成才能开始下一个。协程将根据需要和响应返回时编入和编出进程。

import asyncio
import websockets
from random import randint
from datetime import datetime


async def receive_ws():
    async with websockets.connect("wss://echo.websocket.org/") as websocket:
        num = randint(1, 10)
        print(f"datetime.now() - sending random num: num")
        await websocket.send(f"num")
        await asyncio.sleep(num)
        result = await websocket.recv()
        print(f"datetime.now() - received random num: result")


async def streams():
    await asyncio.tasks.gather(*[receive_ws() for _ in range(10)])


asyncio.run(streams())

输出

2021-05-31 22:05:29.176031 - sending random num: 3
2021-05-31 22:05:29.261444 - sending random num: 6
2021-05-31 22:05:29.263445 - sending random num: 1
2021-05-31 22:05:29.279507 - sending random num: 5
2021-05-31 22:05:29.281536 - sending random num: 3
2021-05-31 22:05:29.282536 - sending random num: 5
2021-05-31 22:05:29.291587 - sending random num: 8
2021-05-31 22:05:29.306629 - sending random num: 5
2021-05-31 22:05:29.307628 - sending random num: 5
2021-05-31 22:05:29.310629 - sending random num: 1
2021-05-31 22:05:30.266371 - received random num: 1
2021-05-31 22:05:30.298470 - received random num: 1
2021-05-31 22:05:32.192146 - received random num: 3
2021-05-31 22:05:32.300554 - received random num: 3
2021-05-31 22:05:34.278179 - received random num: 5
2021-05-31 22:05:34.278179 - received random num: 5
2021-05-31 22:05:34.326365 - received random num: 5
2021-05-31 22:05:34.326365 - received random num: 5
2021-05-31 22:05:35.265017 - received random num: 6
2021-05-31 22:05:37.289913 - received random num: 8

【讨论】:

感谢您所做的工作。尽管如此,我需要多个流来同时接收 websocket。 那么听起来你需要多处理而不是多线程。因为线程一次只能做一件事。

以上是关于如何在多个线程中接收 websocket的主要内容,如果未能解决你的问题,请参考以下文章

浅记线程池模型中多个线程对同个fd接收缓冲区读取争夺的方案

浅记线程池模型中多个线程对同个fd接收缓冲区读取争夺的方案

qt如何解决线程空转的问题

消费端从activemq中取出一定量消息后,是一个一个进行处理,还是开启多个线程同时处理呢?

C++ 持有多个线程

服务器客户端发送/接收多个客户端