Twisted源码分析4--Deferred

Posted 幻觉czw

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Twisted源码分析4--Deferred相关的知识,希望对你有一定的参考价值。

我们知道,twisted处处依赖异步编程,而回调是twisted异步编程的基础。由于回调在twisted编程中的广泛运用,twisted的开发者设计了一种抽象机制Deferred让程序员使用回调是更加简便。基础教程中关于Deferred的部分请参见小插曲,Deferred,需要进一步阅读的请参见Deferred的文档。而现在,我们就来分析Deferred的实现

# /twisted/internet/defer.py
class Deferred:
    called = False
    paused = 0
    _suppressAlreadyCalled = False  
    _runningCallbacks = False

    _chainedTo = None

    def __init__(self, canceller=None):
        self.callbacks = []
        self._canceller = canceller
        if self.debug:
            self._debugInfo = DebugInfo()
            self._debugInfo.creator = traceback.format_stack()[:-1]

called属性是一个初始化false为信号量,一旦callback和errback调用之后就置true。pause是一个计数器,当pause大于0的话callback或者是errback的调用会被暂停。_suppressAlreadyCalled是一个很重要的标志,用于deferred的取消机制,当deferred对象没有canceller并且已经被取消了(cancel方法被调用之后),该标志置为True,否则为False。
_runningCallbacks是一个标志当deferred对象的回调链正处于执行状态时为True,这个标志用于阻止_runCallbacks对象的递归调用。如果一个deferred对象需要等待另一个deferred对象的值的话,_chainedTo属性将指向 那个deferred对象。
构造函数十分简单,参数是一个canceller对象,该对象在取消异步操作时会用到。callbacks是回调函数的列表。

# /twisted/internet/defer.py

def passthru(arg):
    return arg

def addCallbacks(self, callback, errback=None,
                     callbackArgs=None, callbackKeywords=None,
                     errbackArgs=None, errbackKeywords=None):
    assert callable(callback) 
    #断言callback是一个可调用对象
    assert errback is None or callable(errback)
    # errback可以为不存在
    cbs = ((callback, callbackArgs, callbackKeywords),
           (errback or (passthru), errbackArgs, errbackKeywords))
    # 如果errback不存在就用passthru函数代替
    self.callbacks.append(cbs)

    # 当添加回调函数时deferred已经被激活,
    # 那么立即执行回调链,执行新添加的回调函数
    if self.called:
        self._runCallbacks()
    return self

def addCallback(self, callback, *args, **kw):
        return self.addCallbacks(callback, callbackArgs=args,
                                 callbackKeywords=kw)


def addErrback(self, errback, *args, **kw):
    return self.addCallbacks(passthru, errback,
                                 errbackArgs=args,
                                 errbackKeywords=kw)


def addBoth(self, callback, *args, **kw):
    return self.addCallbacks(callback, callback,
                                 callbackArgs=args, errbackArgs=args,
                                 callbackKeywords=kw, errbackKeywords=kw)

这部分实现了Deferred中添加回调函数的方法,主要就是addCallbacks方法,注意该方法的实现,同时添加的callback和errback是同一个层级的,callback中抛出的异常errback是无法处理的。正如twisted文档中举出的例子一样:

# Case 1
d = getDeferredFromSomewhere()
d.addCallback(callback1)       # A
d.addErrback(errback1)         # B
d.addCallback(callback2)
d.addErrback(errback2)

# Case 2
d = getDeferredFromSomewhere()
d.addCallbacks(callback1, errback1)  # C
d.addCallbacks(callback2, errback2)

如果一个异常在callback1中触发,那么在Case1中errback1将会被调用,在Case2中,errback2将会被调用。这里需要小心,passthru仅仅将结果或者错误传递下去,当callback或者errback不存在是用于替代。

 def callback(self, result):
     assert not isinstance(result, Deferred)
     self._startRunCallbacks(result)

 def errback(self, fail=None):
     if fail is None:
            fail = failure.Failure(captureVars=self.debug)
        elif not isinstance(fail, failure.Failure):
            fail = failure.Failure(fail)

     self._startRunCallbacks(fail)

def _startRunCallbacks(self, result):
    if self.called:
    # 回调链已经被启动
        if self._suppressAlreadyCalled:
            # cancel已经被调用,此时激活deferred将直接退出
            self._suppressAlreadyCalled = False
            return
        if self.debug:
            if self._debugInfo is None:
                self._debugInfo = DebugInfo()
            extra = "\\n" + self._debugInfo._getDebugTracebacks()
            raise AlreadyCalledError(extra)
        raise AlreadyCalledError
        # 抛出异常,表明回调已经启动或者取消
    if self.debug:
        if self._debugInfo is None:
            self._debugInfo = DebugInfo()
        self._debugInfo.invoker = traceback.format_stack()[:-2]
    self.called = True
    self.result = result
    self._runCallbacks() #运行回调

def _runCallbacks(self):
    if self._runningCallbacks:
            # 防止递归调用回调链
            return

    chain = [self]
    # 维持一个待处理的deferred栈
    while chain:
        current = chain[-1]
        # 取出当前待处理的deferred
        if current.paused:
            # 如果Deferred不打算产生一个结果,
            # 所有deferred链上的deferred都必须等待
            return

        finished = True
        current._chainedTo = None
        while current.callbacks:
        # 依次回调当前deferred的所有回调函数
            item = current.callbacks.pop(0)
            # 弹出到处理的回调函数对
            callback, args, kw = item[
                    isinstance(current.result, failure.Failure)]
            # 根据结果是否为Failure判断调用callback还是errback
            args = args or ()
            kw = kw or 

            # Avoid recursion if we can.
            if callback is _CONTINUE:
                # 当前deferred的回调已经执行完毕,被暂停的
                # 外层deferred剩下的回调将会被执行。此时,
                # 获得外层的deferred将里层的运行结果交给
                # 外层,将外层deferred标为继续执行并重新
                # 添加到deferred链中
                chainee = args[0] # 获得外层deferred
                chainee.result = current.result 
                # 将内层结果交给外层
                current.result = None
                # Making sure to update _debugInfo
                if current._debugInfo is not None:
                    current._debugInfo.failResult = None
                chainee.paused -= 1
                # 继续执行外部deferred
                chain.append(chainee)
                finished = False
                break
            try:
                current._runningCallbacks = True
                try:
                    current.result = callback(current.result, *args, **kw)
                    # 执行回调
                    if current.result is current:
                        warnAboutFunction(
                            callback,
                            "Callback returned the Deferred "
                            "it was attached to; this breaks the "
                            "callback chain and will raise an "
                            "exception in the future.")
                finally:
                    current._runningCallbacks = False
            except:
                # 如果在回调用抛出异常,那么捕捉到这些异常并封装为Failure对象
                current.result = failure.Failure(captureVars=self.debug)
            else:
                if isinstance(current.result, Deferred):
                # 如果返回的结果是一个deferred
                    resultResult = getattr(current.result, 'result', _NO_RESULT)
                    # 检查这个内层deferred的结果有没有产生
                    if resultResult is _NO_RESULT or isinstance(resultResult, Deferred) or current.result.paused:
                    # 如果没有,那么就暂停外层deferred
                        current.pause()
                        current._chainedTo = current.result
                        current.result.callbacks.append(current._continuation())
                        #并将外层deferred的剩余回调依附在内层deferred上
                        break
                    else:
                        current.result.result = None
                        if current.result._debugInfo is not None:
                            current.result._debugInfo.failResult = None
                        current.result = resultResult
                        # 直接获取内层deferred的结果,
                        # 外层继续运行

            # 当前这个deferred的回调都执行结束
            if finished:
                if isinstance(current.result, failure.Failure):
                # 如果最终结果是个错误
                    current.result.cleanFailure()
                    if current._debugInfo is None:
                        current._debugInfo = DebugInfo()
                    current._debugInfo.failResult = current.result
                else:
                    # Clear out any Failure in the _debugInfo, since the result
                    # is no longer a Failure.
                    if current._debugInfo is not None:
                        current._debugInfo.failResult = None
                chain.pop()
                # 这个deferred执行结束,从deferred链中弹出

def _continuation(self):
        return ((_CONTINUE, (self,), None),
                (_CONTINUE, (self,), None))

_runCallbacks实现了回调链的执行过程,由于可能出现多重deferred嵌套的情况,_runCallbacks中使用了一个列表chain管理多重deferred。当一个deferred的回调中返回了一个另一个deferred,并且这个子deferred对象还没有获得执行结果,那么就会将父对象暂停,并且将自己作为回调函数对(callback,errback)的参数,通过把这个回调函数对添加到子deferred的回调函数列表中,使自己剩下的回调函数能在子deferred被激活后获得回调。而已经执完的deferred将会从chain中弹出。

取消动作(cancel)是后面加入的功能,用于异步动作的取消。当cancel方法被调用之后,如果deferred还未被激活,cancel将会激活deferred的errback,产生一个CancelledError异常。当deferred被激活之后,cancel不会有任何动作,cancel部分代码如下:

def cancel(self):
    if not self.called:
    # 如果deferred未被激活
        canceller = self._canceller
        # 获得canceller对象
        if canceller:
            canceller(self)
        else:
        # 表明操作未被取消,也没有激活deferred,
        # 如果其他地方有激活deferred的操作,则
        # 不执行操作
            self._suppressAlreadyCalled = True
        if not self.called:                            
            self.errback(failure.Failure(CancelledError()))
            # 如果在canceller调用之后仍然没有激活deferred,
            # (canceller函数中是可以有激活deferred的操作的)
            # 那么调用errback返回一个CancellError的异常
    elif isinstance(self.result, Deferred):
         self.result.cancel()

cancel方法的最后两行需要注意,如果这个deferred的回调函数中有返回一个deferred,即存在嵌套的deferred,那么就要在取消子deferred,因为可能存在父deferred已经被激活但子deferred未被激活的情况。

通过对以上deferred核心方法的分析,我们对deferred的实现有了一定的了解,同时也能感受到作者这样设计的需求和道理。同时,对deferred的分析能使我们知道如何正确的使用deferred,避免异常扩散到reactor的底层代码中。

以上是关于Twisted源码分析4--Deferred的主要内容,如果未能解决你的问题,请参考以下文章

Twisted源码分析3

Twisted源码分析1

Twisted源码分析1

Twisted源码分析3

Twisted源码分析3

Twisted源码分析2