通过阅读python subprocess源码尝试实现非阻塞读取stdout以及非阻塞wait

Posted 林肯公园

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了通过阅读python subprocess源码尝试实现非阻塞读取stdout以及非阻塞wait相关的知识,希望对你有一定的参考价值。

http://blog.chinaunix.net/uid-23504396-id-4661783.html

 

执行subprocess的时候,执行不是问题
最麻烦的是获取进程执行后的回显来确认是否正确执行,还不能阻塞
还要获取进程执行后的返回状态确认进程是否正确结束,也不能阻塞

分开解决这个问题
我们先解决第一个问题,获取回显

一般获取回显,代码都是如下写法

点击(此处)折叠或打开

  1. sub_process = subprocess.Popen(command, stdin = subprocess.PIPE,stdout = subprocess.PIPE,stderr = subprocess.PIPE, shell = True)


为了搞清楚subprocess是怎么获取子进程stdout的,我们首先看看 subprocess.PIPE是什么
进入代码里可以看见subprocess.PIPE 直接是个int -1
再看看网上一般获取subprocess回显的代码

点击(此处)折叠或打开

  1. lines = sub_process.stdout.readline()

 subprocess.PIPE是-1,为什么Popen这个类的stdout变成了什么对象,可以用readline方法呢
打印type可以知道Popen对象的stdout的类型是file,我们看看subprocess里做了什么操作。
我们看看Popen的init方法(python 2.7.8)

stdout传入_get_handles函数准换出(p2cread, p2cwrite,c2pread, c2pwrite,errread, errwrite)

点击(此处)折叠或打开

  1. (p2cread, p2cwrite,
  2.          c2pread, c2pwrite,
  3.          errread, errwrite) = self._get_handles(stdin, stdout, stderr)


p2cread, p2cwrite,c2pread, c2pwrite,errread, errwrite  传入_execute_child中,这个函数看名字就知道是真正的执行函数

点击(此处)折叠或打开

  1. self._execute_child(args, executable, preexec_fn, close_fds,
  2.                                 cwd, env, universal_newlines,
  3.                                 startupinfo, creationflags, shell,
  4.                                 p2cread, p2cwrite,
  5.                                 c2pread, c2pwrite,
  6.                                 errread, errwrite)


p2cread, p2cwrite,c2pread, c2pwrite,errread, errwrite传入执行函数后,stdout等通过fdopen函数转换问file对象

点击(此处)折叠或打开

  1.         if p2cwrite is not None:
  2.             self.stdin = os.fdopen(p2cwrite, ‘wb‘, bufsize)
  3.         if c2pread is not None:
  4.             if universal_newlines:
  5.                 self.stdout = os.fdopen(c2pread, ‘rU‘, bufsize)
  6.             else:
  7.                 self.stdout = os.fdopen(c2pread, ‘rb‘, bufsize)
  8.         if errread is not None:
  9.             if universal_newlines:
  10.                 self.stderr = os.fdopen(errread, ‘rU‘, bufsize)
  11.             else:
  12.                 self.stderr = os.fdopen(errread, ‘rb‘, bufsize)


我们先看看_get_handles方法,部分代码如下

点击(此处)折叠或打开

  1. def _get_handles(self, stdin, stdout, stderr):
  2.             """Construct and return tuple with IO objects:
  3.             p2cread, p2cwrite, c2pread, c2pwrite, errread, errwrite
  4.             """
  5.             p2cread, p2cwrite = None, None
  6.             c2pread, c2pwrite = None, None
  7.             errread, errwrite = None, None
  8.             if stdin is None:
  9.                 pass
  10.             elif stdin == PIPE:
  11.                 p2cread, p2cwrite = self.pipe_cloexec()
  12.             elif isinstance(stdin, int):
  13.                 p2cread = stdin
  14.             else:
  15.                 # Assuming file-like object
  16.                 p2cread = stdin.fileno()


再跟踪进去看pipe_cloexec

点击(此处)折叠或打开

  1. def pipe_cloexec(self):
  2.             """Create a pipe with FDs set CLOEXEC."""
  3.             # Pipes‘ FDs are set CLOEXEC by default because we don‘t want them
  4.             # to be inherited by other subprocesses: the CLOEXEC flag is removed
  5.             # from the child is FDs by _dup2(), between fork() and exec().
  6.             # This is not atomic: we would need the pipe2() syscall for that.
  7.             r, w = os.pipe()
  8.             self._set_cloexec_flag(r)
  9.             self._set_cloexec_flag(w)
  10.             return r, w


可以知道,当stdout赋值为subprocess.PIPE(即-1)时,subprocess内部通过os.pipe()创建一个管道,并返回管道的读,写文件描述符

点击(此处)折叠或打开

  1. os.pipe()
  2. Create a pipe. Return a pair of file descriptors (r, w) usable for reading and writing, respectively.

_set_cloexec_flag函数暂时不用详细看了,只是通过fcntl设置下文件做控制。

所以从这里我可以看出stdout等传入subprocess.PIPE后,这个值只是作为一个判断值,判断为此值以后,内部通过os.piep()用作输入输出传送。
由于subprocess内部创建的pipe()大小不可控,所以推举做法是使用StringIO创建一个内存文件对象,并传入这个对象的fileno,参考文章
http://backend.blog.163.com/blog/static/2022941262014016710912/

现在就剩下单问题就是,这个管道如何获得子进程的输入输出的呢,这就要看_execute_child里是怎么做的了
具体说明我直接在下面源代码里注释说明,最后再做总结

点击(此处)折叠或打开

  1. def _execute_child(self, args, executable, preexec_fn, close_fds,
  2.                  cwd, env, universal_newlines,
  3.                  startupinfo, creationflags, shell,
  4.                  p2cread, p2cwrite,
  5.                  c2pread, c2pwrite,
  6.                  errread, errwrite):
  7.     """Execute program (POSIX version)"""
  8.     if isinstance(args, types.StringTypes):
  9.         args = [args]
  10.     else:
  11.         args = list(args)
  12.     if shell:
  13.         args = ["/bin/sh", "-c"] + args
  14.         if executable:
  15.             args[0] = executable
  16.     if executable is None:
  17.         executable = args[0]
  18.     #这里又创建了一个管道,这个管道只用来获取自进程try后except出来的内容,不是获取stderr
  19.     errpipe_read, errpipe_write = self.pipe_cloexec()
  20.     try:
  21.         try:
  22.             gc_was_enabled = gc.isenabled()
  23.             #这里关闭了gc回收,防止对象被回收,这里值得学习。
  24.             gc.disable()
  25.             try:
  26.                 self.pid = os.fork()
  27.             except:
  28.                 if gc_was_enabled:
  29.                     gc.enable()
  30.                 raise
  31.             self._child_created = True
  32.             if self.pid == 0:
  33.                 #如果pid为0,表示自己是子进程,执行下面代码(父进程获取到的是子进程的PID,不执行此代码)
  34.                 #父子进程pipe()通信原理——利用pipe()建立起来的无名文件(无路径名)。只用该系统调用所返回的文件描述符来标识该文件.
  35.                 #只有调用pipe()的进程及其子孙进程才能识别此文件描述符,才能利用该文件(管道)进行通信。当这些进程不再使用此管道时,核心收回其索引结点。
  36.                 #如果Pope对象初始化的时候,stdin stdout stderr都用subprocess.PIPE的话,那么fork前会创建3个管道,并传入对应的文件描述符进来
  37.                 try:
  38.                     #关闭从父进程复制过来的的不需要的管道的一端
  39.                     if p2cwrite is not None:
  40.                         os.close(p2cwrite)
  41.                     if c2pread is not None:
  42.                         os.close(c2pread)
  43.                     if errread is not None:
  44.                         os.close(errread)
  45.                     os.close(errpipe_read)
  46.                     
  47.                     #下面都是做了一些文件描述符复制操作,反正通过下面的代码将子进程的输出传到父进程
  48.                     #那些描述符复制操作基本就相当于把子进程的stdout、stdin、stderr的fd绑定的父进程传过来的文件描述符上
  49.                     # When duping fds, if there arises a situation
  50.                     # where one of the fds is either 0, 1 or 2, it
  51.                     # is possible that it is overwritten (#12607).
  52.                     if c2pwrite == 0:
  53.                         c2pwrite = os.dup(c2pwrite)
  54.                     if errwrite == 0 or errwrite == 1:
  55.                         errwrite = os.dup(errwrite)
  56.                     # Dup fds for child
  57.                     def _dup2(a, b):
  58.                         # dup2() removes the CLOEXEC flag but
  59.                         # we must do it ourselves if dup2()
  60.                         # would be a no-op (issue #10806).
  61.                         if a == b:
  62.                             self._set_cloexec_flag(a, False)
  63.                         elif a is not None:
  64.                             os.dup2(a, b)
  65.                     _dup2(p2cread, 0)
  66.                     _dup2(c2pwrite, 1)
  67.                     _dup2(errwrite, 2)
  68.                     #2.7才有的写法,2.6这样写报错,2.7大概这样写比list里找快一点,所以用了dict
  69.                     #如果管道文件描述符大于2的话,关闭从主进程赋值过来的管道的一端,
  70.                     closed = { None }
  71.                     for fd in [p2cread, c2pwrite, errwrite]:
  72.                         if fd not in closed and fd > 2:
  73.                             os.close(fd)
  74.                             closed.add(fd)
  75.                     #这里控制关闭前面用来保存except输出的管道
  76.                     if close_fds:
  77.                         self._close_fds(but=errpipe_write)
  78.                     #切换下执行目录防止运行出错,这里也值得学习!
  79.                     if cwd is not None:
  80.                         os.chdir(cwd)
  81.                     if preexec_fn:
  82.                         preexec_fn()
  83.                     #可以看到,最终是通过execvp/execvpe来执行系统命令的
  84.                     if env is None:
  85.                         os.execvp(executable, args)
  86.                     else:
  87.                         os.execvpe(executable, args, env)
  88.                 except:
  89.                     exc_type, exc_value, tb = sys.exc_info()
  90.                     # Save the traceback and attach it to the exception object
  91.                     exc_lines = traceback.format_exception(exc_type,
  92.                                                          exc_value,
  93.                                                          tb)
  94.                     exc_value.child_traceback = ‘‘.join(exc_lines)
  95.                     #子进程将错误信息写入接受except的管道的写端
  96.                     os.write(errpipe_write, pickle.dumps(exc_value))
  97.                 #这里退出子进程
  98.                 os._exit(255)
  99.             #父进程启动自进程后,重新打开gc回收
  100.             if gc_was_enabled:
  101.                 gc.enable()
  102.         finally:
  103.             #父关闭保存子进程except输出的管道的写端
  104.             os.close(errpipe_write)
  105.         #父进程也关闭不需要使用的管道的一端
  106.         if p2cread is not None and p2cwrite is not None:
  107.             os.close(p2cread)
  108.         if c2pwrite is not None and c2pread is not None:
  109.             os.close(c2pwrite)
  110.         if errwrite is not None and errread is not None:
  111.             os.close(errwrite)
  112.         #通过获取except输出的管道的读端获取最大1M的数据
  113.         data = _eintr_retry_call(os.read, errpipe_read, 1048576)
  114.     finally:
  115.         #父关闭保存子进程except输出的管道的读端
  116.         os.close(errpipe_read)
  117.     #如果有子进程except输出,抛出自定义错误,init函数那边会try到并做相应处理
  118.     if data != "":
  119.         try:
  120.             _eintr_retry_call(os.waitpid, self.pid, 0)
  121.         except OSError as e:
  122.             if e.errno != errno.ECHILD:
  123.                 raise
  124.         child_exception = pickle.loads(data)
  125.         raise child_exception


下面我们总结下,创建Popen对象时,我们传入subprocess.PIPE。
内部通过os.pipe()创建1-3个管道
生成的子进程复制了这些管道的文件描述符,子进程内部将自己的输出绑定到这写管道上
父进程通过os.fdopen将管道的文件描述符打开为file对象
并赋值给self.stdin  self.stdout stderr

因为是file对象,我们就可以直接通过read、readline、readlines等方法获取回显的字符串了
但是由于file对象的read、readline、readlines方法都是阻塞的,那么我们可以这样。
新建立一个线程去读取,并把读出来的内容塞入一个列表,每次我们主进程都去读取这个列表的最后一列
线程中读取后写入列表的延迟 需要大于主进程读取列表最后一列的延迟,以免判断内容还没被主进程读取已经进入下一列

读取子进程回显函数

点击(此处)折叠或打开

  1. def stdout_theard(end_mark,cur_stdout,stdout_lock,string_list):
  2.     #用户获取subprocess的stdout输出的线程,防止阻塞
  3.     #cur_stdout是一个file对象,end_mark是个随机字符串,获取到这个字符串表明结束
  4.     #先暂停0.01秒
  5.     time.sleep(0.01)
  6.     for i in range(3000):
  7.         try:
  8.             out_put = cur_stdout.readline()
  9.             if not out_put:
  10.                 #添加结束标记
  11.                 stdout_lock.acquire()
  12.                 string_list.append(end_mark)
  13.                 stdout_lock.release()
  14.                 break
  15.             if out_put == end_mark:
  16.             #out put正好和end_mark相等的特殊情况
  17.                 continue
  18.             #外部获取到指定内容会清理string_list列表,所以要加锁
  19.             stdout_lock.acquire()
  20.             string_list.append(out_put.rstrip().lstrip())
  21.             stdout_lock.release()
  22.             time.sleep(0.03)
  23.         except:
  24.             print ‘wtffff!!!!!!tuichule !!‘
  25.             break


主进程中启动线程

点击(此处)折叠或打开

  1.     stdout_list = []
  2.     stdout_lock = threading.Lock()
  3.     end_mark = ‘end9c2nfxz‘
  4.     cur_stdout_thread = threading.Thread(target=stdout_theard, args=(end_mark,sub_process.stdout,stdout_lock,stdout_list))
  5.     cur_stdout_thread.setDaemon(‘True‘)
  6.     cur_stdout_thread.start()


主进程中判断子进程回显内容是否正确
我的例子是的作用是  erl进程里输入command_reload_list里的所有命令,并判断并记录每个命令执行后是否有ok_str返回

点击(此处)折叠或打开

  1. for command_reload_dict in command_reload_list:
  2.         sub_process.stdin.write(command_reload_dict[‘com‘] + ‘\r\n‘)
  3.         #每个命令执行后通过线程修改的str list的最后一个元素来获取取回显的最后一行
  4.         #得到返回值等于ok_str的为正确,延迟0.2后退出并清理回显,否则总共等待300*0.01秒
  5.         ok_str = ‘load module %s true‘ % command_reload_dict[‘mod‘]
  6.         for i in xrange(300):
  7.             if len(stdout_list)>0:
  8.              #获得正确的返回,退出
  9.                 if stdout_list[-1] == ok_str:
  10.                     #记录当前模块热更成功
  11.                     command_reload_dict[‘res‘] = ‘ok‘
  12.                     break
  13.                 if stdout_list[-1] == end_mark:
  14.                  #遇到end_mark 说明读线程已经结束,说明有错,直接退出
  15.                     return_value[‘msg‘] += ‘reload mod process has been exit in [%s]‘ % command_reload_dict[‘mod‘]
  16.                     return return_value
  17.                     break
  18.             time.sleep(0.01)
  19.         #清除上个reload命令产生的回显
  20.         stdout_lock.acquire()
  21.         del stdout_list[:]
  22.         stdout_lock.release()
  23.     #子进程输入退出命令
  24.     sub_process.stdin.write(‘q().\r\n‘)
  25.     #等待tmp erl 进程退出
  26.     for i in xrange(300):
  27.         if len(stdout_list)>0:
  28.             if stdout_list[-1] == end_mark:
  29.                 break
  30.         time.sleep(0.01)


=======================================第二个问题的分割线=========================================
进程执行后的返回状态确认进程是否正确结束,不能阻塞
之前我有接触过这个问题的,当时还没细看subprocess源码
http://blog.chinaunix.net/uid-23504396-id-4471612.html

我现在的写法

点击(此处)折叠或打开

  1. if stop_process.poll() is None:
  2.         try:
  3.             if stop_process.stdout:
  4.                 stop_process.stdout.close()
  5.             if stop_process.stderr:
  6.                 stop_process.stderr.close()
  7.             stop_process.terminate()
  8.             time.sleep(0.5)
  9.             if stop_process.poll() is None:
  10.                 stop_process.kill()
  11.                 time.sleep(0.2)
  12.                 if stop_process.poll() is None:
  13.                     print ‘wtf!!!!‘
  14.                 else:
  15.                     stop_process.wait()
  16.             else:
  17.                 stop_process.wait()
  18.         except:
  19.             print ‘wtf?‘


上面代码我一直有个疑问,poll()之后如果有问题进程还没结束怎么办?
因为sub_process.wait()是阻塞的,所以我在poll以后直接sub_process.wait()是不是也会被卡住?
subprocess的wati到底调用了什么?

当然我也可以像获取回显那样,启一个线程,主进程通过一个可以指定次数的循环来获取wait返回。
不过这样做太绕了,所以我们直接进代码看,把wait彻底搞明白

点击(此处)折叠或打开

  1. def poll(self):
  2.         return self._internal_poll()



点击(此处)折叠或打开

  1. def _internal_poll(self, _deadstate=None, _waitpid=os.waitpid,
  2.                 _WNOHANG=os.WNOHANG, _os_error=os.error, _ECHILD=errno.ECHILD):
  3.             """Check if child process has terminated. Returns returncode
  4.             attribute.
  5.             This method is called by __del__, so it cannot reference anything
  6.             outside of the local scope (nor can any methods it calls).
  7.             """
  8.             if self.returncode is None:
  9.                 try:
  10.                     pid, sts = _waitpid(self.pid, _WNOHANG)
  11.                     if pid == self.pid:
  12.                         self._handle_exitstatus(sts)
  13.                 except _os_error as e:
  14.                     if _deadstate is not None:
  15.                         self.returncode = _deadstate
  16.                     if e.errno == _ECHILD:
  17.                         # This happens if SIGCLD is set to be ignored or
  18.                         # waiting for child processes has otherwise been
  19.                         # disabled for our process. This child is dead, we
  20.                         # can not get the status.
  21.                         # http://bugs.python.org/issue15756
  22.                         self.returncode = 0
  23.             return self.returncode



再看看wait的代码

点击(此处)折叠或打开

  1. def wait(self):
  2.             """Wait for child process to terminate. Returns returncode
  3.             attribute."""
  4.             while self.returncode is None:
  5.                 try:
  6.                     pid, sts = _eintr_retry_call(os.waitpid, self.pid, 0)
  7.                 except OSError as e:
  8.                     if e.errno != errno.ECHILD:
  9.                         raise
  10.                     # This happens if SIGCLD is set to be ignored or waiting
  11.                     # for child processes has otherwise been disabled for our
  12.                     # process. This child is dead, we can not get the status.
  13.                     pid = self.pid
  14.                     sts = 0
  15.                 # Check the pid and loop as waitpid has been known to return
  16.                 # 0 even without WNOHANG in odd situations. issue14396.
  17.                 if pid == self.pid:
  18.                     self._handle_exitstatus(sts)
  19.             return self.returncode


看到这里就明白了,poll和wait最终调用的是os.waitpid,但是poll是非阻塞的wait是阻塞的.....
我们看看python的文档

点击(此处)折叠或打开

  1. os.waitpid(pid, options)
  2. The details of this function differ on Unix and Windows.
  3. On Unix: Wait for completion of a child process given by process id pid, and return a tuple containing its process id and exit status indication (encoded as for wait()). The semantics of the call are affected by the value of the integer options, which should be 0 for normal operation.
  4. os.WNOHANG
  5. The option for waitpid() to return immediately if no child process status is available immediately. The function returns (0, 0) in this case.

所以,发送kill信号后,pool()后就不需要wait了










































































以上是关于通过阅读python subprocess源码尝试实现非阻塞读取stdout以及非阻塞wait的主要内容,如果未能解决你的问题,请参考以下文章

杀死使用 Python 的 subprocess.Popen() 创建的进程 [重复]

python 通过 subprocess 执行命令,重定向实时输出

通过 python 子进程 SSH 后终端挂起

通过 subprocess.Popen 在 python 中执行 R 脚本

通过subprocess模块,在python解释器中实现cmd中的命令结果

python的subprocess模块