如何由工作人员重置异步事件循环?

Posted

技术标签:

【中文标题】如何由工作人员重置异步事件循环?【英文标题】:How to reset an asyncio eventloop by a worker? 【发布时间】:2019-01-01 19:20:45 【问题描述】:

我正在使用 asyncio forever() 事件循环。现在我想在进程或信号或文件更改之后重新启动循环(停止循环并重新创建一个新循环),但我有一些问题:


这里是三个简化的代码 sn-ps,其中演示了一些协程工作者和一个协程循环重启器:


第一次尝试:

import asyncio

async def coro_worker(proc):
    print(f'Worker: proc started.')
    while True:
        print(f'Worker: proc process.')
        await asyncio.sleep(proc)

async def reset_loop(loop):
    # Some process
    for i in range(5):  # Like a process.
        print(f'i counting for reset the eventloop.')
        await asyncio.sleep(1)

    main(loop)  # Expected close the current loop and start a new loop!

def main(previous_loop=None):
    offset = 0
    if previous_loop is not None:  # Trying for close the last loop if exist.
        offset = 1  # An offset to change the process name.
        for task in asyncio.Task.all_tasks():
            print('Cancel the tasks')  # Why it increase up?
            task.cancel()
            # task.clear()
            # task.close()
            # task.stop()

        print("Done cancelling tasks")
        asyncio.get_event_loop().stop()

    process = [1 + offset, 2 + offset]
    loop = asyncio.get_event_loop()
    futures = [loop.create_task(coro_worker(proc)) for proc in process]
    futures.append(loop.create_task(reset_loop(loop)))

    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass
    except asyncio.CancelledError:
        print('Tasks has been canceled')
        main()  # Recursively
    finally:
        print("Closing Loop")
        loop.close()
main()

输出[1]:

Worker: 1 started.
Worker: 1 process.
Worker: 2 started.
Worker: 2 process.
0 counting for reset the eventloop.
Worker: 1 process.
1 counting for reset the eventloop.
Worker: 2 process.
Worker: 1 process.
2 counting for reset the eventloop.
Worker: 1 process.
3 counting for reset the eventloop.
Worker: 2 process.
Worker: 1 process.
4 counting for reset the eventloop.
Worker: 1 process.
Cancel the tasks
Cancel the tasks
Cancel the tasks
Done cancelling tasks
Closing Loop
Closing Loop
Task exception was never retrieved
future: <Task cancelling coro=<reset_loop() done, defined at reset_asycio.py:11> exception=RuntimeError('Cannot close a running event loop',)>
Traceback (most recent call last):
  File "reset_asycio.py", line 40, in main
    loop.run_forever()
  File "/usr/lib/python3.6/asyncio/base_events.py", line 425, in run_forever
    raise RuntimeError('This event loop is already running')
RuntimeError: This event loop is already running

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "reset_asycio.py", line 17, in reset_loop
    main(loop)  # Expected close the current loop and start a new loop!
  File "reset_asycio.py", line 48, in main
    loop.close()
  File "/usr/lib/python3.6/asyncio/unix_events.py", line 63, in close
    super().close()
  File "/usr/lib/python3.6/asyncio/selector_events.py", line 96, in close
    raise RuntimeError("Cannot close a running event loop")
RuntimeError: Cannot close a running event loop
Task was destroyed but it is pending!
task: <Task pending coro=<reset_loop() running at reset_asycio.py:11>>
reset_asycio.py:51: RuntimeWarning: coroutine 'reset_loop' was never awaited
  main()
Task was destroyed but it is pending!
task: <Task pending coro=<coro_worker() running at reset_asycio.py:4>>
reset_asycio.py:51: RuntimeWarning: coroutine 'coro_worker' was never awaited
  main()
Task was destroyed but it is pending!
task: <Task pending coro=<coro_worker() running at reset_asycio.py:4>>
Task was destroyed but it is pending!
task: <Task pending coro=<coro_worker() running at reset_asycio.py:8> wait_for=<Future cancelled>>
Task was destroyed but it is pending!
task: <Task pending coro=<coro_worker() running at reset_asycio.py:8> wait_for=<Future cancelled>>

#第二次尝试:

.
.
.

def main(previous_loop=None):
    offset = 0
    if previous_loop is not None:  # Trying for close the last loop if exist.
        previous_loop.stop()
        previous_loop.close()
        offset = 1  # An offset to change the process name.

    process = [1 + offset, 2 + offset]
    loop = asyncio.get_event_loop()
    futures = [loop.create_task(coro_worker(proc)) for proc in process]
    futures.append(loop.create_task(reset_loop(loop)))

    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass
    except asyncio.CancelledError:
        print('Tasks has been canceled')
        main()  # Recursively
    finally:
        print("Closing Loop")
        loop.close()
main()

输出[2]:

Worker: 1 started.
Worker: 1 process.
Worker: 2 started.
Worker: 2 process.
0 counting for reset the eventloop.
Worker: 1 process.
1 counting for reset the eventloop.
Worker: 2 process.
Worker: 1 process.
2 counting for reset the eventloop.
Worker: 1 process.
3 counting for reset the eventloop.
Worker: 2 process.
Worker: 1 process.
4 counting for reset the eventloop.
Worker: 1 process.
Closing Loop
Task exception was never retrieved
future: <Task finished coro=<reset_loop() done, defined at reset_asycio.py:9> exception=RuntimeError('Cannot close a running event loop',)>
Traceback (most recent call last):
  File "reset_asycio.py", line 15, in reset_loop
    main(loop)  # Expected close the current loop and start new loop!
  File "reset_asycio.py", line 21, in main
    previous_loop.close()
  File "/usr/lib/python3.6/asyncio/unix_events.py", line 63, in close
    super().close()
  File "/usr/lib/python3.6/asyncio/selector_events.py", line 96, in close
    raise RuntimeError("Cannot close a running event loop")
RuntimeError: Cannot close a running event loop
Task was destroyed but it is pending!
task: <Task pending coro=<coro_worker() done, defined at reset_asycio.py:3> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7efed846f138>()]>>
Task was destroyed but it is pending!
task: <Task pending coro=<coro_worker() done, defined at reset_asycio.py:3> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7efed846f048>()]>>

第三次尝试:

.
.
.

def main(previous_loop=None):
    offset = 0
    if previous_loop is not None:  # Trying for close the last loop if exist.
        offset = 1  # An offset to change the process name.
        for task in asyncio.Task.all_tasks():
            print('Cancel the tasks')  # Why it increase up?
            task.cancel()

    process = [1 + offset, 2 + offset]
    loop = asyncio.get_event_loop()
    futures = [loop.create_task(coro_worker(proc)) for proc in process]
    futures.append(loop.create_task(reset_loop(loop)))

    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass
    except asyncio.CancelledError:
        print('Tasks has been canceled')
        main()  # Recursively
    finally:
        print("Closing Loop")
        loop.close()
main()

输出[3]:

Worker: 1 started.
Worker: 1 process.
Worker: 2 started.
Worker: 2 process.
0 counting for reset the eventloop.
Worker: 1 process.
1 counting for reset the eventloop.
Worker: 2 process.
Worker: 1 process.
2 counting for reset the eventloop.
Worker: 1 process.
3 counting for reset the eventloop.
Worker: 2 process.
Worker: 1 process.
4 counting for reset the eventloop.
Worker: 1 process.
Cancel the tasks
Cancel the tasks
Cancel the tasks
Closing Loop
Worker: 2 started.
Worker: 2 process.
Worker: 3 started.
Worker: 3 process.
0 counting for reset the eventloop.
1 counting for reset the eventloop.
Worker: 2 process.
2 counting for reset the eventloop.
Worker: 3 process.
3 counting for reset the eventloop.
Worker: 2 process.
4 counting for reset the eventloop.
Cancel the tasks
Cancel the tasks
Cancel the tasks
Cancel the tasks
Cancel the tasks
Cancel the tasks
Closing Loop
Worker: 2 started.
Worker: 2 process.
Worker: 3 started.
Worker: 3 process.
.
.
.

#问题:

#3rd try中,显然我已经做到了,但是print('Cancel the tasks')每次重启后都会增加,这是什么原因?!

有没有更好的方法来解决这个问题?

请原谅我试图简化它的冗长问题!


[注意]:

我不是在寻找asyncio.timeout() 我还尝试使用另一个线程来重新启动事件循环,但结果不成功。 我正在使用 Python 3.6

【问题讨论】:

为什么要重置事件循环?这是一个不同寻常的想法。 你能解释一下你想用这个实现什么 - 即你要解决什么问题? @user4815162342 我正在尝试使用存储在一个文件中的几个配置来实现一个 SNMP 收集器,我解析这些配置,每个配置都被提供给协程 SNMP 收集器(比如有问题的coro_worker() ) 在初始化循环 (loop.create_task()) 中。我的 SNMP 收集器 (coro_worker()) 有一个无限循环。问题是当配置文件更改时,我无法停止此事件循环 (run_forever()) 并使用新配置重新创建协程 SNMP 收集器。 这可能是 XY 问题。当您专注于如何重置事件循环时,您应该专注于如何正确结束您的任务。 @KlausD。是否需要确保任务结束才能停止或关闭事件循环? 【参考方案1】:

main() 的递归调用和新的事件循环增加了不必要的复杂性。这是一个更简单的原型 - 它监视外部源(文件系统),并且在创建文件时,它只是停止循环。 main() 包含一个处理(重新)创建和取消任务的循环:

import os, asyncio, random

async def monitor():
    loop = asyncio.get_event_loop()
    while True:
        if os.path.exists('reset'):
            print('reset!')
            os.unlink('reset')
            loop.stop()
        await asyncio.sleep(1)

async def work(workid):
    while True:
        t = random.random()
        print(workid, 'sleeping for', t)
        await asyncio.sleep(t)

def main():
    loop = asyncio.get_event_loop()
    loop.create_task(monitor())
    offset = 0
    while True:
        workers = []
        workers.append(loop.create_task(work(offset + 1)))
        workers.append(loop.create_task(work(offset + 2)))
        workers.append(loop.create_task(work(offset + 3)))
        loop.run_forever()
        for t in workers:
            t.cancel()
        offset += 3

if __name__ == '__main__':
    main()

另一种选择是永远不要停止事件循环,而只是触发重置事件:

async def monitor(evt):
    while True:
        if os.path.exists('reset'):
            print('reset!')
            os.unlink('reset')
            evt.set()
        await asyncio.sleep(1)

在这个设计中main() 可以是协程:

async def main():
    loop = asyncio.get_event_loop()
    reset_evt = asyncio.Event()
    loop.create_task(monitor(reset_evt))
    offset = 0
    while True:
        workers = []
        workers.append(loop.create_task(work(offset + 1)))
        workers.append(loop.create_task(work(offset + 2)))
        workers.append(loop.create_task(work(offset + 3)))
        await reset_evt.wait()
        reset_evt.clear()
        for t in workers:
            t.cancel()
        offset += 3

if __name__ == '__main__':
    asyncio.run(main())
    # or asyncio.get_event_loop().run_until_complete(main())

请注意,在这两种变体中,取消任务都是通过await 引发CancelledError 异常来实现的。该任务不能使用try: ... except: ... 捕获所有异常,如果这样做,则需要重新引发异常。

【讨论】:

您的答案中的第二部分是否与 Python 3.7 兼容? @BenyaminJafari 第二部分是专门为 Python 3.7 编写的。它也可以在 Python 3.5/3.6 中工作,只需将 asyncio.run 行替换为它下面的注释掉的行。 有时我在工作人员 (work()) 中遇到此错误:concurrent.futures._base.CancelledError 然后上一个任务与更改后的新任务同时运行。 @BenyaminJafari “遇到错误”是什么意思?您是否可能捕获所有异常?引发 CancelledError 是 asyncio 终止任务的方式。如果您的代码捕捉到该异常,则应立即重新引发它。 我的意思是我有时会在我的work() 方法中遇到这个错误concurrent.futures._base.CancelledError,其中包含tryexception。在异常情况下,会显示此错误,并且我希望它删除的先前任务保留在具有新更改的新任务中。

以上是关于如何由工作人员重置异步事件循环?的主要内容,如果未能解决你的问题,请参考以下文章

JavaScript 工作原理之四-事件循环及异步编程的出现和 5 种更好的 async/await 编程方式(译)

进阶学习5:JavaScript异步编程——同步模式异步模式调用栈工作线程消息队列事件循环回调函数

异步 JavaScript - 事件循环

JavaScript使用事件循环在调用堆栈

工作线程中的同步与异步 ioctl

js找到控件后再下一步操作