将异步任务发送到在其他线程中运行的循环
Posted
技术标签:
【中文标题】将异步任务发送到在其他线程中运行的循环【英文标题】:Send asyncio tasks to loop running in other thread 【发布时间】:2015-08-17 21:08:19 【问题描述】:如何异步插入任务以在另一个线程中运行的asyncio
事件循环中运行?
我的动机是在解释器中支持交互式异步工作负载。我无法阻止主 REPL 线程。
示例
我目前有缺陷的理解说以下应该有效。为什么不呢?实现上述目标的更好方法是什么?
import asyncio
from threading import Thread
loop = asyncio.new_event_loop()
def f(loop):
asyncio.set_event_loop(loop)
loop.run_forever()
t = Thread(target=f, args=(loop,))
t.start()
@asyncio.coroutine
def g():
yield from asyncio.sleep(1)
print('Hello, world!')
asyncio.async(g(), loop=loop)
【问题讨论】:
您可以尝试将urwid 用作 REPL——它可以与开箱即用的 asyncio 一起使用。 或ipython——从7.0版本开始,它也可以直接在REPL中运行异步函数。 有没有办法在另一个线程中运行整个 python 脚本。我有本主题中指定的类似需求。 【参考方案1】:您必须使用call_soon_threadsafe
来安排来自不同线程的回调:
import asyncio
from threading import Thread
loop = asyncio.new_event_loop()
def f(loop):
asyncio.set_event_loop(loop)
loop.run_forever()
t = Thread(target=f, args=(loop,))
t.start()
@asyncio.coroutine
def g():
yield from asyncio.sleep(1)
print('Hello, world!')
loop.call_soon_threadsafe(asyncio.async, g())
更多信息请参见https://docs.python.org/3/library/asyncio-dev.html#asyncio-multithreading。
编辑:支持异步工作负载的解释器示例
# vim: filetype=python3 tabstop=2 expandtab
import asyncio as aio
import random
@aio.coroutine
def async_eval(input_, sec):
yield from aio.sleep(sec)
print("")
try:
result = eval(input_)
except Exception as e:
print("< !r does not compute >".format(input_))
else:
print("< !r = >".format(input_, result))
@aio.coroutine
def main(loop):
while True:
input_ = yield from loop.run_in_executor(None, input, "> ")
if input_ == "quit":
break
elif input_ == "":
continue
else:
sec = random.uniform(5, 10)
print("< !r scheduled for execution in :.02 sec>".format(input_, sec))
aio.async(async_eval(input_, sec))
loop = aio.get_event_loop()
loop.run_until_complete(main(loop))
loop.close()
【讨论】:
不错。如果我想得到g()
的返回结果呢? (如果 g
实际上返回了一些东西。)
您可以将 Future 传递给g
并在g
中设置它的结果,然后您可以在另一个线程的另一个事件循环中yield from that_future
。你也可以创建另一个协程,在其中 yield from g()
然后 call_soon_threadsafe
在那个新协程上。
如果您需要在另一个线程中的事件循环上运行协程,请使用:asyncio.run_coroutine_threadsafe(my_coro(param1), loop)
3.7 的有效语法是什么-我也在打这个
@PhilBot 希望您还没有等待答案,但我在单独的答案中创建了一个 3.7+ 的示例。【参考方案2】:
Jashandeep Sohi 回答中的第一个示例在 3.7+ 中对我不起作用,并打印有关已弃用注释的警告。我把它改造成一个运行在 3.8 以下的东西。我也对其进行了一些调整以满足我的需求。我是 Python 中多线程的新手(但通常没有多线程),因此感谢您提供任何建议、指导等:
import asyncio
from threading import Thread
loop = asyncio.new_event_loop()
running = True
def evaluate(future):
global running
stop = future.result()
if stop:
print("press enter to exit...")
running = False
def side_thread(loop):
asyncio.set_event_loop(loop)
loop.run_forever()
thread = Thread(target=side_thread, args=(loop,), daemon=True)
thread.start()
async def display(text):
await asyncio.sleep(5)
print("echo:", text)
return text == "exit"
while running:
text = input("enter text: ")
future = asyncio.run_coroutine_threadsafe(display(text), loop)
future.add_done_callback(evaluate)
print("exiting")
echo 和其他输出会与提示冲突,但应该足以证明它可以正常工作。
我不确定的一件事是从一个线程设置全局running
并从另一个线程读取它。我认为 GIL 可能会同步线程缓存,但我很想确认(或不确认)。
【讨论】:
以上是关于将异步任务发送到在其他线程中运行的循环的主要内容,如果未能解决你的问题,请参考以下文章
js的事件循环机制:同步与异步任务(setTimeout,setInterval)宏任务,微任务(Promise,process.nextTick)