python在永无止境的进程上运行覆盖

Posted

技术标签:

【中文标题】python在永无止境的进程上运行覆盖【英文标题】:python running coverage on never ending process 【发布时间】:2017-01-21 23:33:56 【问题描述】:

我有一个多处理 Web 服务器,其进程永无止境,我想在实时环境中检查整个项目的代码覆盖率(不仅来自测试)。

问题是,由于进程永远不会结束,我没有设置cov.start() cov.stop() cov.save() 挂钩的好地方。

因此,我考虑生成一个线程,该线程将在无限循环中保存并组合覆盖数据,然后休眠一段时间,但是这种方法不起作用,覆盖报告似乎是空的,除了睡眠线.

我很乐意收到有关如何获得我的代码覆盖率的任何想法, 或关于为什么我的想法行不通的任何建议。这是我的代码的 sn-p:

import coverage
cov = coverage.Coverage()
import time
import threading
import os

class CoverageThread(threading.Thread):
    _kill_now = False
    _sleep_time = 2

@classmethod
def exit_gracefully(cls):
    cls._kill_now = True

def sleep_some_time(self):
    time.sleep(CoverageThread._sleep_time)

def run(self):
    while True:
        cov.start()
        self.sleep_some_time()
        cov.stop()
        if os.path.exists('.coverage'):
            cov.combine()
        cov.save()
        if self._kill_now:
            break
    cov.stop()
    if os.path.exists('.coverage'):
        cov.combine()
    cov.save()
    cov.html_report(directory="coverage_report_data.html")
    print "End of the program. I was killed gracefully :)"

【问题讨论】:

您究竟想在这里测量什么? “覆盖”特指测试。您是否想查看您的代码的哪些位实际被执行? 您要检查您的 .py 文件是否还存在吗? 是的,代码库非常庞大,我们想看看哪些代码区域永远无法到达,如果这些区域存在,请将它们删除,或者检查它们无法到达的原因。 @Rizzit 我知道这些文件是有效的,但我实际上想知道究竟执行了什么。 Coverage 使用解释器挂钩来通知正在执行的代码中的每一行。这会显着降低性能,您愿意为此付出代价吗? 【参考方案1】:

您可以直接使用pyrasite,配合以下两个程序。

# start.py
import sys
import coverage

sys.cov = cov = coverage.coverage()
cov.start()

还有这个

# stop.py
import sys

sys.cov.stop()
sys.cov.save()
sys.cov.html_report()

另一种方法是使用lptrace 跟踪程序,即使它只打印调用它也很有用。

【讨论】:

【参考方案2】:

显然,不可能通过多个Threads很好地控制coverage。 一旦启动了不同的线程,停止Coverage 对象将停止所有覆盖,start 只会在“开始”线程中重新启动它。 因此,您的代码基本上会在 2 秒后停止对除 CoverageThread 之外的所有 Thread 的覆盖。

我对 API 进行了一些尝试,可以在不停止 Coverage 对象的情况下访问测量值。 因此,您可以使用 API 启动一个定期保存覆盖数据的线程。 第一个实现是这样的

import threading
from time import sleep
from coverage import Coverage
from coverage.data import CoverageData, CoverageDataFiles
from coverage.files import abs_file

cov = Coverage(config_file=True)
cov.start()


def get_data_dict(d):
    """Return a dict like d, but with keys modified by `abs_file` and
    remove the copied elements from d.
    """
    res = 
    keys = list(d.keys())
    for k in keys:
        a = 
        lines = list(d[k].keys())
        for l in lines:
            v = d[k].pop(l)
            a[l] = v
        res[abs_file(k)] = a
    return res


class CoverageLoggerThread(threading.Thread):
    _kill_now = False
    _delay = 2

    def __init__(self, main=True):
        self.main = main
        self._data = CoverageData()
        self._fname = cov.config.data_file
        self._suffix = None
        self._data_files = CoverageDataFiles(basename=self._fname,
                                             warn=cov._warn)
        self._pid = os.getpid()
        super(CoverageLoggerThread, self).__init__()

    def shutdown(self):
        self._kill_now = True

    def combine(self):
        aliases = None
        if cov.config.paths:
            from coverage.aliases import PathAliases
            aliases = PathAliases()
            for paths in self.config.paths.values():
                result = paths[0]
                for pattern in paths[1:]:
                    aliases.add(pattern, result)

        self._data_files.combine_parallel_data(self._data, aliases=aliases)

    def export(self, new=True):
        cov_report = cov
        if new:
            cov_report = Coverage(config_file=True)
            cov_report.load()
        self.combine()
        self._data_files.write(self._data)
        cov_report.data.update(self._data)
        cov_report.html_report(directory="coverage_report_data.html")
        cov_report.report(show_missing=True)

    def _collect_and_export(self):
        new_data = get_data_dict(cov.collector.data)
        if cov.collector.branch:
            self._data.add_arcs(new_data)
        else:
            self._data.add_lines(new_data)
        self._data.add_file_tracers(get_data_dict(cov.collector.file_tracers))
        self._data_files.write(self._data, self._suffix)

        if self.main:
            self.export()

    def run(self):
        while True:
            sleep(CoverageLoggerThread._delay)
            if self._kill_now:
                break

            self._collect_and_export()

        cov.stop()

        if not self.main:
            self._collect_and_export()
            return

        self.export(new=False)
        print("End of the program. I was killed gracefully :)")

可以在GIST 中找到更稳定的版本。 这段代码基本上是在不停止收集器的情况下获取收集器收集的信息。 get_data_dict 函数获取Coverage.collector 中的字典并弹出可用数据。这应该足够安全,以免丢失任何测量值。 报告文件每_delay 秒更新一次。

但是如果你有多个进程在运行,你需要付出额外的努力来确保所有的进程都运行CoverageLoggerThread。这是patch_multiprocessing 函数,从coverage 猴子补丁修补的猴子... 代码在GIST 中。它基本上用自定义进程替换了原始进程,该进程在运行run 方法之前启动CoverageLoggerThread,并在进程结束时加入线程。 脚本main.py 允许使用线程和进程启动不同的测试。

这段代码有 2/3 的缺点需要你小心:

同时使用combine 函数是个坏主意,因为它对.coverage.* 文件执行并发读/写/删除访问。这意味着函数export 不是超级安全的。应该没问题,因为数据被多次复制,但我会在生产中使用它之前做一些测试。

数据一旦导出,就会保留在内存中。因此,如果代码库很大,它可能会占用一些资源。可以转储所有数据并重新加载它,但我假设如果你想每 2 秒记录一次,你不想每次都重新加载所有数据。如果您延迟几分钟,我将每次创建一个新的_data,使用CoverageData.read_file 重新加载此过程的先前覆盖状态。

自定义进程将在完成之前等待_delay,因为我们在进程结束时加入CoverageThreadLogger,因此如果您有很多快速进程,您希望将睡眠的粒度增加到能够更快地检测到进程的结束。它只需要一个在_kill_now 上中断的自定义睡眠循环。

让我知道这是否对您有所帮助,或者是否可以改进此要点。


编辑: 看来您不需要对多处理模块进行修补即可自动启动记录器。在你的 python 安装中使用.pth,你可以使用环境变量在新进程上自动启动你的记录器:

# Content of coverage.pth in your site-package folder
import os
if "COVERAGE_LOGGER_START" in os.environ:
    import atexit
    from coverage_logger import CoverageLoggerThread
    thread_cov = CoverageLoggerThread(main=False)
    thread_cov.start()
    def close_cov()
        thread_cov.shutdown()
        thread_cov.join()
    atexit.register(close_cov)

然后您可以使用COVERAGE_LOGGER_START=1 python main.y 启动您的覆盖记录器

【讨论】:

感谢您提出这个建议。它在coverage.py 代码库中使用了许多非公共接口。 coverage.py 可以提供哪些公共 API 以使其更受支持? 另外,为什么要调用 combine?为什么不将每个快照写入一个单独的文件,然后再将它们组合起来? Coverage 可以提出一个导出函数,将当前数据保存在一个文件中,并且不会停止 Coverage 对象。如果您拍摄collector 数据的快照(基本上是我在这里实现的),这样做非常简单。我在这段代码中调用 combine 是因为 OP 正在这样做。如果您跨越多个进程,它会使其更加复杂,但要保持文件数量较少。此外,代码倾向于重写文件,因此尽早使用combine 避免丢失信息。这取决于您在一段时间内跨越的流程数量及其管理。 另外值得注意的是,Coverage 的公共 API 中的所有方法都依赖于停止收集器的 get_data 方法。因此,您必须使用非公共方法来获得不会破坏数据收集的类似导出的行为。 嗨@ThomasMoreau 非常感谢您的详细回答。我会尝试在我们的代码上实现类似的东西。【参考方案3】:

既然您愿意为测试以不同的方式运行代码,为什么不添加一种方法来结束测试过程呢?这似乎比试图破解覆盖范围更简单。

【讨论】:

因为数据应该以无缝的方式收集到最终用户,所以我们的想法是从公司周围的多个用户那里收集数据并结合他们的结果,同时允许他们以常规方式工作。此外,如果我将强制进程重新启动或停止,那将影响会损害结果的行为。

以上是关于python在永无止境的进程上运行覆盖的主要内容,如果未能解决你的问题,请参考以下文章

C++ PyImport 的 Python 覆盖率

Python 覆盖不会将输出重定向到文件

通过 pytest 使用多进程处理时如何测量覆盖率?

使用 cython 在图上执行框覆盖

管道似乎无法在 exec() 覆盖的 fork() 进程之间进行通信

Python 子进程循环运行两次