python asyncio,如何从另一个线程创建和取消任务

Posted

技术标签:

【中文标题】python asyncio,如何从另一个线程创建和取消任务【英文标题】:python asyncio, how to create and cancel tasks from another thread 【发布时间】:2015-05-31 12:12:14 【问题描述】:

我有一个 python 多线程应用程序。我想在一个线程中运行一个异步循环,并从另一个线程向它发布回调和协程。应该很容易,但我无法理解asyncio 的东西。

我想出了以下解决方案,它完成了我想要的一半,请随时发表评论:

import asyncio
from threading import Thread

class B(Thread):
    def __init__(self):
        Thread.__init__(self)
        self.loop = None

    def run(self):
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop) #why do I need that??
        self.loop.run_forever()

    def stop(self):
        self.loop.call_soon_threadsafe(self.loop.stop)

    def add_task(self, coro):
        """this method should return a task object, that I
          can cancel, not a handle"""
        f = functools.partial(self.loop.create_task, coro)
        return self.loop.call_soon_threadsafe(f)

    def cancel_task(self, xx):
        #no idea

@asyncio.coroutine
def test():
    while True:
        print("running")
        yield from asyncio.sleep(1)

b.start()
time.sleep(1) #need to wait for loop to start
t = b.add_task(test())
time.sleep(10)
#here the program runs fine but how can I cancel the task?

b.stop()

所以启动和停止循环工作正常。我考虑过使用 create_task 创建任务,但该方法不是线程安全的,所以我将它包装在 call_soon_threadsafe 中。但我希望能够获取任务对象以便能够取消任务。我可以使用 Future 和 Condition 做一些复杂的事情,但一定有更简单的方法,不是吗?

【问题讨论】:

【参考方案1】:

我认为您可能需要让您的 add_task 方法知道它是否是从事件循环以外的线程调用的。这样,如果它是从同一个线程调用的,你可以直接调用asyncio.async,否则,它可以做一些额外的工作来将任务从循环的线程传递给调用线程。这是一个例子:

import time
import asyncio
import functools
from threading import Thread, current_thread, Event
from concurrent.futures import Future

class B(Thread):
    def __init__(self, start_event):
        Thread.__init__(self)
        self.loop = None
        self.tid = None
        self.event = start_event

    def run(self):
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)
        self.tid = current_thread()
        self.loop.call_soon(self.event.set)
        self.loop.run_forever()

    def stop(self):
        self.loop.call_soon_threadsafe(self.loop.stop)

    def add_task(self, coro):
        """this method should return a task object, that I
          can cancel, not a handle"""
        def _async_add(func, fut):
            try:
                ret = func()
                fut.set_result(ret)
            except Exception as e:
                fut.set_exception(e)

        f = functools.partial(asyncio.async, coro, loop=self.loop)
        if current_thread() == self.tid:
            return f() # We can call directly if we're not going between threads.
        else:
            # We're in a non-event loop thread so we use a Future
            # to get the task from the event loop thread once
            # it's ready.
            fut = Future()
            self.loop.call_soon_threadsafe(_async_add, f, fut)
            return fut.result()

    def cancel_task(self, task):
        self.loop.call_soon_threadsafe(task.cancel)


@asyncio.coroutine
def test():
    while True:
        print("running")
        yield from asyncio.sleep(1)

event = Event()
b = B(event)
b.start()
event.wait() # Let the loop's thread signal us, rather than sleeping
t = b.add_task(test()) # This is a real task
time.sleep(10)
b.stop()

首先,我们将事件循环的线程id保存在run方法中,这样我们就可以确定以后对add_task的调用是否来自其他线程。如果从非事件循环线程调用add_task,我们使用call_soon_threadsafe 调用将调度协程的函数,然后使用concurrent.futures.Future 将任务传递回调用线程,调用线程等待Future 的结果。

关于取消任务的注意事项:当您在 Task 上调用 cancel 时,下次事件循环运行时协程中将引发 CancelledError。这意味着 Task 正在包装的协程将在下次遇到屈服点时由于异常而中止 - 除非协程捕获 CancelledError 并阻止自身中止。另请注意,这仅在被包装的函数实际上是可中断协程时才有效;例如,由BaseEventLoop.run_in_executor 返回的asyncio.Future 无法真正取消,因为它实际上包裹在concurrent.futures.Future 周围,并且一旦它们的底层函数实际开始执行,就无法取消它们。在这些情况下,asyncio.Future 会说它已取消,但实际在执行器中运行的函数将继续运行。

编辑:根据 Andrew Svetlov 的建议,将第一个示例更新为使用 concurrent.futures.Future,而不是 queue.Queue

注意:asyncio.async 已弃用,因为版本 3.4.4 改用 asyncio.ensure_future

【讨论】:

感谢您提供的示例,它帮助我解决了我遇到的几个问题。顺便说一句,我还必须用 Future(loop=self.loop) 来实例化 Future,否则在某些情况下,未来会采取错误的循环 @OlivierRD 你应该使用concurrent.futures.Future,而不是asyncio.Futureconcurrent.futures.Future 不接受 loop 关键字参数。 文档似乎说确实如此:docs.python.org/3/library/asyncio-task.html#asyncio.Future btw task.cancel 似乎真的取消了正在运行的任务。我只是进行了一些测试。任务似乎在第一个 yield 语句处停止 @OlivierRD 这是asyncio.Future 的文档,而不是concurrent.futures.Future【参考方案2】:

你做的一切都是正确的。 对于任务停止make方法

class B(Thread):
    # ...
    def cancel(self, task):
        self.loop.call_soon_threadsafe(task.cancel)

顺便说一句,您必须通过

显式地为创建的线程设置事件循环
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)

因为asyncio 只为主线程创建隐式事件循环。

【讨论】:

这里缺少的部分是如何首先获得task 的句柄。因为OP需要在add_task方法中使用call_soon_threadsafe(self.loop.create_task),所以添加到循环后他实际上并没有任务的句柄。 知道了。你说的对。 @dano 顺便说一句,您可以在答案中使用 concurrent.futures.Future 而不是 Queue 。我认为它更清洁。 是的,我同意使用Future 比使用Queue 更好。我已经更新了我的答案以反映这一点。谢谢!【参考方案3】:

这里仅供参考,我最终实现的代码是基于我在这个网站上获得的帮助,它更简单,因为我不需要所有功能。再次感谢!

import asyncio
from threading import Thread
from concurrent.futures import Future
import functools

class B(Thread):
    def __init__(self):
        Thread.__init__(self)
        self.loop = None

    def run(self):
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)
        self.loop.run_forever()

    def stop(self):
        self.loop.call_soon_threadsafe(self.loop.stop)

    def _add_task(self, future, coro):
        task = self.loop.create_task(coro)
        future.set_result(task)

    def add_task(self, coro):
        future = Future()
        p = functools.partial(self._add_task, future, coro)
        self.loop.call_soon_threadsafe(p)
        return future.result() #block until result is available

    def cancel(self, task):
        self.loop.call_soon_threadsafe(task.cancel)

【讨论】:

这仍然适用于 Python 3.5 中的 async/await 协程吗? future.result() 的文档似乎没有表明 result() 阻塞(超时,请参阅docs.python.org/3/library/… ),并且在 add_task 的调用者中,我返回的值似乎是一个 Task 而不是具体的从协程返回的值。此外,docs.python.org/3/library/… 似乎表明不应使用 set_result 我用这个尝试过的要点:gist.github.com/pzelnip/7230b32dc9a27f6e78d9cd78b619245a 当协程看起来是一个任务时从 add_task 返回,而且似乎永远不会终止。【参考方案4】:

从版本 3.4.4 开始,asyncio 提供了一个名为 run_coroutine_threadsafe 的函数,用于将协程对象从线程提交到事件循环。它返回一个concurrent.futures.Future 来访问结果或取消任务。

使用您的示例:

@asyncio.coroutine
def test(loop):
    try:
        while True:
            print("Running")
            yield from asyncio.sleep(1, loop=loop)
    except asyncio.CancelledError:
        print("Cancelled")
        loop.stop()
        raise

loop = asyncio.new_event_loop()
thread = threading.Thread(target=loop.run_forever)
future = asyncio.run_coroutine_threadsafe(test(loop), loop)

thread.start()
time.sleep(5)
future.cancel()
thread.join()

【讨论】:

为了防止竞争条件或死锁,不要直接调用future.cancel()。请改用loop.call_soon_threadsafe(future.cancel)。见here。 @ChangYu-heng 这对于asyncio.Future 期货来说是正确的,但是run_coroutine_threadsafe 返回一个concurrent.futures.Future,它是线程安全的并且不依赖于任何事件循环。 @Vicent 抱歉,我没有仔细阅读原始问题。因此,对此的附加评论是:如果您要从不是事件循环所在的线程执行future.cancel(),请使用loop.call_soon_threadsafe(future.cancel)

以上是关于python asyncio,如何从另一个线程创建和取消任务的主要内容,如果未能解决你的问题,请参考以下文章

如何将python asyncio与线程结合起来?

使用 asyncio ProactorEventLoop 时如何分配线程池

如何从另一个函数进行异步函数调用?

asyncio:等待来自其他线程的事件

[Python 多线程] asyncio (十六)

python 并发专题(十四):asyncio 实战