python 设置一个蟒蛇进程的超时,可以通过父子线程或者进程的方式实现
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python 设置一个蟒蛇进程的超时,可以通过父子线程或者进程的方式实现相关的知识,希望对你有一定的参考价值。
#!/usr/bin/env python
# -*- coding: utf-8 -*-
__all__ = ['timeout']
import ctypes
import functools
import threading
def _async_raise(tid, exception):
ret = ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(tid), ctypes.py_object(exception))
if ret == 0:
raise ValueError('Invalid thread id')
elif ret != 1:
ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(tid), 0)
raise SystemError('PyThreadState_SetAsyncExc failed')
class ThreadTimeout(Exception):
""" Thread Timeout Exception """
pass
class WorkThread(threading.Thread):
""" WorkThread """
def __init__(self, target, args, kwargs):
threading.Thread.__init__(self)
self.setDaemon(False)
self.target = target
self.args = args
self.kwargs = kwargs
self.start()
def run(self):
try:
self.result = self.target(*self.args, **self.kwargs)
except Exception as e:
self.exception = e
else:
self.exception = None
def get_tid(self):
if self.ident is None:
raise threading.ThreadError('The thread is not active')
return self.ident
def raise_exc(self, exception):
_async_raise(self.get_tid(), exception)
def terminate(self):
self.raise_exc(SystemExit)
def timeout(timeout):
""" timeout decorator """
def proxy(method):
@functools.wraps(method)
def func(*args, **kwargs):
worker = WorkThread(method, args, kwargs)
worker.join(timeout=timeout)
if worker.is_alive():
worker.terminate()
raise ThreadTimeout('A call to %s() has timed out' % method.__name__)
elif worker.exception is not None:
raise worker.exception
else:
return worker.result
return func
return proxy
if __name__ == '__main__':
@timeout(2)
def http_test():
import urllib
req = urllib.urlopen('http://eternallybored.org/misc/slow/')
data = req.read()
return data
try:
data = http_test()
except ThreadTimeout as e:
print e
else:
print data
以上是关于python 设置一个蟒蛇进程的超时,可以通过父子线程或者进程的方式实现的主要内容,如果未能解决你的问题,请参考以下文章
Python并行操作 - 父子进程 subprocess库
python subprocess模块 监控子进程的2种方式 忙等待和立即返回同时设置子进程超时
Python子进程超时?
python:运行一个超时的进程并捕获stdout、stderr和退出状态[重复]
Python 超时(运行时间太长) 结束进程
python 蟒蛇日志设置