使用 aiohttp 下载多个文件时处理 websocket 流

Posted

技术标签:

【中文标题】使用 aiohttp 下载多个文件时处理 websocket 流【英文标题】:Process websocket stream while downloading multiple files using aiohttp 【发布时间】:2021-08-03 11:53:11 【问题描述】:

我正在按照说明 (here) 在我的本地计算机上镜像 Binance Exchange 上的多个订单簿。

假设为简单起见,我希望镜像 2 个符号的订单簿:ETHBTC 和 DOGEBTC(实际上是 350+)。

首先我必须缓冲 websocket 订单更新流:

wss://stream.binance.com:9443/stream?streams=ETHBTC@depth@100ms wss://stream.binance.com:9443/stream?streams=DOGEBTC@depth@100ms

现在我必须下载快照:

https://api.binance.com/api/v3/depth?symbol=ETHBTC&limit=1000 https://api.binance.com/api/v3/depth?symbol=DOGEBTC&limit=1000

一旦我有了快照,我就会将缓冲区(正在进行中)应用到它们,从而产生一个状态。

之后,所有订单更新都可以简单地应用于状态。

对于我可以做的更新流:

        async with aiohttp.ClientSession() as session:
            async with session.ws_connect(URL) as wsock:
                async for msg in wsock:
                    if msg.type != aiohttp.WSMsgType.TEXT:
                        J = json.loads(msg.data)
                        symbol = J['data']['s']

                        process_update(symbol, J)

但是,一旦第一次更新到来,我如何才能开始下载快照,并使用完成处理程序来处理它,从而不中断流?

如果我正在跟踪 300 个符号,则同时发生 300 次下载。

我找到了关于异步下载多个文件的资源,但我看不到如何将其与处理流的要求相结合。

我总是可以在单独的线程中进行下载,但这不是与 aiohttp 的架构目标相冲突吗?

参考:

https://gist.github.com/Hammer2900/2b5da5c08f6406ab49ddb02b0c5ae9f7

【问题讨论】:

【参考方案1】:

感谢 IRC Freenode 上的 graingert #python ?

import anyio.to_thread


async def foo():
    async def download(symbol):
        async with session.get(f"url/symbol") as resp:
            await do_something(resp)

    async with aiohttp.ClientSession() as session, session.ws_connect(
        URL
    ) as wsock, anyio.create_task_group() as tg:
        async for msg in wsock:
            if msg.type != aiohttp.WSMsgType.TEXT:
                J = json.loads(msg.data)
                symbol = J["data"]["s"]

                tg.start_soon(download, symbol)
                await anyio.to_thread.run(process_update, symbol, J)

【讨论】:

以上是关于使用 aiohttp 下载多个文件时处理 websocket 流的主要内容,如果未能解决你的问题,请参考以下文章

aiohttp 异步请求的任务异常

aiohttp 异步http请求-5.下载大文件边下载边保存(节省内存)

在查找“aiohttp.web”的模块规范时发出 azure 测试聊天机器人“错误(ModuleNotFoundError:没有名为“aiohttp”的模块)

aiohttp的安装

处理从 Amazon S3 下载的多个文件?

asyncio 网络抓取 101:使用 aiohttp 获取多个 url