为啥在不同线程中调用 asyncio subprocess.communicate 会挂起?
Posted
技术标签:
【中文标题】为啥在不同线程中调用 asyncio subprocess.communicate 会挂起?【英文标题】:Why does asyncio subprocess.communicate hang when called in different thread?为什么在不同线程中调用 asyncio subprocess.communicate 会挂起? 【发布时间】:2018-12-21 10:21:53 【问题描述】:当我必须在异步事件循环中运行子进程时,子进程通信挂起,而整个事情都在一个单独的线程中。
我了解到为了在单独的线程中运行子进程,我需要有
1. an event loop running in main thread, and
2. a child watcher must be initiated in main thread.
在具备上述条件后,我得到了我的子流程工作。但是 subprocess.communicate 现在挂了。如果从主线程调用它,相同的代码正在工作。
进一步挖掘后,我观察到通信挂起,因为该过程没有自行完成。 ie await process.wait()
居然挂了。
当我尝试在子进程本身发出的命令挂起时,我看到通信挂起,但这里不是这种情况。
import asyncio
import shlex
import threading
import subprocess
async def sendcmd(cmd):
cmdseq = tuple(shlex.split(cmd))
print(cmd)
p = await asyncio.create_subprocess_exec(*cmdseq, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
print(p.pid)
output = (await asyncio.wait_for(p.communicate(), 5))[0]
output = output.decode('utf8')
print(output)
return output
async def myfunc(cmd):
o = await sendcmd(cmd)
return o
def myfunc2():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
tasks = []
tasks.append(asyncio.ensure_future(myfunc('uname -a')))
loop.run_until_complete(asyncio.gather(*tasks))
async def myfunc3():
t = threading.Thread(target=myfunc2)
t.start()
t.join()
def main():
asyncio.get_child_watcher()
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.ensure_future(myfunc3()))
loop.close()
main()
【问题讨论】:
为什么要结合线程和异步?您是否考虑过以不需要线程的方式构建程序?如果你有阻塞代码要运行,你可以随时使用run_in_executor
。
我考虑过重组,但工作量很大。事实上,除了完成这项工作之外,我还很好奇是什么让行为有所不同。
为了更清楚地说明我的要求——我有一个调度程序守护程序,它在套接字服务器(线程)上进行侦听,当收到请求时它会执行一些工作。当在线程套接字服务器上收到请求时触发此作业时,将在线程上,并且该作业包含异步事件循环。
我考虑过用异步套接字服务器代替线程套接字服务器(尽管此时这对我来说是一项艰巨的任务)。通过这样做,由于套接字服务器的事件循环已经在运行,因此它不会允许该作业所需的另一个事件循环。
@user4815162342 run_in_executor 与 p.communicate() 有同样的问题,它在不在主线程中执行时挂起!
【参考方案1】:
看起来子进程 SIGCHLD 不是由工作线程接收,而是由父线程接收。这些意味着 process.wait() 不会由操作系统发出信号。还有一个discussion about this here。
看起来子观察者应该检测 SIGCHLD 并将其传播到其他线程(或 pid)及其事件循环,这似乎也是它的主要设计目的。 (文档缺乏,所以需要阅读源代码。)
注意:我认为 t.join() 阻塞了运行子观察者的主线程,因此需要修复。我只是在那里放了一个 while 循环,并在 t.is_alive() 返回 False 时结束主事件循环。
我注意到signal_noop 正在发射,这很好。该问题似乎与似乎设置正确的 signal.set_wakeup_fd(self._csock.fileno()) 有关。我需要进行更多调试以了解该事件是如何处理的,以及为什么主事件循环没有收到该信号。我注意到 unix_events.py 中的 _process_self_data(self, data) 没有发生。
Signals and threads
Python 信号处理程序始终在 Python 主线程中执行,即使信号是在另一个线程中接收到的。这意味着信号不能用作线程间通信的手段。您可以改用线程模块中的同步原语。
此外,只允许主线程设置新的信号处理程序。
【讨论】:
【参考方案2】:我认为这可以解决问题。对线程使用循环 run_in_executor。
import asyncio
import shlex
import threading
import subprocess
import logging
async def sendcmd(cmd):
cmdseq = tuple(shlex.split(cmd))
print(cmd)
p = await asyncio.create_subprocess_exec(*cmdseq, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
print(p.pid)
output = (await asyncio.wait_for(p.communicate(), 5))[0]
output = output.decode('utf8')
print(output)
return output
async def myfunc(cmd):
o = await sendcmd(cmd)
return o
def myfunc2():
thread_loop = asyncio.new_event_loop()
asyncio.set_event_loop(thread_loop)
thread_loop.set_debug(True)
tasks = []
tasks.append(asyncio.ensure_future(myfunc('uname -a')))
thread_loop.run_until_complete(asyncio.gather(*tasks))
thread_loop.close()
async def myfunc3(loop=None):
await loop.run_in_executor(None, myfunc2)
def main():
logfilename='test.log'
print('Writing log to '.format(logfilename))
logging.basicConfig(filename=logfilename, level=logging.INFO, format='%(asctime)s %(name)s %(module)s %(levelname)-8s %(message)s')
logging.getLogger('asyncio').setLevel(logging.DEBUG)
root = logging.getLogger(__name__)
cw=asyncio.get_child_watcher()
main_loop = asyncio.get_event_loop()
main_loop.run_until_complete(asyncio.ensure_future(myfunc3(loop=main_loop)))
cw.close()
main_loop.close()
main()
【讨论】:
是的,但是每次你都会用5秒来等待进程返回一些数据。 ***.com/questions/57629155/…以上是关于为啥在不同线程中调用 asyncio subprocess.communicate 会挂起?的主要内容,如果未能解决你的问题,请参考以下文章
为啥线程中的 python asyncio 进程在 Linux 上似乎不稳定?
我啥时候应该在常规线程上使用 asyncio,为啥?它是不是提供性能提升?
为啥要显式调用 asyncio.StreamWriter.drain?
简单的 Python 多线程网络服务器,带有 Asyncio 和在主函数中调用的事件