为啥线程中的 python asyncio 进程在 Linux 上似乎不稳定?
Posted
技术标签:
【中文标题】为啥线程中的 python asyncio 进程在 Linux 上似乎不稳定?【英文标题】:Why python asyncio process in a thread seems unstable on Linux?为什么线程中的 python asyncio 进程在 Linux 上似乎不稳定? 【发布时间】:2018-12-19 17:47:44 【问题描述】:我尝试从 Qt 应用程序运行 python3
异步外部命令。在我使用多处理线程来执行此操作而不冻结 Qt 应用程序之前。但是现在,我想用QThread
来做这件事,以便能够腌制并给出QtWindows
作为其他一些函数的参数(这里没有介绍)。我做到了,并在我的Windows
操作系统上成功测试了它,但我在Linux
操作系统上尝试了该应用程序,我收到以下错误:RuntimeError: Cannot add child handler, the child watcher does not have a loop attached
从那时起,我尝试隔离问题,并获得了下面复制问题的最小(尽可能)示例。
当然,正如我之前提到的,如果我将QThreadPool
替换为multiprocessing.thread
的列表,则此示例运行良好。我还意识到了令我惊讶的事情:如果我在示例的最后部分取消注释 rc = subp([sys.executable,"./HelloWorld.py"])
行,它也可以工作。我无法解释为什么。
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
## IMPORTS ##
from functools import partial
from PyQt5 import QtCore
from PyQt5.QtCore import QThreadPool, QRunnable, QCoreApplication
import sys
import asyncio.subprocess
# Global variables
Qpool = QtCore.QThreadPool()
def subp(cmd_list):
""" """
if sys.platform.startswith('linux'):
new_loop = asyncio.new_event_loop()
asyncio.set_event_loop(new_loop)
elif sys.platform.startswith('win'):
new_loop = asyncio.ProactorEventLoop() # for subprocess' pipes on Windows
asyncio.set_event_loop(new_loop)
else :
print('[ERROR] OS not available for encodage... EXIT')
sys.exit(2)
rc, stdout, stderr= new_loop.run_until_complete(get_subp(cmd_list) )
new_loop.close()
if rc!=0 :
print('Exit not zero (): '.format(rc, sys.exc_info()[0]) )#, exc_info=True)
return rc, stdout, stderr
async def get_subp(cmd_list):
""" """
print('subp: '+' '.join(cmd_list) )
# Create the subprocess, redirect the standard output into a pipe
create = asyncio.create_subprocess_exec(*cmd_list, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE) #
proc = await create
# read child's stdout/stderr concurrently (capture and display)
try:
stdout, stderr = await asyncio.gather(
read_stream_and_display(proc.stdout),
read_stream_and_display(proc.stderr))
except Exception:
proc.kill()
raise
finally:
rc = await proc.wait()
print(" [Exit ] ".format(rc)+' '.join(cmd_list))
return rc, stdout, stderr
async def read_stream_and_display(stream):
""" """
async for line in stream:
print(line, flush=True)
class Qrun_from_job(QtCore.QRunnable):
def __init__(self, job, arg):
super(Qrun_from_job, self).__init__()
self.job=job
self.arg=arg
def run(self):
code = partial(self.job)
code()
def ThdSomething(job,arg):
testRunnable = Qrun_from_job(job,arg)
Qpool.start(testRunnable)
def testThatThing():
rc = subp([sys.executable,"./HelloWorld.py"])
if __name__=='__main__':
app = QCoreApplication([])
# rc = subp([sys.executable,"./HelloWorld.py"])
ThdSomething(testThatThing,'tests')
sys.exit(app.exec_())
使用 HelloWorld.py 文件:
#!/usr/bin/env python3
import sys
if __name__=='__main__':
print('HelloWorld')
sys.exit(0)
因此我有两个问题:如何使这个示例与 QThread
正常工作?为什么之前调用异步任务(调用subp
函数)会改变Linux 上示例的稳定性?
编辑
根据@user4815162342 的建议,我尝试使用run_coroutine_threadsafe
使用下面的代码。但它不起作用并返回相同的错误,即RuntimeError: Cannot add child handler, the child watcher does not have a loop attached
。我还尝试通过模块 mutliprocessing
中的等效命令来更改 threading
命令;最后一个,命令subp
永远不会启动。
代码:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
## IMPORTS ##
import sys
import asyncio.subprocess
import threading
import multiprocessing
# at top-level
loop = asyncio.new_event_loop()
def spin_loop():
asyncio.set_event_loop(loop)
loop.run_forever()
def subp(cmd_list):
# submit the task to asyncio
fut = asyncio.run_coroutine_threadsafe(get_subp(cmd_list), loop)
# wait for the task to finish
rc, stdout, stderr = fut.result()
return rc, stdout, stderr
async def get_subp(cmd_list):
""" """
print('subp: '+' '.join(cmd_list) )
# Create the subprocess, redirect the standard output into a pipe
proc = await asyncio.create_subprocess_exec(*cmd_list, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE) #
# read child's stdout/stderr concurrently (capture and display)
try:
stdout, stderr = await asyncio.gather(
read_stream_and_display(proc.stdout),
read_stream_and_display(proc.stderr))
except Exception:
proc.kill()
raise
finally:
rc = await proc.wait()
print(" [Exit ] ".format(rc)+' '.join(cmd_list))
return rc, stdout, stderr
async def read_stream_and_display(stream):
""" """
async for line in stream:
print(line, flush=True)
if __name__=='__main__':
threading.Thread(target=spin_loop, daemon=True).start()
# multiprocessing.Process(target=spin_loop, daemon=True).start()
print('thread passed')
rc = subp([sys.executable,"./HelloWorld.py"])
print('end')
sys.exit(0)
【问题讨论】:
作为一般设计原则,创建新的事件循环只是为了运行单个子例程是不必要和浪费的。相反,创建一个事件循环并在单独的线程中运行它。单个事件循环完全有能力同时为多个请求提供服务——事实上,这正是它所擅长的。使用asyncio.run_coroutine_threadsafe
向事件循环提交协程,并使用result()
方法等待协程完成。 (您也可以使用add_done_callback
在结果可用时收到通知,在这种情况下,您可能需要线程开始。)
@user4815162342 你认为这能解决问题吗?
@user4815162342 否则,我同意你的原则,即使我在这里应用它会遇到一些麻烦。
是的,我认为这会解决问题。我无法证明这一点,因为我无法轻松运行您的代码,并且我不想将其发布为答案,因为它不能直接解决您的问题。
我尝试使用run_coroutine_threadsafe
编辑我的问题。
【参考方案1】:
作为一般设计原则,创建新的事件循环只是为了运行单个子例程是不必要且浪费的。相反,创建一个事件循环,在单独的线程中运行它,并通过使用asyncio.run_coroutine_threadsafe
向它提交任务来满足您所有的 asyncio 需求。
例如:
# at top-level
loop = asyncio.new_event_loop()
def spin_loop():
asyncio.set_event_loop(loop)
loop.run_forever()
asyncio.get_child_watcher().attach_loop(loop)
threading.Thread(target=spin_loop, daemon=True).start()
# ... the rest of your code ...
有了这个,您可以使用以下命令轻松地从任何线程执行任何 asyncio 代码:
def subp(cmd_list):
# submit the task to asyncio
fut = asyncio.run_coroutine_threadsafe(get_subp(cmd_list), loop)
# wait for the task to finish
rc, stdout, stderr = fut.result()
return rc, stdout, stderr
请注意,您可以使用add_done_callback
在asyncio.run_coroutine_threadsafe
返回的未来完成时收到通知,因此您可能一开始就不需要线程。
请注意,与事件循环的所有交互都应通过上述run_coroutine_threadsafe
(提交协程时)或当您需要事件循环调用普通函数时通过loop.call_soon_threadsafe
。例如,要停止事件循环,您可以调用 loop.call_soon_threadsafe(loop.stop)
。
【讨论】:
感谢您的回答。不幸的是,它并没有解决问题(除非我误解了),但也许它有助于更精确地隔离它。我重新编辑了我的问题,以包括对您的解决方案的最清晰的理解。也许你会对这个问题有更好的了解。 @R.N 这一次你完全理解它,并且对最小示例表示赞赏 - 删除 Qt 依赖项使我可以运行它。我通过在启动线程之前添加asyncio.get_child_watcher().attach_loop(loop)
让它工作。这消除了最小示例中的错误。我已经相应地修改了答案。
我测试了你的修改,这次效果很好。我会尽快验证你的答案。我之前有一个愚蠢的问题:你如何正确地停止来自spin_loop
的永远运行的循环?因为我尝试使用loop.stop()
,但是之后如果我使用print(loop.is_running())
进行检查,答案是 True ;如果我尝试使用loop.close()
,我会收到一个错误,因为我无法关闭正在运行的循环。
@R.N 所有从自己线程外部与循环的交互都需要通过asyncio.run_coroutine_threadsafe
(对于协程)或loop.call_soon_threadsafe
(对于普通函数)。您可以使用loop.call_soon_threadsafe(loop.stop)
停止循环。这些函数将锁定适当的互斥体,并提醒事件循环它发生了需要注意的新事物。【参考方案2】:
我怀疑你正在做的事情根本不受支持 - 根据documentation:
要处理信号和执行子进程,事件循环必须在主线程中运行。
当您尝试执行子进程时,我认为在另一个线程中运行新的事件循环不起作用。
问题是,Qt 已经有一个事件循环,你真正需要的是说服asyncio
使用它。这意味着您需要一个事件循环实现,它提供在“Qt 的事件循环”之上实现的“异步事件循环接口”。
我相信asyncqt 提供了这样的实现。您可能想尝试使用QEventLoop(app)
代替asyncio.new_event_loop()
。
【讨论】:
其实利用@user4815162342带来的解决方案,我成功的在一个新的QThread
中执行了一个子进程in loop run in a new QThread
,即不在主循环中。否则,感谢我不知道的参考 asyncqt。
你是对的,但我希望这里的文档过于严格,并且限制会在某个时候解除。虽然确实设置子观察程序涉及必须从主线程运行的信号处理代码,但SIGCHLD
的传递并不依赖于在主线程中运行的事件循环。信号将被传递到主线程,但处理程序只会将一个字节写入管道,无论它在哪里运行,事件循环都会拾取该字节。这就是答案中的代码有效的原因。
我有同样的问题,我无法在 linux 中运行我的 QThread,但它可以在 Windows 上运行。你能发布一个在 QThread 中执行子进程的代码的 sn-p 吗?以上是关于为啥线程中的 python asyncio 进程在 Linux 上似乎不稳定?的主要内容,如果未能解决你的问题,请参考以下文章
为啥在不同线程中调用 asyncio subprocess.communicate 会挂起?
我啥时候应该在常规线程上使用 asyncio,为啥?它是不是提供性能提升?
python 多进程和多线程3 —— asyncio - 异步IO