带有异步计时器的 Python 异步 websocket 客户端

Posted

技术标签:

【中文标题】带有异步计时器的 Python 异步 websocket 客户端【英文标题】:Python async websocket client with async timer 【发布时间】:2016-02-20 22:16:13 【问题描述】:

我需要有一个长时间运行的 websocket 客户端来接收来自 websocket 服务器的推送消息,并且我需要监视客户端的连接状态:如果连接断开,我需要找出。

我的方法是定期记录一个常量字符串,如果没有检测到日志消息就会触发警报。

我的想法:1)有一个响应不规则传入消息的 websocket 客户端。并且 2) 同时有循环在 websocket 客户端抛出 ConnectionClosed 异常时停止记录消息。

我对新的 3.5 异步语法很感兴趣。 This websocket 的实现专门基于 asyncio。文档中的client 看起来与我需要的完全一样。

但是,我不知道如何添加第二个协程来执行我的日志记录语句并且在 websocket 连接抛出 ConnectionClosed 时以某种方式停止。

这里有一些东西可以开始对话,但这不起作用,因为 alive 方法阻塞了事件循环。我正在寻找一种优雅的解决方案,可以同时运行这两种方法。

#!/usr/bin/env python

import asyncio
import logging

import websockets

logger = logging.getLogger(__name__)

is_alive = True


async def alive():
    while is_alive:
        logger.info('alive')
        await asyncio.sleep(300)


async def async_processing():
    async with websockets.connect('ws://localhost:8765') as websocket:
        while True:
            try:
                message = await websocket.recv()
                print(message)

            except websockets.exceptions.ConnectionClosed:
                print('ConnectionClosed')
                is_alive = False
                break


asyncio.get_event_loop().run_until_complete(alive())
asyncio.get_event_loop().run_until_complete(async_processing())

【问题讨论】:

【参考方案1】:

实际上run_until_complete 在这里阻塞,因为它一直等到alive 完成。

你可以分两步解决:

    使用asyncio.ensure_future 调度协程(立即运行而不等待结果),每个返回任务。 等待任务完成asyncio.wait

代码如下:

tasks = [
   asyncio.ensure_future(alive()),
   asyncio.ensure_future(async_processing())
]
asyncio.get_event_loop().run_until_complete(asyncio.wait(tasks))

正如@Vincent 提到的wait 接受任务,所以ensure_future 是不必要的:

asyncio.get_event_loop().run_until_complete(asyncio.wait([   
   alive(),
   async_processing()
]))

【讨论】:

像魅力一样工作。非常感谢。 您可以将协程列表传递给asyncio.wait,在您的示例中无需使用asyncio.ensure_future 如何在此处添加额外代码以在关闭连接后重新连接到 websocket 服务器?

以上是关于带有异步计时器的 Python 异步 websocket 客户端的主要内容,如果未能解决你的问题,请参考以下文章

Flux Utils 存储和异步定时器

如何处理来自异步函数python的数据

对由 NSTimer 异步发送的 NSNotification 进行单元测试

带有 Python 请求的异步请求

异步编程理解异步

异步编程理解异步