Twisted框架学习

Posted silence-cho

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Twisted框架学习相关的知识,希望对你有一定的参考价值。

  Twisted是用Python实现的基于事件驱动的网络引擎框架,是python中一个强大的异步IO库。理解twisted的一个前提是弄清楚twisted中几个核心的概念: reactor, Protocl, ProtocolFactory, Deffered

 1 reactor

twisted.internet.reactor

https://twistedmatrix.com/documents/current/core/howto/reactor-basics.html

reactor是twisted异步框架中的核心组件,是一个基于select,poll或epoll的事件循环,其监听socket的状态,当socket状态有变化时(有新的连接请求,或接受到数据等)时,调用相应的组件来进行处理。如下图的reactor loop一样,不断的循环扫描socket列表中监听对象的状态,当有特定事件发生时(socket状态变化)调用回调函数callback,来处理事件,这时候执行权限交给回调函数,当我们的代码处理完事件后,将执行权返回给reactor,继续进行循环监听。

技术分享图片

2,Factory和Protocol

 

twisted.internet.protocol.Factory

 

twisted.internet.protocol.Protocol

参考文章:

 

https://twistedmatrix.com/documents/current/core/howto/servers.html

https://twistedmatrix.com/documents/current/core/howto/clients.html

https://zhuanlan.zhihu.com/p/28763807?utm_source=wechat_session&utm_medium=social

  Factory和Protocol都是用来处理一些配置和协议相关的底层业务,如socket之间连接,数据的发送格式等。Factory设置持久的,多个socket可共享的通用配置,Protocol为设置单个socket的特定配置。当有一个socket连接请求时,Factory创建一个Protocol实例,并将该实例的factory属性指向自己,请求断开时,protocol即被销毁。如下第一幅图中,基于twisted的一个异步服务器,其中endpiont为绑定的ip和端口,reactor监听其socket的状态,当有连接请求时,reactor调用Factory来设置相关配置,其随后会创建(buildProtocol)Protocol实例,Protocol实例的Transport属性会处理客户端socket的请求,并执行相应的回调函数。在第二幅图中可以看到,reactor共监听四个socket,一个是服务端listening socket(其绑定的ip和port),和三个客户端socket,而每个客户端socket都有自己的Protocol来处理相应的数据交互请求,这些Protocol都由Factory创建。(也可以有多个Factory,每个Factory创建多个Protocol)

技术分享图片

技术分享图片

Factory: 主要用来创建protocol,也可以定义其他操作

twisted.internet.protocol.Factory

  Factory类的源码如下,其有属性protocol和方法buildProtocol()较为重要,其中protocol指向需要创建的Protocol类,从buildProtocol()方法可以看到其创建了Protocol实例,并且将该实例的factory属性指向了Factory 实例。startFactory()和stopFactory()相当于钩子函数,在factory和端口连接和断开时调用。在实际应用时,一般选择继承Factory的子类,并实现相应的方法,如ClientFactory,SeverFactory。

技术分享图片
@implementer(interfaces.IProtocolFactory, interfaces.ILoggingContext)
class Factory:
    """
    This is a factory which produces protocols.

    By default, buildProtocol will create a protocol of the class given in
    self.protocol.
    """

    # put a subclass of Protocol here:
    protocol = None

    numPorts = 0
    noisy = True

    @classmethod
    def forProtocol(cls, protocol, *args, **kwargs):
        """
        Create a factory for the given protocol.

        It sets the C{protocol} attribute and returns the constructed factory
        instance.

        @param protocol: A L{Protocol} subclass

        @param args: Positional arguments for the factory.

        @param kwargs: Keyword arguments for the factory.

        @return: A L{Factory} instance wired up to C{protocol}.
        """
        factory = cls(*args, **kwargs)
        factory.protocol = protocol
        return factory


    def logPrefix(self):
        """
        Describe this factory for log messages.
        """
        return self.__class__.__name__


    def doStart(self):
        """Make sure startFactory is called.

        Users should not call this function themselves!
        """
        if not self.numPorts:
            if self.noisy:
                _loggerFor(self).info("Starting factory {factory!r}",
                                      factory=self)
            self.startFactory()
        self.numPorts = self.numPorts + 1

    def doStop(self):
        """Make sure stopFactory is called.

        Users should not call this function themselves!
        """
        if self.numPorts == 0:
            # this shouldn‘t happen, but does sometimes and this is better
            # than blowing up in assert as we did previously.
            return
        self.numPorts = self.numPorts - 1
        if not self.numPorts:
            if self.noisy:
                _loggerFor(self).info("Stopping factory {factory!r}",
                                      factory=self)
            self.stopFactory()

    def startFactory(self):
        """This will be called before I begin listening on a Port or Connector.

        It will only be called once, even if the factory is connected
        to multiple ports.

        This can be used to perform ‘unserialization‘ tasks that
        are best put off until things are actually running, such
        as connecting to a database, opening files, etcetera.
        """

    def stopFactory(self):
        """This will be called before I stop listening on all Ports/Connectors.

        This can be overridden to perform ‘shutdown‘ tasks such as disconnecting
        database connections, closing files, etc.

        It will be called, for example, before an application shuts down,
        if it was connected to a port. User code should not call this function
        directly.
        """


    def buildProtocol(self, addr):
        """
        Create an instance of a subclass of Protocol.

        The returned instance will handle input on an incoming server
        connection, and an attribute "factory" pointing to the creating
        factory.

        Alternatively, L{None} may be returned to immediately close the
        new connection.

        Override this method to alter how Protocol instances get created.

        @param addr: an object implementing L{twisted.internet.interfaces.IAddress}
        """
        p = self.protocol()
        p.factory = self
        return p
Factory

  继承Factory类,创建protocol实例,有两种方式,一是设置其属性protocol,不覆盖buildProtocol()方法;二是不设置属性protocol,覆盖buildProtocol()方法,在方法内部创建protocol实例,并返回。代码如下:

技术分享图片
from twisted.internet.protocol import Factory, Protocol
from twisted.internet.endpoints import TCP4ServerEndpoint
from twisted.internet import reactor

class QOTD(Protocol):

    def connectionMade(self):
        # self.factory was set by the factory‘s default buildProtocol:
        self.transport.write(self.factory.quote + 
)
        self.transport.loseConnection()


class QOTDFactory(Factory):

    # This will be used by the default buildProtocol to create new protocols:
    protocol = QOTD

    def __init__(self, quote=None):
        self.quote = quote or An apple a day keeps the doctor away

endpoint = TCP4ServerEndpoint(reactor, 8007)
endpoint.listen(QOTDFactory("configurable quote"))
reactor.run()
设置protocol属性
技术分享图片
from twisted.internet.protocol import Factory
from twisted.internet.endpoints import TCP4ServerEndpoint
from twisted.internet import reactor

class QOTD(Protocol):

    def connectionMade(self):
        # self.factory was set by the factory‘s default buildProtocol:
        self.transport.write(self.factory.quote + 
)
        self.transport.loseConnection()

class QOTDFactory(Factory):
    def buildProtocol(self, addr):
        return QOTD()

# 8007 is the port you want to run under. Choose something >1024
endpoint = TCP4ServerEndpoint(reactor, 8007)
endpoint.listen(QOTDFactory())
reactor.run()
实现buildProtocol方法

startFactory()和stopFactory()示例

技术分享图片
from twisted.internet.protocol import Factory
from twisted.protocols.basic import LineReceiver

#LineReceiver为一种protocol
class LoggingProtocol(LineReceiver):

    def lineReceived(self, line):
        self.factory.fp.write(line + 
)


class LogfileFactory(Factory):

    protocol = LoggingProtocol

    def __init__(self, fileName):
        self.file = fileName

    def startFactory(self):
        self.fp = open(self.file, a)

    def stopFactory(self):
        self.fp.close()
View Code

 

Protocol:主要用来处理连接建立和断开时的操作,以及数据的接受和发送操作

twisted.internet.protocol.Protocol

Protocol继承了BaseProtocol, BaseProtocol和Protocol的源码如下:

技术分享图片
class BaseProtocol:
    """
    This is the abstract superclass of all protocols.

    Some methods have helpful default implementations here so that they can
    easily be shared, but otherwise the direct subclasses of this class are more
    interesting, L{Protocol} and L{ProcessProtocol}.
    """
    connected = 0
    transport = None

    def makeConnection(self, transport):
        """Make a connection to a transport and a server.

        This sets the ‘transport‘ attribute of this Protocol, and calls the
        connectionMade() callback.
        """
        self.connected = 1
        self.transport = transport
        self.connectionMade()

    def connectionMade(self):
        """Called when a connection is made.

        This may be considered the initializer of the protocol, because
        it is called when the connection is completed.  For clients,
        this is called once the connection to the server has been
        established; for servers, this is called after an accept() call
        stops blocking and a socket has been received.  If you need to
        send any greeting or initial message, do it here.
        """
BaseProtocol
技术分享图片
@implementer(interfaces.IProtocol, interfaces.ILoggingContext)
class Protocol(BaseProtocol):
    """
    This is the base class for streaming connection-oriented protocols.

    If you are going to write a new connection-oriented protocol for Twisted,
    start here.  Any protocol implementation, either client or server, should
    be a subclass of this class.

    The API is quite simple.  Implement L{dataReceived} to handle both
    event-based and synchronous input; output can be sent through the
    ‘transport‘ attribute, which is to be an instance that implements
    L{twisted.internet.interfaces.ITransport}.  Override C{connectionLost} to be
    notified when the connection ends.

    Some subclasses exist already to help you write common types of protocols:
    see the L{twisted.protocols.basic} module for a few of them.
    """

    def logPrefix(self):
        """
        Return a prefix matching the class name, to identify log messages
        related to this protocol instance.
        """
        return self.__class__.__name__


    def dataReceived(self, data):
        """Called whenever data is received.

        Use this method to translate to a higher-level message.  Usually, some
        callback will be made upon the receipt of each complete protocol
        message.

        @param data: a string of indeterminate length.  Please keep in mind
            that you will probably need to buffer some data, as partial
            (or multiple) protocol messages may be received!  I recommend
            that unit tests for protocols call through to this method with
            differing chunk sizes, down to one byte at a time.
        """

    def connectionLost(self, reason=connectionDone):
        """Called when the connection is shut down.

        Clear any circular references here, and any external references
        to this Protocol.  The connection has been closed.

        @type reason: L{twisted.python.failure.Failure}
        """
Protocol

  BaseProtocol有两个属性(connected和transport),从其makeConnection()中可以看到,每创建一个连接,conencted值加一,为transport赋值,并调用钩子函数connectionMade().

  Protocol有两个钩子函数dataReceived()和connectionLost(), 分别在接受到客户端数据和断开连接时调用,自定义相应的代码来处理数据和断开连接时的清理工作。

 

twisted.protocols.basic

twisted.words.protocols

  除了继承Protocol来定义操作外,还可以继承其他的协议,如twisted.protocols.basic中的LineReceiver, LineReceiver等,twisted.words.protocols.irc中的IRCClient等

可以参见:https://twistedmatrix.com/documents/18.9.0/api/twisted.protocols.basic.html

https://twistedmatrix.com/documents/current/api/twisted.words.protocols.irc.html

继承LineReciver

技术分享图片
from twisted.protocols.basic import LineReceiver

class Answer(LineReceiver):

    answers = {How are you?: Fine, None: "I don‘t know what you mean"}

    def lineReceived(self, line):
        if line in self.answers:
            self.sendLine(self.answers[line])
        else:
            self.sendLine(self.answers[None])
LineReciver

基于twisted的服务端

技术分享图片
from twisted.internet.protocol import Factory
from twisted.protocols.basic import LineReceiver
from twisted.internet import reactor

class Chat(LineReceiver):

    def __init__(self, users):
        self.users = users
        self.name = None
        self.state = "GETNAME"

    def connectionMade(self):
        self.sendLine("What‘s your name?")

    def connectionLost(self, reason):
        if self.name in self.users:
            del self.users[self.name]

    def lineReceived(self, line):
        if self.state == "GETNAME":
            self.handle_GETNAME(line)
        else:
            self.handle_CHAT(line)

    def handle_GETNAME(self, name):
        if name in self.users:
            self.sendLine("Name taken, please choose another.")
            return
        self.sendLine("Welcome, %s!" % (name,))
        self.name = name
        self.users[name] = self
        self.state = "CHAT"

    def handle_CHAT(self, message):
        message = "<%s> %s" % (self.name, message)
        for name, protocol in self.users.iteritems():
            if protocol != self:
                protocol.sendLine(message)


class ChatFactory(Factory):

    def __init__(self):
        self.users = {} # maps user names to Chat instances

    def buildProtocol(self, addr):
        return Chat(self.users)


reactor.listenTCP(8123, ChatFactory())
reactor.run()
View Code

继承IRCClient

技术分享图片
from twisted.words.protocols import irc
from twisted.internet import protocol

class LogBot(irc.IRCClient):

    def connectionMade(self):
        irc.IRCClient.connectionMade(self)
        self.logger = MessageLogger(open(self.factory.filename, "a"))
        self.logger.log("[connected at %s]" %
                        time.asctime(time.localtime(time.time())))

    def signedOn(self):
        self.join(self.factory.channel)


class LogBotFactory(protocol.ClientFactory):

    def __init__(self, channel, filename):
        self.channel = channel
        self.filename = filename

    def buildProtocol(self, addr):
        p = LogBot()
        p.factory = self
        return p
IRCCClient

基于twisted的客户端

技术分享图片
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.


"""
An example IRC log bot - logs a channel‘s events to a file.

If someone says the bot‘s name in the channel followed by a ‘:‘,
e.g.

    <foo> logbot: hello!

the bot will reply:

    <logbot> foo: I am a log bot

Run this script with two arguments, the channel name the bot should
connect to, and file to log to, e.g.:

    $ python ircLogBot.py test test.log

will log channel #test to the file ‘test.log‘.

To run the script:

    $ python ircLogBot.py <channel> <file>
"""


from __future__ import print_function

# twisted imports
from twisted.words.protocols import irc
from twisted.internet import reactor, protocol
from twisted.python import log

# system imports
import time, sys


class MessageLogger:
    """
    An independent logger class (because separation of application
    and protocol logic is a good thing).
    """
    def __init__(self, file):
        self.file = file

    def log(self, message):
        """Write a message to the file."""
        timestamp = time.strftime("[%H:%M:%S]", time.localtime(time.time()))
        self.file.write(%s %s
 % (timestamp, message))
        self.file.flush()

    def close(self):
        self.file.close()


class LogBot(irc.IRCClient):
    """A logging IRC bot."""
    
    nickname = "twistedbot"
    
    def connectionMade(self):
        irc.IRCClient.connectionMade(self)
        self.logger = MessageLogger(open(self.factory.filename, "a"))
        self.logger.log("[connected at %s]" % 
                        time.asctime(time.localtime(time.time())))

    def connectionLost(self, reason):
        irc.IRCClient.connectionLost(self, reason)
        self.logger.log("[disconnected at %s]" % 
                        time.asctime(time.localtime(time.time())))
        self.logger.close()


    # callbacks for events

    def signedOn(self):
        """Called when bot has successfully signed on to server."""
        self.join(self.factory.channel)

    def joined(self, channel):
        """This will get called when the bot joins the channel."""
        self.logger.log("[I have joined %s]" % channel)

    def privmsg(self, user, channel, msg):
        """This will get called when the bot receives a message."""
        user = user.split(!, 1)[0]
        self.logger.log("<%s> %s" % (user, msg))
        
        # Check to see if they‘re sending me a private message
        if channel == self.nickname:
            msg = "It isn‘t nice to whisper!  Play nice with the group."
            self.msg(user, msg)
            return

        # Otherwise check to see if it is a message directed at me
        if msg.startswith(self.nickname + ":"):
            msg = "%s: I am a log bot" % user
            self.msg(channel, msg)
            self.logger.log("<%s> %s" % (self.nickname, msg))

    def action(self, user, channel, msg):
        """This will get called when the bot sees someone do an action."""
        user = user.split(!, 1)[0]
        self.logger.log("* %s %s" % (user, msg))

    # irc callbacks

    def irc_NICK(self, prefix, params):
        """Called when an IRC user changes their nickname."""
        old_nick = prefix.split(!)[0]
        new_nick = params[0]
        self.logger.log("%s is now known as %s" % (old_nick, new_nick))


    # For fun, override the method that determines how a nickname is changed on
    # collisions. The default method appends an underscore.
    def alterCollidedNick(self, nickname):
        """
        Generate an altered version of a nickname that caused a collision in an
        effort to create an unused related name for subsequent registration.
        """
        return nickname + ^



class LogBotFactory(protocol.ClientFactory):
    """A factory for LogBots.

    A new protocol instance will be created each time we connect to the server.
    """

    def __init__(self, channel, filename):
        self.channel = channel
        self.filename = filename

    def buildProtocol(self, addr):
        p = LogBot()
        p.factory = self
        return p

    def clientConnectionLost(self, connector, reason):
        """If we get disconnected, reconnect to server."""
        connector.connect()

    def clientConnectionFailed(self, connector, reason):
        print("connection failed:", reason)
        reactor.stop()


if __name__ == __main__:
    # initialize logging
    log.startLogging(sys.stdout)
    
    # create factory protocol and application
    f = LogBotFactory(sys.argv[1], sys.argv[2])

    # connect factory to this host and port
    reactor.connectTCP("irc.freenode.net", 6667, f)

    # run bot
    reactor.run()
View Code

 

 3,Deferred 和 DeferredList

参考文章:

https://twistedmatrix.com/documents/current/core/howto/defer.html

https://twistedmatrix.com/documents/current/core/howto/gendefer.html

Deferred

twisted.internet.defer.Deferred

http://krondo.com/a-second-interlude-deferred/

https://twistedmatrix.com/documents/current/api/twisted.internet.defer.Deferred.html

  Deferred用来管理回调函数,可以实现回调函数的链式执行。Deferred需要同时加入一对回调函数,一个calllback,正常执行时调用,一个errback,执行异常时调用。可以按顺序,依次加入多个函数对,执行时根据加入顺序依次链式执行。当只加入一个函数时,Deferred会默认加入一个另一个函数(但这个函数什么事情也不做,相当于pass),简单示例如下:

from twisted.internet import reactor
from twisted.internet.protocol import Protocol,Factory
from twisted.internet.defer import Deferred

def down_page(url):
    print 去下载url网页
def down_failed():
    print 若下载网页失败时执行down_failed()
def save_page():
    print 保存网页
def close_task():
    reactor.stop()
d = Deferred()
d.addCallbacks(down_page,down_failed)  #相当于d.addCallback(down_page)和d.addErrback(down_failed)
d.addCallback(save_page)  # Deferred默认加入d.addErrback(save_failed),但save_failed什么也不做
d.addBoth(close_task)   #相当于d.addCallbacks(close_task,close_task)
reactor.callWhenRunning(d.callback,www.baidu.com)
reactor.run()

  Deferred的执行流如下左图所示:其有两条链,一条callbacks链,一条errbacks链,正常情况下,其会按照callback链依次执行加入的每一个callback函数,但若发生异常时,则会跳过该callback,而执行errback。下面的每一个箭头都代表一条可能的执行路径。如右图中的一条路径,依次执行第一个,第二个callback,但由于第二个callback抛出异常,执行第三对回调函数的errback,该errback函数捕获了异常,继续执行第四对回调函数的callback。(若该errback未能处理异常,而继续传递异常,将执行第四对回调函数的errback)

技术分享图片技术分享图片

  Deferred在执行回调函数时,其内部也会再返回一个Deferred对象,此时外部Deferred会暂停执行,将执行权交给reactor,当reactor执行内部Deferred的回调函数时,内部Deferred最后会调用外部Deferred的回调函数而切换到外部Deferred继续执行。其执行流示意图如下:

技术分享图片Reactor和Deferred间的执行权限切换如下:

技术分享图片

 

 

 

 DeferredList

twisted.internet.defer.DeferredList

参考文章

http://krondo.com/deferreds-en-masse/

https://twistedmatrix.com/documents/current/api/twisted.internet.defer.DeferredList.html

  deferredList用来管理多个deferred对象,监听其状态,得到所有deferred对象的回调函数最后的返回值或异常。deferredList也可以像defer一样添加回调函数,当其监听的所有deferred对象执行完成后,调用deferredList的回调函数。

技术分享图片

 

 4,inclineCallbacks

twisted.internet.defer.inlineCallbacks

参考文章:

http://krondo.com/just-another-way-to-spell-callback/

  inclineCallbacks可以理解为innerdeferred的回调函数

  理解inclineCallbacks,先来看看generator,下面的generator代码的执行结果如下,执行流程中可以看到,每次yield时,就从generator函数内部跳转到函数外部执行,函数外部执行后将结果send到generator内部,或者将异常throw到generator内部,generator内部函数继续执行,此处我们可以想象为generator函数执行到yield时,内部函数挂起,调用外部函数,外部函数执行完成后将结果返回给generator处理,然后内部函数继续执行。

技术分享图片
#coding:utf-8

class Malfunction(Exception):
    pass

def my_generator():
    print starting up

    val = yield 1
    print got:, val

    val = yield 2
    print got:, val

    try:
        yield 3
    except Malfunction:
        print malfunction!

    yield 4

    print done

gen = my_generator()  #gen通过send,throw向my_generator函数内部发送值,赋值给val;my_generator函数内部通过yield返回值给gen

print  gen.next(), from yield  # start the generator
print  gen.send(10), from yield   # send the value 10
print gen.send(20), from yield   # send the value 20
print gen.throw(Malfunction()), from yield  # raise an exception inside the generator

try:
    gen.next()
except StopIteration:
    pass
generator

技术分享图片

  因此我们可以写出如下的代码,实现内部函数调用。

技术分享图片
#coding:utf-8
def func1():
    print 执行回调函数1
    return result1

def func2():
    print 执行回调函数2
    return result2

def my_generator():
    print 内部函数开始执行

    val1 = yield 1

    print 回调函数1执行完成,返回结果:,val1

    val2 = yield 2

    print 回调函数2执行完成,返回结果:,val2

    yield 3

    print 内部函数结束执行
gen = my_generator()

gen.next()
t1 = func1()
gen.send(t1)

t2 = func2()
gen.send(t2)

try:
    gen.next()
except StopIteration as e:
    pass
View Code

技术分享图片

  装饰器@inclineCallbacks必须和yield搭配使用,其作用相当于gen.next, send , throw, 当generator函数内部yield时,其负责拿到外部函数结果并返回给generator。若yield一个单独的值时,inclineCallbacks立即返回该值,继续执行generator函数,若yield 一个deferred对象时,内部函数挂起,等deferred的回调函数执行完毕后,将回调函数的结果或异常返回,generator才继续执行(若时异常时不捕获,yield会抛出该异常)。具体流程如下面代码:

技术分享图片
from twisted.internet.defer import inlineCallbacks, Deferred

@inlineCallbacks
def my_callbacks():
    from twisted.internet import reactor

    print first callback
    result = yield 1 # yielded values that aren‘t deferred come right back

    print second callback got, result
    d = Deferred()
    reactor.callLater(5, d.callback, 2)   #d的回调函数在5秒钟后才执行,yield d 会使generator等待d执行完毕
    result = yield d # yielded deferreds will pause the generator

    print third callback got, result # the result of the deferred

    d = Deferred()
    reactor.callLater(5, d.errback, Exception(3))

    try:
        yield d
    except Exception, e:
        result = e

    print fourth callback got, repr(result) # the exception from the deferred

    reactor.stop()

from twisted.internet import reactor
reactor.callWhenRunning(my_callbacks)
reactor.run()
View Code

技术分享图片

  另外,对于装饰器@inclineCallbacks装饰的generator的返回值也是一个deferred对象:

技术分享图片
from twisted.internet.defer import inlineCallbacks, Deferred
from twisted.internet import reactor

@inlineCallbacks
def my_generator():
    yield 1

    d = Deferred()
    reactor.callLater(5, d.callback, 2)
    yield d

    yield 2
d = my_generator()
print d, type(d)

reactor.run()
View Code

技术分享图片

5,框架使用

https://twistedmatrix.com/documents/current/core/howto/application.html

 

参考:

学习教程:http://krondo.com/an-introduction-to-asynchronous-programming-and-twisted/

官方文档:https://twistedmatrix.com/trac/wiki/Documentation

 


以上是关于Twisted框架学习的主要内容,如果未能解决你的问题,请参考以下文章

爬虫日记(106):Twisted:单元测试怎么样编写

爬虫日记(93):Twisted的设计模型

Python爬虫编程思想(27):Twisted框架基础

爬虫日记(79):Twisted的延时机制

twisted高并发库transport函数处理数据包的些许问题

Twisted源码分析1