Python3 asyncio“任务已被破坏,但处于待处理状态”,具有某些特定条件

Posted

技术标签:

【中文标题】Python3 asyncio“任务已被破坏,但处于待处理状态”,具有某些特定条件【英文标题】:Python3 asyncio "Task was destroyed but it is pending" with some specific condition 【发布时间】:2016-02-03 22:57:24 【问题描述】:

这是简化的代码,它使用 python3 协程并为 SIGING 和 SIGTERM 信号设置处理程序以正确停止作业:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import argparse
import asyncio
import signal
import sys


def my_handler(signum, frame):
    print('Stopping')
    asyncio.get_event_loop().stop()
    # Do some staff
    sys.exit()


@asyncio.coroutine
def prob_ip(ip_addr):

    print('Ping ip:%s' % ip_addr)
    proc = yield from asyncio.create_subprocess_exec('ping', '-c', '3', ip_addr)
    ret_code = yield from proc.wait()
    if ret_code != 0:
        print("ip:%s doesn't responding" % ip_addr)
        # Do some staff
        yield from asyncio.sleep(2)
        # Do more staff
        yield from asyncio.sleep(16)


@asyncio.coroutine
def run_probing():

    print('Start probing')
    # Do some staff
    yield from asyncio.sleep(1)

    while True:
        yield from asyncio.wait([prob_ip('192.168.1.3'), prob_ip('192.168.1.2')])
        yield from asyncio.sleep(60)


def main():
    parser = argparse.ArgumentParser()
    parser.description = "Probing ip."
    parser.parse_args()

    signal.signal(signal.SIGINT, my_handler)
    signal.signal(signal.SIGTERM, my_handler)

    asyncio.get_event_loop().run_until_complete(run_probing())


if __name__ == '__main__':
    main()

当我通过以下方式运行它时:

python3 test1.py

它按 Ctrl-C 停止,没有任何警告。 但是当我通过以下方式运行它时:

python3 -m test1

它通过 Ctrl-C 打印警告:

$ python3 -m test1 
Start probing
Ping ip:192.168.1.2
Ping ip:192.168.1.3
PING 192.168.1.2 (192.168.1.2): 56 data bytes
PING 192.168.1.3 (192.168.1.3): 56 data bytes
^C--- 192.168.1.2 ping statistics ---
--- 192.168.1.3 ping statistics ---
1 packets transmitted, 0 packets received, 100% packet loss
1 packets transmitted, 0 packets received, 100% packet loss
Stopping
Task was destroyed but it is pending!
task: <Task pending coro=<prob_ip() running at /tmp/test1.py:22> wait_for=<Future pending cb=[Task._wakeup()]> cb=[_wait.<locals>._on_completion() at /usr/lib/python3.4/asyncio/tasks.py:394]>
Task was destroyed but it is pending!
task: <Task pending coro=<prob_ip() running at /tmp/test1.py:22> wait_for=<Future pending cb=[Task._wakeup()]> cb=[_wait.<locals>._on_completion() at /usr/lib/python3.4/asyncio/tasks.py:394]>

如果我通过以下方式安装此脚本,我会收到同样的警告:

from setuptools import setup

setup(name='some_scripts',
      version='1.0.0.0',
      author='Some Team',
      author_email='team@team.ru',
      url='https://www.todo.ru',
      description='Some scripts',
      packages=['my_package'],
      entry_points='console_scripts': [
          'test1=my_package.test1:main',
      ],
      )

我的python版本是“3.4.2”

【问题讨论】:

您可能想使用loop.add_signal_handler,尽管它与您的问题无关。 是的,谢谢,这种方式在我的情况下看起来更好。但这无济于事。我仍然收到同样的警告。 【参考方案1】:

使用asyncio.gather 代替asyncio.wait

取消:如果外部Future被取消,所有的孩子(还没有完成)也被取消。

例子:

def handler(future, loop):
    future.cancel()
    loop.stop()

@asyncio.coroutine
def do_some(arg):
    while True:
        print("Do stuff with %s" % arg)
        yield from asyncio.sleep(2)

loop = asyncio.get_event_loop()
future = asyncio.gather(do_some(1), do_some(2))
loop.add_signal_handler(signal.SIGINT, handler, future, loop)
loop.run_forever()

【讨论】:

【参考方案2】:

好的。我想我已经知道应该如何停止所有任务了。

    首先,据我所知。 BaseEventLoop.stop() 只是为了阻止 BaseEventLoop.run_forever()。所以应该通过Future.cancel 取消所有任务。要获取所有任务,您可以使用Task.all_tasks 静态方法。 取消所有任务后,run_until_complete 将引发 asyncio.CancelledError 异常。因此,如果不想将其打印到 stderr,就应该抓住它。

    而且,在某些情况下,我会收到此错误:TypeError: signal handler must be signal.SIG_IGN, signal.SIG_DFL, or a callable object。 我发现了一些关于这个错误的话题:

    https://bugs.python.org/issue23548 https://github.com/buildinspace/peru/issues/98

    他们都说可以通过在退出应用程序之前关闭循环来修复它。

所以我们得到了这个代码:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import asyncio
import signal


def my_handler():
    print('Stopping')
    for task in asyncio.Task.all_tasks():
        task.cancel()


@asyncio.coroutine
def do_some(some_args):
    while True:
        print("Do staff with %s" % some_args)
        yield from asyncio.sleep(2)


def main():
    loop = asyncio.get_event_loop()

    loop.add_signal_handler(signal.SIGINT, my_handler)

    try:
        loop.run_until_complete(asyncio.wait([do_some(1), do_some(2)]))
    except asyncio.CancelledError:
        print('Tasks has been canceled')
    finally:
        loop.close()


if __name__ == '__main__':
    main()

它也适用于signal.signal。但文森特 noticed loop.add_signal_handler 在这种情况下看起来更好。

但我仍然不确定这是停止所有任务的最佳方式。

【讨论】:

(1) 与 futures 不同,任务可能会忽略 .cancel() (catch CancelledError) (2) 如果任务需要清理,那么您可以 loop.run_until_complete(asyncio.gather(*asyncio.Task.all_tasks())) before loop.close() 如果您不希望信号处理程序取消所有正在运行的任务,请查看我的答案(已编辑)。 @J.F.Sebastian 是的,谢谢,我必须等待未完成的任务才能退出。但是(我不知道为什么)asyncio.gather 在这种情况下不起作用(在 python3.5.0 中的列表中)。 asyncio.wait 有效,所以我们得到这样的代码:gist.github.com/willir/b521450b66be6e6b238c。编辑:我也不知道为什么我不需要从这个(实际上是第二个)loop.run_until_complete 中捕获 asyncio.CancelledError。 @willir: (1) 我猜,你有错误的论点(asyncio.wait()asynio.gather() 有不同的接口,例如,请注意上面调用中的*)。 (2) 如果您不了解CancelledError 的方式/地点/时间可能会在提出单独的问题时提出。

以上是关于Python3 asyncio“任务已被破坏,但处于待处理状态”,具有某些特定条件的主要内容,如果未能解决你的问题,请参考以下文章

asyncio--python3未来并发编程主流充满野心的模块

Python3 asyncio异步编程介绍

python协程(4):asyncio

与 python3 asyncio 建立连接

asyncio模块

python3.7中asyncio的具体实现