asyncio:等待来自其他线程的事件

Posted

技术标签:

【中文标题】asyncio:等待来自其他线程的事件【英文标题】:asyncio: Wait for event from other thread 【发布时间】:2015-10-07 18:51:40 【问题描述】:

我正在用 Python 设计一个应用程序,它应该访问一台机器来执行一些(冗长的)任务。 asyncio 模块似乎是所有与网络相关的东西的好选择,但现在我需要访问一个特定组件的串行端口。我已经为实际的串行端口实现了一种抽象层,但不知道如何合理地将它与 asyncio 集成。

以下设置:我有一个运行循环的线程,它定期与机器对话并解码响应。使用方法enqueue_query(),我可以将查询字符串放入队列中,然后由另一个线程发送到机器并引起响应。通过传入threading.Event(或任何带有set() 方法的东西),调用者可以执行阻塞等待响应。然后看起来像这样:

f = threading.Event()
ch.enqueue_query('2 getnlimit', f)
f.wait()
print(ch.get_query_responses())

我现在的目标是将这些行放入协程并让 asyncio 处理此等待,以便应用程序在此期间可以执行其他操作。我怎么能这样做?通过将f.wait() 包装到 Executor 中可能会起作用,但这似乎有点愚蠢,因为这会创建一个新线程,只是为了等待另一个线程做某事。

【问题讨论】:

【参考方案1】:

通过传入threading.Event(或任何带有set() 方法的东西),调用者可以执行阻塞等待响应。

鉴于您的查询函数的上述行为,您所需要的只是asyncio.Event 的线程安全版本。只需 3 行代码:

import asyncio
class Event_ts(asyncio.Event):
    #TODO: clear() method
    def set(self):
        #FIXME: The _loop attribute is not documented as public api!
        self._loop.call_soon_threadsafe(super().set)

功能测试:

def threaded(event):
    import time
    while True:
        event.set()
        time.sleep(1)

async def main():
    import threading
    e = Event_ts()
    threading.Thread(target=threaded, args=(e,)).start()
    while True:
        await e.wait()
        e.clear()
        print('whatever')

asyncio.ensure_future(main())
asyncio.get_event_loop().run_forever()

【讨论】:

这在大多数情况下应该可以正常工作,但值得注意的是,从事件循环线程调用 e.set() 实际上会延迟 set() 调用的执行,直到调用方法产生/等待或完全退出。通常这很好,但有时可能会导致意外行为。 解决了我的问题。在我尝试在捕获键盘输入并在每次用户输入内容时引发事件的线程中设置异步事件之前,但 event.wait() 没有得到它并且一直在等待。通过用您的 Event_ts 替换它,它工作正常,即在另一个线程设置事件后继续执行。 Event_ts 太棒了!【参考方案2】:

最简单的方法是完全按照您的建议进行操作 - 将对 f.wait() 的调用包装在执行程序中:

@asyncio.coroutine
def do_enqueue():
    f = threading.Event()
    ch.enqueue_query('2 getnlimit', f)
    yield from loop.run_in_executor(None, f.wait)
    print(ch.get_query_responses())

您确实会产生启动线程池的开销(至少对于第一次调用,从那时起,池将保留在内存中),但是任何提供类似 threading.Event() 的实现的解决方案都带有线程-安全的阻塞和非阻塞 API,内部不依赖任何后台线程,需要做更多的工作。

【讨论】:

感谢您的回答!这样做听起来很合理,我想我会使用这个以及华作高建议的方式,具体取决于用例。 但请注意,默认执行程序带有 max_workers(即等待事件的最大协程)设置为 40。当我使用所提供的解决方案使 threading.barrier 可等待时,这让我很不爽。【参考方案3】:

华作高回答中的Event_ts 类在 Python 3.9 之前运行良好,但在 3.10 中运行良好。这是因为在 Python 3.10 中,私有属性 _loop 最初是 None

以下代码适用于 Python 3.10 以及 3.9 及更低版本。 (我也添加了clear() 方法。)

import asyncio
class Event_ts(asyncio.Event):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        if self._loop is None:
            self._loop = asyncio.get_event_loop()

    def set(self):
        self._loop.call_soon_threadsafe(super().set)

    def clear(self):
        self._loop.call_soon_threadsafe(super().clear)

【讨论】:

以上是关于asyncio:等待来自其他线程的事件的主要内容,如果未能解决你的问题,请参考以下文章

如何将python asyncio与线程结合起来?

如何在 python asyncio 中等待 select.select 调用

gj13 asyncio并发编程

python协程(4):asyncio

asyncio - 多次等待协程(周期性任务)

Flask asyncio aiohttp - RuntimeError:线程'Thread-2'中没有当前事件循环