Twisted源码分析4--Deferred
Posted 幻觉czw
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Twisted源码分析4--Deferred相关的知识,希望对你有一定的参考价值。
我们知道,twisted处处依赖异步编程,而回调是twisted异步编程的基础。由于回调在twisted编程中的广泛运用,twisted的开发者设计了一种抽象机制Deferred让程序员使用回调是更加简便。基础教程中关于Deferred的部分请参见小插曲,Deferred,需要进一步阅读的请参见Deferred的文档。而现在,我们就来分析Deferred的实现
# /twisted/internet/defer.py
class Deferred:
called = False
paused = 0
_suppressAlreadyCalled = False
_runningCallbacks = False
_chainedTo = None
def __init__(self, canceller=None):
self.callbacks = []
self._canceller = canceller
if self.debug:
self._debugInfo = DebugInfo()
self._debugInfo.creator = traceback.format_stack()[:-1]
called属性是一个初始化false为信号量,一旦callback和errback调用之后就置true。pause是一个计数器,当pause大于0的话callback或者是errback的调用会被暂停。_suppressAlreadyCalled是一个很重要的标志,用于deferred的取消机制,当deferred对象没有canceller并且已经被取消了(cancel方法被调用之后),该标志置为True,否则为False。
_runningCallbacks是一个标志当deferred对象的回调链正处于执行状态时为True,这个标志用于阻止_runCallbacks对象的递归调用。如果一个deferred对象需要等待另一个deferred对象的值的话,_chainedTo属性将指向 那个deferred对象。
构造函数十分简单,参数是一个canceller对象,该对象在取消异步操作时会用到。callbacks是回调函数的列表。
# /twisted/internet/defer.py
def passthru(arg):
return arg
def addCallbacks(self, callback, errback=None,
callbackArgs=None, callbackKeywords=None,
errbackArgs=None, errbackKeywords=None):
assert callable(callback)
#断言callback是一个可调用对象
assert errback is None or callable(errback)
# errback可以为不存在
cbs = ((callback, callbackArgs, callbackKeywords),
(errback or (passthru), errbackArgs, errbackKeywords))
# 如果errback不存在就用passthru函数代替
self.callbacks.append(cbs)
# 当添加回调函数时deferred已经被激活,
# 那么立即执行回调链,执行新添加的回调函数
if self.called:
self._runCallbacks()
return self
def addCallback(self, callback, *args, **kw):
return self.addCallbacks(callback, callbackArgs=args,
callbackKeywords=kw)
def addErrback(self, errback, *args, **kw):
return self.addCallbacks(passthru, errback,
errbackArgs=args,
errbackKeywords=kw)
def addBoth(self, callback, *args, **kw):
return self.addCallbacks(callback, callback,
callbackArgs=args, errbackArgs=args,
callbackKeywords=kw, errbackKeywords=kw)
这部分实现了Deferred中添加回调函数的方法,主要就是addCallbacks方法,注意该方法的实现,同时添加的callback和errback是同一个层级的,callback中抛出的异常errback是无法处理的。正如twisted文档中举出的例子一样:
# Case 1
d = getDeferredFromSomewhere()
d.addCallback(callback1) # A
d.addErrback(errback1) # B
d.addCallback(callback2)
d.addErrback(errback2)
# Case 2
d = getDeferredFromSomewhere()
d.addCallbacks(callback1, errback1) # C
d.addCallbacks(callback2, errback2)
如果一个异常在callback1中触发,那么在Case1中errback1将会被调用,在Case2中,errback2将会被调用。这里需要小心,passthru仅仅将结果或者错误传递下去,当callback或者errback不存在是用于替代。
def callback(self, result):
assert not isinstance(result, Deferred)
self._startRunCallbacks(result)
def errback(self, fail=None):
if fail is None:
fail = failure.Failure(captureVars=self.debug)
elif not isinstance(fail, failure.Failure):
fail = failure.Failure(fail)
self._startRunCallbacks(fail)
def _startRunCallbacks(self, result):
if self.called:
# 回调链已经被启动
if self._suppressAlreadyCalled:
# cancel已经被调用,此时激活deferred将直接退出
self._suppressAlreadyCalled = False
return
if self.debug:
if self._debugInfo is None:
self._debugInfo = DebugInfo()
extra = "\\n" + self._debugInfo._getDebugTracebacks()
raise AlreadyCalledError(extra)
raise AlreadyCalledError
# 抛出异常,表明回调已经启动或者取消
if self.debug:
if self._debugInfo is None:
self._debugInfo = DebugInfo()
self._debugInfo.invoker = traceback.format_stack()[:-2]
self.called = True
self.result = result
self._runCallbacks() #运行回调
def _runCallbacks(self):
if self._runningCallbacks:
# 防止递归调用回调链
return
chain = [self]
# 维持一个待处理的deferred栈
while chain:
current = chain[-1]
# 取出当前待处理的deferred
if current.paused:
# 如果Deferred不打算产生一个结果,
# 所有deferred链上的deferred都必须等待
return
finished = True
current._chainedTo = None
while current.callbacks:
# 依次回调当前deferred的所有回调函数
item = current.callbacks.pop(0)
# 弹出到处理的回调函数对
callback, args, kw = item[
isinstance(current.result, failure.Failure)]
# 根据结果是否为Failure判断调用callback还是errback
args = args or ()
kw = kw or
# Avoid recursion if we can.
if callback is _CONTINUE:
# 当前deferred的回调已经执行完毕,被暂停的
# 外层deferred剩下的回调将会被执行。此时,
# 获得外层的deferred将里层的运行结果交给
# 外层,将外层deferred标为继续执行并重新
# 添加到deferred链中
chainee = args[0] # 获得外层deferred
chainee.result = current.result
# 将内层结果交给外层
current.result = None
# Making sure to update _debugInfo
if current._debugInfo is not None:
current._debugInfo.failResult = None
chainee.paused -= 1
# 继续执行外部deferred
chain.append(chainee)
finished = False
break
try:
current._runningCallbacks = True
try:
current.result = callback(current.result, *args, **kw)
# 执行回调
if current.result is current:
warnAboutFunction(
callback,
"Callback returned the Deferred "
"it was attached to; this breaks the "
"callback chain and will raise an "
"exception in the future.")
finally:
current._runningCallbacks = False
except:
# 如果在回调用抛出异常,那么捕捉到这些异常并封装为Failure对象
current.result = failure.Failure(captureVars=self.debug)
else:
if isinstance(current.result, Deferred):
# 如果返回的结果是一个deferred
resultResult = getattr(current.result, 'result', _NO_RESULT)
# 检查这个内层deferred的结果有没有产生
if resultResult is _NO_RESULT or isinstance(resultResult, Deferred) or current.result.paused:
# 如果没有,那么就暂停外层deferred
current.pause()
current._chainedTo = current.result
current.result.callbacks.append(current._continuation())
#并将外层deferred的剩余回调依附在内层deferred上
break
else:
current.result.result = None
if current.result._debugInfo is not None:
current.result._debugInfo.failResult = None
current.result = resultResult
# 直接获取内层deferred的结果,
# 外层继续运行
# 当前这个deferred的回调都执行结束
if finished:
if isinstance(current.result, failure.Failure):
# 如果最终结果是个错误
current.result.cleanFailure()
if current._debugInfo is None:
current._debugInfo = DebugInfo()
current._debugInfo.failResult = current.result
else:
# Clear out any Failure in the _debugInfo, since the result
# is no longer a Failure.
if current._debugInfo is not None:
current._debugInfo.failResult = None
chain.pop()
# 这个deferred执行结束,从deferred链中弹出
def _continuation(self):
return ((_CONTINUE, (self,), None),
(_CONTINUE, (self,), None))
_runCallbacks实现了回调链的执行过程,由于可能出现多重deferred嵌套的情况,_runCallbacks中使用了一个列表chain管理多重deferred。当一个deferred的回调中返回了一个另一个deferred,并且这个子deferred对象还没有获得执行结果,那么就会将父对象暂停,并且将自己作为回调函数对(callback,errback)的参数,通过把这个回调函数对添加到子deferred的回调函数列表中,使自己剩下的回调函数能在子deferred被激活后获得回调。而已经执完的deferred将会从chain中弹出。
取消动作(cancel)是后面加入的功能,用于异步动作的取消。当cancel方法被调用之后,如果deferred还未被激活,cancel将会激活deferred的errback,产生一个CancelledError异常。当deferred被激活之后,cancel不会有任何动作,cancel部分代码如下:
def cancel(self):
if not self.called:
# 如果deferred未被激活
canceller = self._canceller
# 获得canceller对象
if canceller:
canceller(self)
else:
# 表明操作未被取消,也没有激活deferred,
# 如果其他地方有激活deferred的操作,则
# 不执行操作
self._suppressAlreadyCalled = True
if not self.called:
self.errback(failure.Failure(CancelledError()))
# 如果在canceller调用之后仍然没有激活deferred,
# (canceller函数中是可以有激活deferred的操作的)
# 那么调用errback返回一个CancellError的异常
elif isinstance(self.result, Deferred):
self.result.cancel()
cancel方法的最后两行需要注意,如果这个deferred的回调函数中有返回一个deferred,即存在嵌套的deferred,那么就要在取消子deferred,因为可能存在父deferred已经被激活但子deferred未被激活的情况。
通过对以上deferred核心方法的分析,我们对deferred的实现有了一定的了解,同时也能感受到作者这样设计的需求和道理。同时,对deferred的分析能使我们知道如何正确的使用deferred,避免异常扩散到reactor的底层代码中。
以上是关于Twisted源码分析4--Deferred的主要内容,如果未能解决你的问题,请参考以下文章