函数调用超时

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了函数调用超时相关的知识,希望对你有一定的参考价值。

我正在调用Python中的一个函数,我知道它可能会停止并迫使我重新启动脚本。

我如何调用该函数或我将其包装成什么,以便如果它花费的时间超过5秒,脚本会取消它并执行其他操作?

答案

如果在UNIX上运行,可以使用signal包:

In [1]: import signal

# Register an handler for the timeout
In [2]: def handler(signum, frame):
   ...:     print "Forever is over!"
   ...:     raise Exception("end of time")
   ...: 

# This function *may* run for an indetermined time...
In [3]: def loop_forever():
   ...:     import time
   ...:     while 1:
   ...:         print "sec"
   ...:         time.sleep(1)
   ...:         
   ...:         

# Register the signal function handler
In [4]: signal.signal(signal.SIGALRM, handler)
Out[4]: 0

# Define a timeout for your function
In [5]: signal.alarm(10)
Out[5]: 0

In [6]: try:
   ...:     loop_forever()
   ...: except Exception, exc: 
   ...:     print exc
   ....: 
sec
sec
sec
sec
sec
sec
sec
sec
Forever is over!
end of time

# Cancel the timer if the function returned before timeout
# (ok, mine won't but yours maybe will :)
In [7]: signal.alarm(0)
Out[7]: 0

调用alarm.alarm(10) 10秒后,调用处理程序。这引发了一个例外,您可以从常规Python代码中截取。

这个模块不适合线程(但那么,谁呢?)

请注意,由于我们在超时发生时引发异常,因此最终可能会在函数内捕获并忽略,例如一个这样的函数:

def loop_forever():
    while 1:
        print 'sec'
        try:
            time.sleep(10)
        except:
            continue
另一答案

我们可以使用相同的信号。我认为以下示例对您有用。与线程相比,它非常简单。

import signal

def timeout(signum, frame):
    raise myException

#this is an infinite loop, never ending under normal circumstances
def main():
    print 'Starting Main ',
    while 1:
        print 'in main ',

#SIGALRM is only usable on a unix platform
signal.signal(signal.SIGALRM, timeout)

#change 5 to however many seconds you need
signal.alarm(5)

try:
    main()
except myException:
    print "whoops"
另一答案

我需要一个不会被time.sleep(基于线程的方法无法做到)阻塞的可嵌套定时中断(SIGALARM无法做到)。我最终复制并从这里轻轻修改代码:http://code.activestate.com/recipes/577600-queue-for-managing-multiple-sigalrm-alarms-concurr/

代码本身:

#!/usr/bin/python

# lightly modified version of http://code.activestate.com/recipes/577600-queue-for-managing-multiple-sigalrm-alarms-concurr/


"""alarm.py: Permits multiple SIGALRM events to be queued.

Uses a `heapq` to store the objects to be called when an alarm signal is
raised, so that the next alarm is always at the top of the heap.
"""

import heapq
import signal
from time import time

__version__ = '$Revision: 2539 $'.split()[1]

alarmlist = []

__new_alarm = lambda t, f, a, k: (t + time(), f, a, k)
__next_alarm = lambda: int(round(alarmlist[0][0] - time())) if alarmlist else None
__set_alarm = lambda: signal.alarm(max(__next_alarm(), 1))


class TimeoutError(Exception):
    def __init__(self, message, id_=None):
        self.message = message
        self.id_ = id_


class Timeout:
    ''' id_ allows for nested timeouts. '''
    def __init__(self, id_=None, seconds=1, error_message='Timeout'):
        self.seconds = seconds
        self.error_message = error_message
        self.id_ = id_
    def handle_timeout(self):
        raise TimeoutError(self.error_message, self.id_)
    def __enter__(self):
        self.this_alarm = alarm(self.seconds, self.handle_timeout)
    def __exit__(self, type, value, traceback):
        try:
            cancel(self.this_alarm) 
        except ValueError:
            pass


def __clear_alarm():
    """Clear an existing alarm.

    If the alarm signal was set to a callable other than our own, queue the
    previous alarm settings.
    """
    oldsec = signal.alarm(0)
    oldfunc = signal.signal(signal.SIGALRM, __alarm_handler)
    if oldsec > 0 and oldfunc != __alarm_handler:
        heapq.heappush(alarmlist, (__new_alarm(oldsec, oldfunc, [], {})))


def __alarm_handler(*zargs):
    """Handle an alarm by calling any due heap entries and resetting the alarm.

    Note that multiple heap entries might get called, especially if calling an
    entry takes a lot of time.
    """
    try:
        nextt = __next_alarm()
        while nextt is not None and nextt <= 0:
            (tm, func, args, keys) = heapq.heappop(alarmlist)
            func(*args, **keys)
            nextt = __next_alarm()
    finally:
        if alarmlist: __set_alarm()


def alarm(sec, func, *args, **keys):
    """Set an alarm.

    When the alarm is raised in `sec` seconds, the handler will call `func`,
    passing `args` and `keys`. Return the heap entry (which is just a big
    tuple), so that it can be cancelled by calling `cancel()`.
    """
    __clear_alarm()
    try:
        newalarm = __new_alarm(sec, func, args, keys)
        heapq.heappush(alarmlist, newalarm)
        return newalarm
    finally:
        __set_alarm()


def cancel(alarm):
    """Cancel an alarm by passing the heap entry returned by `alarm()`.

    It is an error to try to cancel an alarm which has already occurred.
    """
    __clear_alarm()
    try:
        alarmlist.remove(alarm)
        heapq.heapify(alarmlist)
    finally:
        if alarmlist: __set_alarm()

和一个用法示例:

import alarm
from time import sleep

try:
    with alarm.Timeout(id_='a', seconds=5):
        try:
            with alarm.Timeout(id_='b', seconds=2):
                sleep(3)
        except alarm.TimeoutError as e:
            print 'raised', e.id_
        sleep(30)
except alarm.TimeoutError as e:
    print 'raised', e.id_
else:
    print 'nope.'
另一答案

timeout-decorator不能在Windows系统上工作,因为Windows不支持signal

如果您在Windows系统中使用timeout-decorator,您将获得以下内容

AttributeError: module 'signal' has no attribute 'SIGALRM'

有人建议使用use_signals=False,但没有为我工作。

作者@bitranox创建了以下包:

pip install https://github.com/bitranox/wrapt-timeout-decorator/archive/master.zip

代码示例:

import time
from wrapt_timeout_decorator import *

@timeout(5)
def mytest(message):
    print(message)
    for i in range(1,10):
        time.sleep(1)
        print('{} seconds have passed'.format(i))

def main():
    mytest('starting')


if __name__ == '__main__':
    main()

给出以下例外:

TimeoutError: Function mytest timed out after 5 seconds
另一答案

我是wrapt_timeout_decorator的作者

这里介绍的大多数解决方案在Linux下乍一看都很糟糕 - 因为我们有fork()和signals() - 但是在Windows上,事情看起来有点不同。当谈到Linux上的subthreads时,你不能再使用Signals了。

为了在Windows下生成一个进程,它需要是可选择的 - 许多修饰函数或类方法都不是。

所以你需要使用更好的选择器,如莳萝和多进程(不是pickle和多处理) - 这就是为什么你不能使用ProcessPoolExecutor(或只有有限的功能)。

对于超时本身 - 您需要定义超时意味着什么 - 因为在Windows上,生成进程需要相当多(并且不可确定)的时间。在短暂的超时时间这可能很棘手。让我们假设,产生这个过程大约需要0.5秒(很容易!!!)。如果你给出超时0.2秒会发生什么?该功能应该在0.5 + 0.2秒后超时(所以让方法运行0.2秒)?或者,如果被调用的进程在0.2秒后超时(在这种情况下,装饰的函数将总是超时,因为在那个时间它甚至没有产生)?

嵌套的装饰器也可能很讨厌,你不能在子线程中使用信号。如果您想创建一个真正通用的跨平台装饰器,所有这些都需要考虑(并经过测试)。

其他问题是将异常传递回调用者,以及日志记录问题(如果在修饰函数中使用 - 不支持记录到另一个进程中的文件)

我试图涵盖所有边缘情况,您可以查看包裹wrapt_timeout_decorator,或者至少测试您自己的解决方案,灵感来自那里使用的单元测试。

@Alexis Eggermont - 遗憾的是我没有足够的评论意见 - 也许其他人可以通知你 - 我想我解决了你的导入问题。

另一答案

这是对给定的基于线程的解决方案的略微改进。

以下代码支持例外:

def runFunctionCatchExceptions(func, *args, **kwargs):
    try:
        result = func(*args, **kwargs)
    except Exception, message:
        return ["exception", message]

    return ["RESULT", result]


def runFunctionWithTimeout(func, args=(), kwargs={}, timeout_duration=10, default=None):
    import threading
    class InterruptableThread(threading.Thread):
        def __init__(self):
            threading.Thread.__init__(self)
            self.result = default
        def run(s

以上是关于函数调用超时的主要内容,如果未能解决你的问题,请参考以下文章

php 一个自定义的try..catch包装器代码片段,用于执行模型函数,使其成为一个单行函数调用

在mocha测试中调用异步函数如何避免超时错误:超过2000ms的超时

如何将数据从回收器适配器发送到片段 |如何从 recyclerview 适配器调用片段函数

如何测量代码片段的调用次数和经过时间

如何从片段 KOTLIN 中调用意图 [重复]

JavaScript超时调用间歇调用