为啥在不同线程中调用 asyncio subprocess.communicate 会挂起?

Posted

技术标签:

【中文标题】为啥在不同线程中调用 asyncio subprocess.communicate 会挂起?【英文标题】:Why does asyncio subprocess.communicate hang when called in different thread?为什么在不同线程中调用 asyncio subprocess.communicate 会挂起? 【发布时间】:2018-12-21 10:21:53 【问题描述】:

当我必须在异步事件循环中运行子进程时,子进程通信挂起,而整个事情都在一个单独的线程中。

我了解到为了在单独的线程中运行子进程,我需要有

1. an event loop running in main thread, and
2. a child watcher must be initiated in main thread.

在具备上述条件后,我得到了我的子流程工作。但是 subprocess.communicate 现在挂了。如果从主线程调用它,相同的代码正在工作。

进一步挖掘后,我观察到通信挂起,因为该过程没有自行完成。 ie await process.wait() 居然挂了。

当我尝试在子进程本身发出的命令挂起时,我看到通信挂起,但这里不是这种情况。

import asyncio
import shlex
import threading
import subprocess
async def sendcmd(cmd):
    cmdseq = tuple(shlex.split(cmd))
    print(cmd)
    p = await asyncio.create_subprocess_exec(*cmdseq, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    print(p.pid)
    output = (await asyncio.wait_for(p.communicate(), 5))[0]
    output = output.decode('utf8')
    print(output)
    return output


async def myfunc(cmd):
    o = await sendcmd(cmd)
    return o

def myfunc2():
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    tasks = []
    tasks.append(asyncio.ensure_future(myfunc('uname -a')))
    loop.run_until_complete(asyncio.gather(*tasks))

async def myfunc3():
    t = threading.Thread(target=myfunc2)
    t.start()
    t.join()

def main():
    asyncio.get_child_watcher()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.ensure_future(myfunc3()))
    loop.close()

main()

【问题讨论】:

为什么要结合线程和异步?您是否考虑过以不需要线程的方式构建程序?如果你有阻塞代码要运行,你可以随时使用run_in_executor 我考虑过重组,但工作量很大。事实上,除了完成这项工作之外,我还很好奇是什么让行为有所不同。 为了更清楚地说明我的要求——我有一个调度程序守护程序,它在套接字服务器(线程)上进行侦听,当收到请求时它会执行一些工作。当在线程套接字服务器上收到请求时触发此作业时,将在线程上,并且该作业包含异步事件循环。 我考虑过用异步套接字服务器代替线程套接字服务器(尽管此时这对我来说是一项艰巨的任务)。通过这样做,由于套接字服务器的事件循环已经在运行,因此它不会允许该作业所需的另一个事件循环。 @user4815162342 run_in_executor 与 p.communicate() 有同样的问题,它在不在主线程中执行时挂起! 【参考方案1】:

看起来子进程 SIGCHLD 不是由工作线程接收,而是由父线程接收。这些意味着 process.wait() 不会由操作系统发出信号。还有一个discussion about this here。

看起来子观察者应该检测 SIGCHLD 并将其传播到其他线程(或 pid)及其事件循环,这似乎也是它的主要设计目的。 (文档缺乏,所以需要阅读源代码。)

注意:我认为 t.join() 阻塞了运行子观察者的主线程,因此需要修复。我只是在那里放了一个 while 循环,并在 t.is_alive() 返回 False 时结束主事件循环。

我注意到signal_noop 正在发射,这很好。该问题似乎与似乎设置正确的 signal.set_wakeup_fd(self._csock.fileno()) 有关。我需要进行更多调试以了解该事件是如何处理的,以及为什么主事件循环没有收到该信号。我注意到 unix_events.py 中的 _process_self_data(self, data) 没有发生。

Signals and threads

Python 信号处理程序始终在 Python 主线程中执行,即使信号是在另一个线程中接收到的。这意味着信号不能用作线程间通信的手段。您可以改用线程模块中的同步原语。

此外,只允许主线程设置新的信号处理程序。

【讨论】:

【参考方案2】:

我认为这可以解决问题。对线程使用循环 run_in_executor。

import asyncio
import shlex
import threading
import subprocess
import logging
async def sendcmd(cmd):
    cmdseq = tuple(shlex.split(cmd))
    print(cmd)
    p = await asyncio.create_subprocess_exec(*cmdseq, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    print(p.pid)
    output = (await asyncio.wait_for(p.communicate(), 5))[0]
    output = output.decode('utf8')
    print(output)
    return output

async def myfunc(cmd):
    o = await sendcmd(cmd)
    return o

def myfunc2():
    thread_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(thread_loop)
    thread_loop.set_debug(True)     
    tasks = []
    tasks.append(asyncio.ensure_future(myfunc('uname -a')))
    thread_loop.run_until_complete(asyncio.gather(*tasks))
    thread_loop.close()

async def myfunc3(loop=None):
    await loop.run_in_executor(None, myfunc2)    

def main():
    logfilename='test.log'
    print('Writing log to '.format(logfilename))
    logging.basicConfig(filename=logfilename, level=logging.INFO, format='%(asctime)s %(name)s %(module)s %(levelname)-8s %(message)s')
    logging.getLogger('asyncio').setLevel(logging.DEBUG)
    root = logging.getLogger(__name__)

    cw=asyncio.get_child_watcher()
    main_loop = asyncio.get_event_loop()
    main_loop.run_until_complete(asyncio.ensure_future(myfunc3(loop=main_loop)))
    cw.close()
    main_loop.close()

main()

【讨论】:

是的,但是每次你都会用5秒来等待进程返回一些数据。 ***.com/questions/57629155/…

以上是关于为啥在不同线程中调用 asyncio subprocess.communicate 会挂起?的主要内容,如果未能解决你的问题,请参考以下文章

为啥线程中的 python asyncio 进程在 Linux 上似乎不稳定?

我啥时候应该在常规线程上使用 asyncio,为啥?它是不是提供性能提升?

为啥要显式调用 asyncio.StreamWriter.drain?

简单的 Python 多线程网络服务器,带有 Asyncio 和在主函数中调用的事件

Goroutines vs asyncio 任务 + CPU-bound 调用的线程池

为啥 WCF 服务能够处理来自不同进程的调用而不是来自线程的调用