python 设置一个蟒蛇进程的超时,可以通过父子线程或者进程的方式实现

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python 设置一个蟒蛇进程的超时,可以通过父子线程或者进程的方式实现相关的知识,希望对你有一定的参考价值。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

__all__ = ['timeout']

import ctypes
import functools
import threading


def _async_raise(tid, exception):
    ret = ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(tid), ctypes.py_object(exception))
    if ret == 0:
        raise ValueError('Invalid thread id')
    elif ret != 1:
        ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(tid), 0)
        raise SystemError('PyThreadState_SetAsyncExc failed')


class ThreadTimeout(Exception):
    """ Thread Timeout Exception """
    pass


class WorkThread(threading.Thread):
    """ WorkThread """

    def __init__(self, target, args, kwargs):
        threading.Thread.__init__(self)
        self.setDaemon(False)
        self.target = target
        self.args = args
        self.kwargs = kwargs
        self.start()

    def run(self):
        try:
            self.result = self.target(*self.args, **self.kwargs)
        except Exception as e:
            self.exception = e
        else:
            self.exception = None

    def get_tid(self):
        if self.ident is None:
            raise threading.ThreadError('The thread is not active')
        return self.ident

    def raise_exc(self, exception):
        _async_raise(self.get_tid(), exception)

    def terminate(self):
        self.raise_exc(SystemExit)


def timeout(timeout):
    """ timeout decorator """
    def proxy(method):
        @functools.wraps(method)
        def func(*args, **kwargs):
            worker = WorkThread(method, args, kwargs)
            worker.join(timeout=timeout)
            if worker.is_alive():
                worker.terminate()
                raise ThreadTimeout('A call to %s() has timed out' % method.__name__)
            elif worker.exception is not None:
                raise worker.exception
            else:
                return worker.result
        return func
    return proxy


if __name__ == '__main__':
    @timeout(2)
    def http_test():
        import urllib
        req = urllib.urlopen('http://eternallybored.org/misc/slow/')
        data = req.read()
        return data


    try:
        data = http_test()
    except ThreadTimeout as e:
        print e
    else:
        print data

以上是关于python 设置一个蟒蛇进程的超时,可以通过父子线程或者进程的方式实现的主要内容,如果未能解决你的问题,请参考以下文章

Python并行操作 - 父子进程 subprocess库

python subprocess模块 监控子进程的2种方式 忙等待和立即返回同时设置子进程超时

Python子进程超时?

python:运行一个超时的进程并捕获stdout、stderr和退出状态[重复]

Python 超时(运行时间太长) 结束进程

python 蟒蛇日志设置