Python 的 asyncio.Event() 跨不同的类

Posted

技术标签:

【中文标题】Python 的 asyncio.Event() 跨不同的类【英文标题】:Python's asyncio.Event() across different classes 【发布时间】:2020-10-29 18:50:43 【问题描述】:

我正在编写一个 Python 程序来与基于 CAN 总线的设备进行交互。为此,我成功地使用了 python-can 模块。我还使用 asyncio 来响应异步事件。我编写了一个“CanBusSequencer”类使用的“CanBusManager”类。 “CanBusManager”类负责生成/发送/接收消息,CanBusSequencer 驱动要发送的消息序列。 在序列中的某个时刻,我想等到收到特定消息以“解锁”序列中要发送的剩余消息。代码概述:

main.py

async def main():
   
   event = asyncio.Event()
   sequencer = CanBusSequencer(event)
   task = asyncio.create_task(sequencer.doSequence())
   await task
 
asyncio.run(main(), debug=True)

canBusSequencer.py

from canBusManager import CanBusManager

class CanBusSequencer:
 
   def __init__(self, event)
 
      self.event = event
      self.canManager = CanBusManager(event)

   async def doSequence(self):
 
      for index, row in self.df_sequence.iterrows():
         if:...
            self.canManager.sendMsg(...)
         else:
            self.canManager.sendMsg(...)
            await self.event.wait()
            self.event.clear()

canBusManager.py

import can

class CanBusManager():
 
   def __init__(self, event):
 
      self.event = event
      self.startListening()
 
 **EDIT**
    def startListening(self):
    
       self.msgNotifier = can.Notifier(self.canBus, self.receivedMsgCallback)
 **EDIT**
 
   def receivedMsgCallback(self, msg):
 
      if(msg == ...):
         self.event.set()
   

现在我的程序仍然等待 self.event.wait(),即使收到相关消息并执行 self.event.set()。使用 debug = True 运行程序会显示

RuntimeError: Non-thread-safe operation invoked on an event loop other than the current one

我真的不明白。它与异步事件循环有关,不知何故没有正确定义/管理。我来自 C++ 世界,目前正在用 Python 编写我的第一个大型程序。任何指导将不胜感激:)

【问题讨论】:

【参考方案1】:

您的问题没有说明您如何安排调用 receivedMsgCallback

如果它是由使用后台线程的经典“异步”API 调用的,那么它将从运行事件循环的线程外部调用。根据the documentation,asyncio 原语不是线程安全的,因此从另一个线程调用event.set() 无法与正在运行的事件循环正确同步,这就是您的程序无法唤醒的原因什么时候应该。

如果你想从事件循环线程外部执行任何与异步相关的操作,例如调用Event.set,则需要使用call_soon_threadsafe 或等效项。例如:

    def receivedMsgCallback(self, msg):
        if msg == ...:
            self.loop.call_soon_threadsafe(self.event.set)

事件循环对象应该可用于CanBusManager 对象,可能通过将其传递给其构造函数并将其分配给self.loop

附带说明,如果您创建一个任务只是为了立即等待它,那么您一开始就不需要任务。换句话说,您可以将task = asyncio.create_task(sequencer.doSequence()); await task 替换为更简单的await sequencer.doSequence()

【讨论】:

太棒了,它有效。 receivedMsgCallback() 是通过 python-can 模块定义的。我现在将事件循环传递给构造函数。谢谢你的解释!

以上是关于Python 的 asyncio.Event() 跨不同的类的主要内容,如果未能解决你的问题,请参考以下文章

所有任务完成后如何终止python asyncio event_loop

asyncio的简单使用,python异步高效处理数据,asyncio.get_event_loop(),loop.run_until_complete(main()),loop.close()

DeprecationWarning: There is no current event loop loop = asyncio.get_event_loop()

DeprecationWarning: There is no current event loop loop = asyncio.get_event_loop()

用 asyncio.run 替换 asyncio.get_event_loop().run_until_complete

跟随 asyncio.run() 时 asyncio.get_event_loop() 失败