如何从Twisted的deferToThread API中延迟添加延迟?
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何从Twisted的deferToThread API中延迟添加延迟?相关的知识,希望对你有一定的参考价值。
from twisted.internet import reactor from twisted.internet import threads from twisted.internet import defer import time def worker(arg): print 'Hello world' time.sleep(10) return 1 def run(): print 'Starting workers' l = [] for x in range(2): l.append(threads.deferToThread(worker, x)) return defer.DeferredList(l) def res(results): print results reactor.stop() d = run() d.addCallback(res) reactor.run()
如何通过超时停止工人?
除非与您合作,否则不能中断线程。 time.sleep(10)
不会合作,所以我认为你不能打断这个工人。如果您有另一种具有多个离散阶段的工作者,或者在某些任务的循环中操作,那么您可以执行以下操作:
def worker(stop, jobs):
for j in jobs:
if stop:
break
j.do()
stop = []
d = deferToThread(worker)
# This will make the list eval to true and break out of the loop.
stop.append(None)
这也不是Twisted特定的。这就是线程在Python中的工作方式。
虽然可能无法中断线程,但可以通过cancel
函数停止延迟,我认为这在Twisted 10.1.0及更高版本中可用。
我已经使用以下类来使Deferreds回调特定函数,如果Deferred在一段时间后没有触发。对于与OP主题具有相同问题的人来说,这可能是有用的。
编辑:正如下面的评论所建议的,最好不要继承defer.Deferred
。因此,我已经更改了代码以使用实现相同效果的包装器。
class DeferredWrapperWithTimeout(object):
'''
Holds a deferred that allows a specified function to be called-back
if the deferred does not fire before some specified timeout.
'''
def __init__(self, canceller=None):
self._def = defer.Deferred(canceller)
def _finish(self, r, t):
'''
Function to be called (internally) after the Deferred
has fired, in order to cancel the timeout.
'''
if ( (t!=None) and (t.active()) ):
t.cancel()
return r
def getDeferred(self):
return self._def
def addTimeoutCallback(self, reactr, timeout,
callUponTimeout, *args, **kw):
'''
The function 'callUponTimeout' (with optional args or keywords)
will be called after 'timeout' seconds, unless the Deferred fires.
'''
def timeoutCallback():
self._def.cancel()
callUponTimeout(*args, **kw)
toc = reactr.callLater(timeout, timeoutCallback)
return self._def.addCallback(self._finish, toc)
超时前的示例回调:
from twisted.internet import reactor
from DeferredWithTimeout import *
dw = DeferredWrapperWithTimeout()
d = dw.getDeferred()
def testCallback(x=None):
print "called"
def testTimeout(x=None):
print "timedout"
d.addCallback(testCallback)
dw.addTimeoutCallback(reactor, 20, testTimeout, "to")
reactor.callLater(2, d.callback, "cb")
reactor.run()
打印“被叫”,没有别的。
回调前的示例超时:
from twisted.internet import reactor
from DeferredWithTimeout import *
dw = DeferredWrapperWithTimeout()
d = dw.getDeferred()
def testCallback(x=None):
print "called"
def testTimeout(x=None):
print "timedout"
d.addCallback(testCallback)
dw.addTimeoutCallback(reactor, 20, testTimeout, "to")
reactor.run()
20秒后打印“timedout”,没有别的。
好吧,我的答案不是关于线程,但正如所说,你可以实现超时功能作为一个单独的帮助:
from twisted.internet import defer
def add_watchdog(deferred, timeout=0.05):
def callback(value):
if not watchdog.called:
watchdog.cancel()
return value
deferred.addBoth(callback)
from twisted.internet import reactor
watchdog = reactor.callLater(timeout, defer.timeout, deferred)
d = defer.Deferred()
add_watchdog(d)
然后,如果需要,你可以在延迟的错误中捕获defer.TimeoutError
。
我们这样做是使用装饰器。此方法的优点是在达到超时时取消延迟。这应该以某种方式成为Twisted库imho的一部分
from twisted.internet import defer, reactor
def timeout(secs):
"""Decorator to add timeout to Deferred calls"""
def wrap(func):
@defer.inlineCallbacks
def _timeout(*args, **kwargs):
raw_d = func(*args, **kwargs)
if not isinstance(raw_d, defer.Deferred):
defer.returnValue(raw_d)
timeout_d = defer.Deferred()
times_up = reactor.callLater(secs, timeout_d.callback, None)
try:
raw_result, timeout_result = yield defer.DeferredList(
[raw_d, timeout_d], fireOnOneCallback=True, fireOnOneErrback=True,
consumeErrors=True)
except defer.FirstError as e: # Only raw_d should raise an exception
assert e.index == 0
times_up.cancel()
e.subFailure.raiseException()
else: # timeout
if timeout_d.called:
raw_d.cancel()
raise Exception("%s secs have expired" % secs)
# no timeout
times_up.cancel()
defer.returnValue(raw_result)
return _timeout
return wrap
以上是关于如何从Twisted的deferToThread API中延迟添加延迟?的主要内容,如果未能解决你的问题,请参考以下文章
Twisted:使用 pyglet-twisted 时如何从 EndPoint 调用 Deferred
如何完全从原始 XML 创建 twisted.words.xish.domish.Element