Python-subprocess执行命令并将输出劫持实现实时记录到日志

Posted SnowPhoenix

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Python-subprocess执行命令并将输出劫持实现实时记录到日志相关的知识,希望对你有一定的参考价值。

Python-subprocess执行命令并将输出劫持实现实时记录到日志

前言

在写我自己的练手项目的时候,需要写一系列Python脚本来帮助我进行运维/环境配置,我希望这些脚本能够有比较好的日志记录。

这一篇博客中,我实现了日志同时向控制台和日志中进行输出,并且二者的日志等级、日志格式不相同。

这一篇博客中,我通过自定义日志的格式描述符,实现了定长的日志等级。

解决完日志问题后,我需要解决这些脚本的另一个问题:执行命令。

我写这一系列Python脚本就是因为我不擅长写shell脚本,而且Python脚本在我的开发环境(Windows)和运行环境(Linux)都可以运行。其需要担当的很重要的一个功能就是执行一些命令,如kubectl applymvn test等。

我希望执行这些命令的时候,能够实时地将命令的输出,记录到日志中。

本博客,我实现了执行命令,并将命令执行的输出进行劫持,实时记录到日志中。

参考

subprocess

关于多进程协同,Python主要有三个手段:

  1. os.system()函数;
  2. multiprocessing模块;
  3. subprocess模块;

其中multiprocessing主要用于解决计算密集型计算中,Python由于GIL(全局解释器锁)的存在,多线程无法提升性能,而需要创建多个进程来加快计算的场景。

os.system()则是阻塞式的,而且似乎无法将其创建的子进程的输出进行劫持。

subprocess模块几乎是唯一的选择。

Popen

subprocess虽然提供了简易使用的subprocess.run()方法,但是这个方法无法做到实时输出,也就是命令执行完成之后才能一次性获取命令的输出,即使传入了stdout=subprocess.PIPE要求其创建一个管道,仍旧是阻塞式的读取。stdout也可以指定为一个流,因此我们可以将其直接重定向到本程序的标准输出,但是logger并不是一个流。

所以我们只能使用更为底层的subprocess.Popen对象来进行操作。

process = subprocess.Popen(
    cmd,
    shell=True,
    text=True,
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
    )

通过这样的参数,创建子进程运行cmd指定的命令,同时创建与子进程关联的process对象,此时process.stdoutprocess.stderr都是可以调用readline方法来一行行读取字符串的管道。

实现

通过subprocess.Popen()创建Popen对象后,子进程开始运行,我们可以通过管道来读取子进程的输出。因为有两个管道都需要读取,并且它们没有提供检查管道是否读取的方法,只能阻塞地尝试进行读取,所以我们需要创建读取线程来分别读取这两个管道。

实现如下:

import logging
import subprocess
import shlex
import threading

class CommandExecutionException(Exception):
    def __init__(self, command: str, exit_code: int) -> None:
        super().__init__(f"command executed fail with exit-code={exit_code}: {command}")

_logger = logging.getLogger(__name__)

class TextReadLineThread(threading.Thread):
    def __init__(self, readline, callback, *args, **kargs) -> None:
        super().__init__(*args, **kargs)
        self.readline = readline
        self.callback = callback

    def run(self):
        for line in iter(self.readline, ""):
            if len(line) == 0:
                break
            self.callback(line)

def cmd_exec(command: str, ensure_success: bool=True) -> int:
    _logger.info("executing command: {}".format(command))

    cmd = shlex.split(command)

    process = subprocess.Popen(
        cmd,
        shell=True,
        text=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        )

    _logger.debug("started command")

    def log_warp(func):
        def _wrapper(line: str):
            return func("\\t" + line.strip())
        return _wrapper

    read_stdout = TextReadLineThread(process.stdout.readline, log_warp(_logger.info))
    read_stderr = TextReadLineThread(process.stderr.readline, log_warp(_logger.warning))
    read_stdout.start()
    read_stderr.start()

    read_stdout.join()
    _logger.debug("stdout reading finish")
    read_stderr.join()
    _logger.debug("stderr reading finish")
    ret = process.wait()
    _logger.debug("process finish")

    _logger.info("executed command with exit-code={}".format(ret))
    if ensure_success and ret != 0:
        raise CommandExecutionException(command=command, exit_code=ret)
    return ret

if __name__ == \'__main__\':
    _logger_trans = {
            "DEBUG": "DBG",
            "INFO": "INF",
            "WARNING": "WAR",
            "CRITICAL": "ERR"
        }
    _old_factory = logging.getLogRecordFactory()
    def factory(name, level, fn, lno, msg, args, exc_info, func=None, sinfo=None, **kwargs)->logging.LogRecord:
        record = _old_factory(name, level, fn, lno, msg, args, exc_info, func, sinfo, **kwargs)
        record.shortlevelname = _logger_trans[record.levelname]
        return record
    logging.setLogRecordFactory(factory)
    logging.basicConfig(
        level=logging.DEBUG,
        format=\'[%(asctime)s %(shortlevelname)s] %(message)s\',
        datefmt="%Y/%m/%d %H:%M:%S"
    )
    cmd_exec("ping localhost", ensure_success=False)

TextReadLineThread的run方法中,我们使用了内置函数iter()的两个参数的形式,其第一个参数为一个可调用对象,第二个参数为该可调用对象可能的输出,通过iter(func, sentinel)函数产生的迭代器,每次迭代时都无参数地调用func,直到func()返回sentinel时结束迭代。

关于main中日志配置,参考这一篇博客和[这一篇博客],对日志格式进行增强,使得我们能够明确看到子进程的输出被劫持了。

这里我创建了两个线程,一个用于读取stdout,一个用于读取stderr,然后阻塞掉当前线程。其实可以只创建一个新的线程用于读取stderr,用当前线程读取stdout

我使用了ping命令来进行测试,因为ping明显是隔一段时间输出一段,可以明显看出是不是实时进行日志记录。但是linux上ping默认是不会停止的,需要注意一下。

以上是关于Python-subprocess执行命令并将输出劫持实现实时记录到日志的主要内容,如果未能解决你的问题,请参考以下文章

如何执行 shell 命令,并将输出分配给 configure.ac 中的 AC_DEFINE 变量

将输出重定向为不同的用户[重复]

获取 os.exec 的输出以显示执行命令的进度

对目录中的所有文件执行命令

从 Linux shell 读取输出并将其存储到 JSON 对象

在hdfs中列出文件并将输出附加到文本文件