使用 aiohttp 下载多个文件时处理 websocket 流
Posted
技术标签:
【中文标题】使用 aiohttp 下载多个文件时处理 websocket 流【英文标题】:Process websocket stream while downloading multiple files using aiohttp 【发布时间】:2021-08-03 11:53:11 【问题描述】:我正在按照说明 (here) 在我的本地计算机上镜像 Binance Exchange 上的多个订单簿。
假设为简单起见,我希望镜像 2 个符号的订单簿:ETHBTC 和 DOGEBTC(实际上是 350+)。
首先我必须缓冲 websocket 订单更新流:
wss://stream.binance.com:9443/stream?streams=ETHBTC@depth@100ms wss://stream.binance.com:9443/stream?streams=DOGEBTC@depth@100ms现在我必须下载快照:
https://api.binance.com/api/v3/depth?symbol=ETHBTC&limit=1000 https://api.binance.com/api/v3/depth?symbol=DOGEBTC&limit=1000一旦我有了快照,我就会将缓冲区(正在进行中)应用到它们,从而产生一个状态。
之后,所有订单更新都可以简单地应用于状态。
对于我可以做的更新流:
async with aiohttp.ClientSession() as session:
async with session.ws_connect(URL) as wsock:
async for msg in wsock:
if msg.type != aiohttp.WSMsgType.TEXT:
J = json.loads(msg.data)
symbol = J['data']['s']
process_update(symbol, J)
但是,一旦第一次更新到来,我如何才能开始下载快照,并使用完成处理程序来处理它,从而不中断流?
如果我正在跟踪 300 个符号,则同时发生 300 次下载。
我找到了关于异步下载多个文件的资源,但我看不到如何将其与处理流的要求相结合。
我总是可以在单独的线程中进行下载,但这不是与 aiohttp 的架构目标相冲突吗?
参考:
https://gist.github.com/Hammer2900/2b5da5c08f6406ab49ddb02b0c5ae9f7【问题讨论】:
【参考方案1】:感谢 IRC Freenode 上的 graingert #python ?
import anyio.to_thread
async def foo():
async def download(symbol):
async with session.get(f"url/symbol") as resp:
await do_something(resp)
async with aiohttp.ClientSession() as session, session.ws_connect(
URL
) as wsock, anyio.create_task_group() as tg:
async for msg in wsock:
if msg.type != aiohttp.WSMsgType.TEXT:
J = json.loads(msg.data)
symbol = J["data"]["s"]
tg.start_soon(download, symbol)
await anyio.to_thread.run(process_update, symbol, J)
【讨论】:
以上是关于使用 aiohttp 下载多个文件时处理 websocket 流的主要内容,如果未能解决你的问题,请参考以下文章
aiohttp 异步http请求-5.下载大文件边下载边保存(节省内存)
在查找“aiohttp.web”的模块规范时发出 azure 测试聊天机器人“错误(ModuleNotFoundError:没有名为“aiohttp”的模块)