为啥线程中的 python asyncio 进程在 Linux 上似乎不稳定?

Posted

技术标签:

【中文标题】为啥线程中的 python asyncio 进程在 Linux 上似乎不稳定?【英文标题】:Why python asyncio process in a thread seems unstable on Linux?为什么线程中的 python asyncio 进程在 Linux 上似乎不稳定? 【发布时间】:2018-12-19 17:47:44 【问题描述】:

我尝试从 Qt 应用程序运行 python3 异步外部命令。在我使用多处理线程来执行此操作而不冻结 Qt 应用程序之前。但是现在,我想用QThread 来做这件事,以便能够腌制并给出QtWindows 作为其他一些函数的参数(这里没有介绍)。我做到了,并在我的Windows 操作系统上成功测试了它,但我在Linux 操作系统上尝试了该应用程序,我收到以下错误:RuntimeError: Cannot add child handler, the child watcher does not have a loop attached

从那时起,我尝试隔离问题,并获得了下面复制问题的最小(尽可能)示例。 当然,正如我之前提到的,如果我将QThreadPool 替换为multiprocessing.thread 的列表,则此示例运行良好。我还意识到了令我惊讶的事情:如果我在示例的最后部分取消注释 rc = subp([sys.executable,"./HelloWorld.py"]) 行,它也可以工作。我无法解释为什么。

#!/usr/bin/env python3

# -*- coding: utf-8 -*-

## IMPORTS ##
from functools import partial
from PyQt5 import QtCore
from PyQt5.QtCore import QThreadPool, QRunnable, QCoreApplication
import sys
import asyncio.subprocess

# Global variables
Qpool = QtCore.QThreadPool()


def subp(cmd_list):
    """ """

    if sys.platform.startswith('linux'):
        new_loop = asyncio.new_event_loop()
        asyncio.set_event_loop(new_loop)
    elif sys.platform.startswith('win'):
        new_loop = asyncio.ProactorEventLoop() # for subprocess' pipes on Windows
        asyncio.set_event_loop(new_loop)
    else :
        print('[ERROR]     OS not available for encodage... EXIT')
        sys.exit(2)

    rc, stdout, stderr= new_loop.run_until_complete(get_subp(cmd_list) )
    new_loop.close()
    if rc!=0 :
        print('Exit not zero (): '.format(rc, sys.exc_info()[0]) )#, exc_info=True)
    return rc, stdout, stderr

async def get_subp(cmd_list):
    """ """

    print('subp: '+' '.join(cmd_list) )
    # Create the subprocess, redirect the standard output into a pipe
    create = asyncio.create_subprocess_exec(*cmd_list, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE) #
    proc = await create

    # read child's stdout/stderr concurrently (capture and display)
    try:
        stdout, stderr = await asyncio.gather(
            read_stream_and_display(proc.stdout),
            read_stream_and_display(proc.stderr))
    except Exception:
        proc.kill()
        raise
    finally:
        rc = await proc.wait()
        print(" [Exit ] ".format(rc)+' '.join(cmd_list))
    return rc, stdout, stderr

async def read_stream_and_display(stream):
    """ """
    async for line in stream:
        print(line, flush=True)

class Qrun_from_job(QtCore.QRunnable):
    def __init__(self, job, arg):
        super(Qrun_from_job, self).__init__()
        self.job=job
        self.arg=arg

    def run(self):
        code = partial(self.job)
        code()

def ThdSomething(job,arg):
    testRunnable = Qrun_from_job(job,arg)
    Qpool.start(testRunnable)

def testThatThing():
    rc = subp([sys.executable,"./HelloWorld.py"])


if __name__=='__main__':
    app = QCoreApplication([])
    # rc = subp([sys.executable,"./HelloWorld.py"])
    ThdSomething(testThatThing,'tests')
    sys.exit(app.exec_())

使用 HelloWorld.py 文件:

#!/usr/bin/env python3
import sys
if __name__=='__main__':
   print('HelloWorld')
   sys.exit(0)

因此我有两个问题:如何使这个示例与 QThread 正常工作?为什么之前调用异步任务(调用subp 函数)会改变Linux 上示例的稳定性?

编辑

根据@user4815162342 的建议,我尝试使用run_coroutine_threadsafe 使用下面的代码。但它不起作用并返回相同的错误,即RuntimeError: Cannot add child handler, the child watcher does not have a loop attached。我还尝试通过模块 mutliprocessing 中的等效命令来更改 threading 命令;最后一个,命令subp 永远不会启动。

代码:

#!/usr/bin/env python3

# -*- coding: utf-8 -*-

## IMPORTS ##
import sys
import asyncio.subprocess
import threading
import multiprocessing

# at top-level
loop = asyncio.new_event_loop()

def spin_loop():
    asyncio.set_event_loop(loop)
    loop.run_forever()

def subp(cmd_list):
    # submit the task to asyncio
    fut = asyncio.run_coroutine_threadsafe(get_subp(cmd_list), loop)
    # wait for the task to finish
    rc, stdout, stderr = fut.result()
    return rc, stdout, stderr

async def get_subp(cmd_list):
    """ """
    print('subp: '+' '.join(cmd_list) )
    # Create the subprocess, redirect the standard output into a pipe
    proc = await asyncio.create_subprocess_exec(*cmd_list, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE) #

    # read child's stdout/stderr concurrently (capture and display)
    try:
        stdout, stderr = await asyncio.gather(
            read_stream_and_display(proc.stdout),
            read_stream_and_display(proc.stderr))
    except Exception:
        proc.kill()
        raise
    finally:
        rc = await proc.wait()
        print(" [Exit ] ".format(rc)+' '.join(cmd_list))
    return rc, stdout, stderr

async def read_stream_and_display(stream):
    """ """
    async for line in stream:
        print(line, flush=True)

if __name__=='__main__':
    threading.Thread(target=spin_loop, daemon=True).start()
    # multiprocessing.Process(target=spin_loop, daemon=True).start()
    print('thread passed')
    rc = subp([sys.executable,"./HelloWorld.py"])
    print('end')
    sys.exit(0)

【问题讨论】:

作为一般设计原则,创建新的事件循环只是为了运行单个子例程是不必要和浪费的。相反,创建一个事件循环并在单独的线程中运行它。单个事件循环完全有能力同时为多个请求提供服务——事实上,这正是它所擅长的。使用asyncio.run_coroutine_threadsafe 向事件循环提交协程,并使用result() 方法等待协程完成。 (您也可以使用add_done_callback 在结果可用时收到通知,在这种情况下,您可能需要线程开始。) @user4815162342 你认为这能解决问题吗? @user4815162342 否则,我同意你的原则,即使我在这里应用它会遇到一些麻烦。 是的,我认为这会解决问题。我无法证明这一点,因为我无法轻松运行您的代码,并且我不想将其发布为答案,因为它不能直接解决您的问题。 我尝试使用run_coroutine_threadsafe 编辑我的问题。 【参考方案1】:

作为一般设计原则,创建新的事件循环只是为了运行单个子例程是不必要且浪费的。相反,创建一个事件循环,在单独的线程中运行它,并通过使用asyncio.run_coroutine_threadsafe 向它提交任务来满足您所有的 asyncio 需求。

例如:

# at top-level
loop = asyncio.new_event_loop()
def spin_loop():
    asyncio.set_event_loop(loop)
    loop.run_forever()

asyncio.get_child_watcher().attach_loop(loop)
threading.Thread(target=spin_loop, daemon=True).start()
# ... the rest of your code ...

有了这个,您可以使用以下命令轻松地从任何线程执行任何 asyncio 代码:

def subp(cmd_list):
    # submit the task to asyncio
    fut = asyncio.run_coroutine_threadsafe(get_subp(cmd_list), loop)
    # wait for the task to finish
    rc, stdout, stderr = fut.result()
    return rc, stdout, stderr

请注意,您可以使用add_done_callbackasyncio.run_coroutine_threadsafe 返回的未来完成时收到通知,因此您可能一开始就不需要线程。

请注意,与事件循环的所有交互都应通过上述run_coroutine_threadsafe(提交协程时)或当您需要事件循环调用普通函数时通过loop.call_soon_threadsafe。例如,要停止事件循环,您可以调用 loop.call_soon_threadsafe(loop.stop)

【讨论】:

感谢您的回答。不幸的是,它并没有解决问题(除非我误解了),但也许它有助于更​​精确地隔离它。我重新编辑了我的问题,以包括对您的解决方案的最清晰的理解。也许你会对这个问题有更好的了解。 @R.N 这一次你完全理解它,并且对最小示例表示赞赏 - 删除 Qt 依赖项使我可以运行它。我通过在启动线程之前添加asyncio.get_child_watcher().attach_loop(loop) 让它工作。这消除了最小示例中的错误。我已经相应地修改了答案。 我测试了你的修改,这次效果很好。我会尽快验证你的答案。我之前有一个愚蠢的问题:你如何正确地停止来自spin_loop 的永远运行的循环?因为我尝试使用loop.stop(),但是之后如果我使用print(loop.is_running()) 进行检查,答案是 True ;如果我尝试使用loop.close(),我会收到一个错误,因为我无法关闭正在运行的循环。 @R.N 所有从自己线程外部与循环的交互都需要通过asyncio.run_coroutine_threadsafe(对于协程)或loop.call_soon_threadsafe(对于普通函数)。您可以使用loop.call_soon_threadsafe(loop.stop) 停止循环。这些函数将锁定适当的互斥体,并提醒事件循环它发生了需要注意的新事物。【参考方案2】:

我怀疑你正在做的事情根本不受支持 - 根据documentation:

要处理信号和执行子进程,事件循环必须在主线程中运行。

当您尝试执行子进程时,我认为在另一个线程中运行新的事件循环不起作用。

问题是,Qt 已经有一个事件循环,你真正需要的是说服asyncio 使用它。这意味着您需要一个事件循环实现,它提供在“Qt 的事件循环”之上实现的“异步事件循环接口”。

我相信asyncqt 提供了这样的实现。您可能想尝试使用QEventLoop(app) 代替asyncio.new_event_loop()

【讨论】:

其实利用@user4815162342带来的解决方案,我成功的在一个新的QThread中执行了一个子进程in loop run in a new QThread,即不在主循环中。否则,感谢我不知道的参考 asyncqt。 你是对的,但我希望这里的文档过于严格,并且限制会在某个时候解除。虽然确实设置子观察程序涉及必须从主线程运行的信号处理代码,但SIGCHLD 的传递并不依赖于在主线程中运行的事件循环。信号将被传递到主线程,但处理程序只会将一个字节写入管道,无论它在哪里运行,事件循环都会拾取该字节。这就是答案中的代码有效的原因。 我有同样的问题,我无法在 linux 中运行我的 QThread,但它可以在 Windows 上运行。你能发布一个在 QThread 中执行子进程的代码的 sn-p 吗?

以上是关于为啥线程中的 python asyncio 进程在 Linux 上似乎不稳定?的主要内容,如果未能解决你的问题,请参考以下文章

为啥在不同线程中调用 asyncio subprocess.communicate 会挂起?

我啥时候应该在常规线程上使用 asyncio,为啥?它是不是提供性能提升?

为啥 asyncio 单线程 速度还能那么快

python 多进程和多线程3 —— asyncio - 异步IO

python 多进程和多线程3 —— asyncio - 异步IO

Python 多处理线程 Asyncio