使用输出和超时启动python进程

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了使用输出和超时启动python进程相关的知识,希望对你有一定的参考价值。

我正在尝试找到启动新进程的方法,并获得其输出,如果它需要不到X秒。如果进程需要更多时间,我想忽略进程结果,终止进程并继续进行。

我需要基本上将计时器添加到下面的代码中。现在确定是否有更好的方法,我愿意接受一个不同的更好的解决方案。

from multiprocessing import Process, Queue

def f(q):
    # Ugly work
    q.put(['hello', 'world'])

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print q.get()
    p.join()

谢谢!

答案

您可能会发现以下模块对您的案例很有用:

#! /usr/bin/env python3
"""Allow functions to be wrapped in a timeout API.

Since code can take a long time to run and may need to terminate before
finishing, this module provides a set_timeout decorator to wrap functions."""

__author__ = 'Stephen "Zero" Chappell ' 
             '<stephen.paul.chappell@atlantis-zero.net>'
__date__ = '18 December 2017'
__version__ = 1, 0, 1
__all__ = [
    'set_timeout',
    'run_with_timeout'
]

import multiprocessing
import sys
import time

DEFAULT_TIMEOUT = 60


def set_timeout(limit=None):
    """Return a wrapper that provides a timeout API for callers."""
    if limit is None:
        limit = DEFAULT_TIMEOUT
    _Timeout.validate_limit(limit)

    def wrapper(entry_point):
        return _Timeout(entry_point, limit)

    return wrapper


def run_with_timeout(limit, polling_interval, entry_point, *args, **kwargs):
    """Execute a callable object and automatically poll for results."""
    engine = set_timeout(limit)(entry_point)
    engine(*args, **kwargs)
    while engine.ready is False:
        time.sleep(polling_interval)
    return engine.value


def _target(queue, entry_point, *args, **kwargs):
    """Help with multiprocessing calls by being a top-level module function."""
    # noinspection PyPep8,PyBroadException
    try:
        queue.put((True, entry_point(*args, **kwargs)))
    except:
        queue.put((False, sys.exc_info()[1]))


class _Timeout:
    """_Timeout(entry_point, limit) -> _Timeout instance"""

    def __init__(self, entry_point, limit):
        """Initialize the _Timeout instance will all needed attributes."""
        self.__entry_point = entry_point
        self.__limit = limit
        self.__queue = multiprocessing.Queue()
        self.__process = multiprocessing.Process()
        self.__timeout = time.monotonic()

    def __call__(self, *args, **kwargs):
        """Begin execution of the entry point in a separate process."""
        self.cancel()
        self.__queue = multiprocessing.Queue(1)
        self.__process = multiprocessing.Process(
            target=_target,
            args=(self.__queue, self.__entry_point) + args,
            kwargs=kwargs
        )
        self.__process.daemon = True
        self.__process.start()
        self.__timeout = time.monotonic() + self.__limit

    def cancel(self):
        """Terminate execution if possible."""
        if self.__process.is_alive():
            self.__process.terminate()

    @property
    def ready(self):
        """Property letting callers know if a returned value is available."""
        if self.__queue.full():
            return True
        elif not self.__queue.empty():
            return True
        elif self.__timeout < time.monotonic():
            self.cancel()
        else:
            return False

    @property
    def value(self):
        """Property that retrieves a returned value if available."""
        if self.ready is True:
            valid, value = self.__queue.get()
            if valid:
                return value
            raise value
        raise TimeoutError('execution timed out before terminating')

    @property
    def limit(self):
        """Property controlling what the timeout period is in seconds."""
        return self.__limit

    @limit.setter
    def limit(self, value):
        self.validate_limit(value)
        self.__limit = value

    @staticmethod
    def validate_limit(value):
        """Verify that the limit's value is not too low."""
        if value <= 0:
            raise ValueError('limit must be greater than zero')

要使用,请参阅以下演示其用法的示例:

from time import sleep


def main():
    timeout_after_four_seconds = timeout(4)
    # create copies of a function that have a timeout
    a = timeout_after_four_seconds(do_something)
    b = timeout_after_four_seconds(do_something)
    c = timeout_after_four_seconds(do_something)
    # execute the functions in separate processes
    a('Hello', 1)
    b('World', 5)
    c('Jacob', 3)
    # poll the functions to find out what they returned
    results = [a, b, c]
    polling = set(results)
    while polling:
        for process, name in zip(results, 'abc'):
            if process in polling:
                ready = process.ready
                if ready is True:       # if the function returned
                    print(name, 'returned', process.value)
                    polling.remove(process)
                elif ready is None:     # if the function took too long
                    print(name, 'reached timeout')
                    polling.remove(process)
                else:                   # if the function is running
                    assert ready is False, 'ready must be True, False, or None'
        sleep(0.1)
    print('Done.')


def do_something(data, work):
    sleep(work)
    print(data)
    return work


if __name__ == '__main__':
    main()
另一答案

您运行的进程是否涉及循环?如果是这样,你可以在开始循环之前得到时间戳,并在循环中包含一个带有sys.exit()的if语句;如果当前时间戳与记录的开始时间戳相差超过x秒,则命令终止脚本。

另一答案

所有你需要使the queue example from the docs适应你的情况是将超时传递给q.get()调用并在超时时终止进程:

from Queue import Empty
...

try:
    print q.get(timeout=timeout)
except Empty: # no value, timeout occured
    p.terminate()
    q = None # the queue might be corrupted after the `terminate()` call
p.join()

使用Pipe可能更轻量级,否则代码是相同的(您可以使用.poll(timeout),以确定是否有数据要接收)。

以上是关于使用输出和超时启动python进程的主要内容,如果未能解决你的问题,请参考以下文章

Java进程获取输出并设置超时

在 python 上获取“错误 R10(启动超时)-> Web 进程未能在启动后 60 秒内绑定到 $PORT”

在 Python 多处理进程中运行较慢的 OpenCV 代码片段

python:运行一个超时的进程并捕获stdout、stderr和退出状态[重复]

无法启动进程启动失败:等待应用启动超时

C Windows:生成子进程以停止和重新启动(超时后)父进程