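Twisted is an event-driven networking engine written in Python and one of Python's most powerful asynchronous I/O libraries. To understand Twisted, you first need to be clear about a few of its core concepts: the reactor, Protocol, ProtocolFactory (the Factory classes) and Deferred.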
1 reactor
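The reactor is the core component of Twisted's asynchronous framework. It is an event loop built on select, poll or epoll that watches the state of sockets. When a socket's state changes (a new connection request arrives, data is received, and so on), it calls the appropriate component to handle the event. As in the reactor loop figure below, it keeps scanning the sockets it is monitoring. When a particular event occurs (a socket changes state), it invokes a callback to handle it, and control passes to that callback. Once our code has finished handling the event, control returns to the reactor, which resumes its loop.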
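The loop described above can be made concrete with a toy event loop built on Python's standard `selectors` module, which picks epoll, kqueue, poll or select depending on the platform. This is an illustrative sketch of the reactor pattern, not Twisted's implementation. The class `ToyReactor` and its `add_reader`/`remove_reader`/`run` methods are hypothetical names.

```python
import selectors
import socket


class ToyReactor:
    """Toy select/poll/epoll-style event loop (illustration only, not Twisted's reactor)."""

    def __init__(self):
        # DefaultSelector uses the most efficient mechanism the OS offers
        self.selector = selectors.DefaultSelector()

    def add_reader(self, sock, callback):
        # watch sock and remember which callback handles "sock is readable"
        self.selector.register(sock, selectors.EVENT_READ, callback)

    def remove_reader(self, sock):
        self.selector.unregister(sock)

    def run(self):
        # keep looping while there is anything left to watch
        while self.selector.get_map():
            # block until at least one watched socket changes state
            for key, _mask in self.selector.select():
                # hand control to the callback; when it returns, the loop resumes
                key.data(key.fileobj)


received = []
reactor = ToyReactor()
a, b = socket.socketpair()  # two connected sockets standing in for a client connection


def on_readable(sock):
    received.append(sock.recv(1024))
    reactor.remove_reader(sock)  # stop watching so that run() can return


reactor.add_reader(a, on_readable)
b.sendall(b"hello")
reactor.run()
a.close()
b.close()
print(received)
```

The callback runs inside the loop. A callback that blocks therefore stalls every other socket, which is why code running under a reactor must return control quickly.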
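2 Factory and Protocol
Factory and Protocol handle the lower-level configuration and protocol details, such as the connections between sockets and the format of the data being sent. A Factory holds persistent configuration that many sockets can share, while a Protocol holds the configuration specific to a single socket (one connection). When a connection request arrives, the Factory creates a Protocol instance and sets that instance's factory attribute to point back at itself. When the connection is closed, the protocol instance is discarded.

The first figure below shows a Twisted-based asynchronous server. The endpoint is the IP address and port the server is bound to, and the reactor monitors the state of that socket. When a connection request arrives, the reactor calls on the Factory, which then creates a Protocol instance (buildProtocol). The Protocol instance's transport attribute handles reading from and writing to the client socket, and the corresponding callbacks are executed.

The second figure shows the reactor monitoring four sockets: the server's listening socket (bound to its IP and port) and three client sockets. Each client socket has its own Protocol instance to handle its data exchange, and all of these Protocol instances are created by the Factory. (There can also be several Factories, each creating many Protocols.)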
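The Factory/Protocol split can be modelled in a few lines of plain Python. This is a toy sketch, not Twisted code: `ToyFactory` and `ToyProtocol` are hypothetical classes, and their `buildProtocol()` method and `factory` back-reference only mirror the behaviour described above. Each simulated connection gets its own protocol object, while the counter kept on the factory is shared by all of them.

```python
class ToyProtocol:
    """Per-connection state lives here (one instance per socket)."""

    def __init__(self):
        self.factory = None  # filled in by the factory, like Twisted's default buildProtocol
        self.received = []   # data seen on *this* connection only

    def dataReceived(self, data):
        self.received.append(data)
        self.factory.total_bytes += len(data)  # shared, factory-level state


class ToyFactory:
    """Long-lived state shared by every connection."""

    protocol = ToyProtocol

    def __init__(self):
        self.total_bytes = 0

    def buildProtocol(self, addr):
        p = self.protocol()  # a fresh protocol object for each connection
        p.factory = self     # back-reference to the shared factory
        return p


factory = ToyFactory()
# simulate three incoming connections from made-up client addresses
conns = [factory.buildProtocol(("127.0.0.1", port)) for port in (50001, 50002, 50003)]
conns[0].dataReceived(b"abc")
conns[2].dataReceived(b"de")
print([c.received for c in conns])  # [[b'abc'], [], [b'de']]
print(factory.total_bytes)          # 5
```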
Factory: mainly used to create protocols, although it can also define other operations.
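The source of the Factory class is shown below. Its most important members are the protocol attribute and the buildProtocol() method. protocol is the Protocol class to instantiate. As buildProtocol() shows, that method creates a Protocol instance and sets the instance's factory attribute to the Factory instance.

startFactory() and stopFactory() act as hook functions. startFactory() is called once, before the factory starts listening on its first port (or connector). stopFactory() is called once, when it stops listening on the last of them. In practice you usually subclass Factory or one of its subclasses, such as ClientFactory or ServerFactory, and override the methods you need.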

```python
# Excerpt from twisted/internet/protocol.py
from zope.interface import implementer

from twisted.internet import interfaces
from twisted.logger import _loggerFor


@implementer(interfaces.IProtocolFactory, interfaces.ILoggingContext)
class Factory:
    """
    This is a factory which produces protocols.

    By default, buildProtocol will create a protocol of the class given in
    self.protocol.
    """

    # put a subclass of Protocol here:
    protocol = None

    numPorts = 0
    noisy = True

    @classmethod
    def forProtocol(cls, protocol, *args, **kwargs):
        """
        Create a factory for the given protocol.

        It sets the C{protocol} attribute and returns the constructed factory
        instance.

        @param protocol: A L{Protocol} subclass

        @param args: Positional arguments for the factory.

        @param kwargs: Keyword arguments for the factory.

        @return: A L{Factory} instance wired up to C{protocol}.
        """
        factory = cls(*args, **kwargs)
        factory.protocol = protocol
        return factory

    def logPrefix(self):
        """
        Describe this factory for log messages.
        """
        return self.__class__.__name__

    def doStart(self):
        """Make sure startFactory is called.

        Users should not call this function themselves!
        """
        if not self.numPorts:
            if self.noisy:
                _loggerFor(self).info("Starting factory {factory!r}", factory=self)
            self.startFactory()
        self.numPorts = self.numPorts + 1

    def doStop(self):
        """Make sure stopFactory is called.

        Users should not call this function themselves!
        """
        if self.numPorts == 0:
            # this shouldn't happen, but does sometimes and this is better
            # than blowing up in assert as we did previously.
            return
        self.numPorts = self.numPorts - 1
        if not self.numPorts:
            if self.noisy:
                _loggerFor(self).info("Stopping factory {factory!r}", factory=self)
            self.stopFactory()

    def startFactory(self):
        """This will be called before I begin listening on a Port or Connector.

        It will only be called once, even if the factory is connected
        to multiple ports.

        This can be used to perform 'unserialization' tasks that
        are best put off until things are actually running, such
        as connecting to a database, opening files, etcetera.
        """

    def stopFactory(self):
        """This will be called before I stop listening on all Ports/Connectors.

        This can be overridden to perform 'shutdown' tasks such as disconnecting
        database connections, closing files, etc.

        It will be called, for example, before an application shuts down,
        if it was connected to a port. User code should not call this function
        directly.
        """

    def buildProtocol(self, addr):
        """
        Create an instance of a subclass of Protocol.

        The returned instance will handle input on an incoming server
        connection, and an attribute "factory" pointing to the creating
        factory.

        Alternatively, L{None} may be returned to immediately close the
        new connection.

        Override this method to alter how Protocol instances get created.

        @param addr: an object implementing L{twisted.internet.interfaces.IAddress}
        """
        p = self.protocol()
        p.factory = self
        return p
```

```python
from twisted.internet.protocol import Factory, Protocol
from twisted.internet.endpoints import TCP4ServerEndpoint
from twisted.internet import reactor


class QOTD(Protocol):
    def connectionMade(self):
        # self.factory was set by the factory's default buildProtocol:
        self.transport.write(self.factory.quote + b"\r\n")
        self.transport.loseConnection()


class QOTDFactory(Factory):
    # This will be used by the default buildProtocol to create new protocols:
    protocol = QOTD

    def __init__(self, quote=None):
        # transports send bytes, so the quote is stored as bytes
        self.quote = quote or b"An apple a day keeps the doctor away"


endpoint = TCP4ServerEndpoint(reactor, 8007)
endpoint.listen(QOTDFactory(b"configurable quote"))
reactor.run()
```

```python
from twisted.internet.protocol import Factory, Protocol
from twisted.internet.endpoints import TCP4ServerEndpoint
from twisted.internet import reactor


class QOTD(Protocol):
    def connectionMade(self):
        # buildProtocol below does not set self.factory, so the quote is hard-coded here
        self.transport.write(b"An apple a day keeps the doctor away\r\n")
        self.transport.loseConnection()


class QOTDFactory(Factory):
    # overriding buildProtocol replaces the default "instantiate self.protocol" behaviour
    def buildProtocol(self, addr):
        return QOTD()


# 8007 is the port you want to run under. Choose something >1024
endpoint = TCP4ServerEndpoint(reactor, 8007)
endpoint.listen(QOTDFactory())
reactor.run()
```

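```python
from twisted.internet.protocol import Factory
from twisted.protocols.basic import LineReceiver  # LineReceiver is a ready-made Protocol subclass


class LoggingProtocol(LineReceiver):
    def lineReceived(self, line):
        # line is bytes without the delimiter, so the log file is opened in binary mode
        self.factory.fp.write(line + b"\n")


class LogfileFactory(Factory):
    protocol = LoggingProtocol

    def __init__(self, fileName):
        self.file = fileName

    def startFactory(self):
        # runs once, before the factory starts listening: open shared resources here
        self.fp = open(self.file, "ab")

    def stopFactory(self):
        # runs once, when the factory stops listening: release them here
        self.fp.close()
```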
Protocol inherits from BaseProtocol. The source of BaseProtocol and Protocol is as follows:

```python
class BaseProtocol:
    """
    This is the abstract superclass of all protocols.

    Some methods have helpful default implementations here so that they can
    easily be shared, but otherwise the direct subclasses of this class are more
    interesting, L{Protocol} and L{ProcessProtocol}.
    """

    connected = 0
    transport = None

    def makeConnection(self, transport):
        """Make a connection to a transport and a server.

        This sets the 'transport' attribute of this Protocol, and calls the
        connectionMade() callback.
        """
        self.connected = 1
        self.transport = transport
        self.connectionMade()

    def connectionMade(self):
        """Called when a connection is made.

        This may be considered the initializer of the protocol, because
        it is called when the connection is completed.  For clients,
        this is called once the connection to the server has been
        established; for servers, this is called after an accept() call
        stops blocking and a socket has been received.  If you need to
        send any greeting or initial message, do it here.
        """
```

```python
# Same module as BaseProtocol; implementer, interfaces and connectionDone
# come from that module's imports and globals.
@implementer(interfaces.IProtocol, interfaces.ILoggingContext)
class Protocol(BaseProtocol):
    """
    This is the base class for streaming connection-oriented protocols.

    If you are going to write a new connection-oriented protocol for Twisted,
    start here.  Any protocol implementation, either client or server, should
    be a subclass of this class.

    The API is quite simple.  Implement L{dataReceived} to handle both
    event-based and synchronous input; output can be sent through the
    'transport' attribute, which is to be an instance that implements
    L{twisted.internet.interfaces.ITransport}.  Override C{connectionLost} to be
    notified when the connection ends.

    Some subclasses exist already to help you write common types of protocols:
    see the L{twisted.protocols.basic} module for a few of them.
    """

    def logPrefix(self):
        """
        Return a prefix matching the class name, to identify log messages
        related to this protocol instance.
        """
        return self.__class__.__name__

    def dataReceived(self, data):
        """Called whenever data is received.

        Use this method to translate to a higher-level message.  Usually, some
        callback will be made upon the receipt of each complete protocol
        message.

        @param data: a string of indeterminate length.  Please keep in mind
            that you will probably need to buffer some data, as partial
            (or multiple) protocol messages may be received!  I recommend
            that unit tests for protocols call through to this method with
            differing chunk sizes, down to one byte at a time.
        """

    def connectionLost(self, reason=connectionDone):
        """Called when the connection is shut down.

        Clear any circular references here, and any external references
        to this Protocol.  The connection has been closed.

        @type reason: L{twisted.python.failure.Failure}
        """
```
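Protocol adds two hook methods, dataReceived() and connectionLost(). They are called, respectively, when data arrives from the other end and when the connection is closed. connectionMade(), inherited from BaseProtocol, is called when the connection is established. Override these hooks with your own code to process incoming data and to clean up when the connection goes away.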
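You can check the order in which these hooks fire without any network at all by driving a protocol by hand. In the sketch below, `FakeTransport`, `ToyBaseProtocol` and `drive()` are hypothetical helpers that imitate what the reactor does: makeConnection, then dataReceived for each chunk, then connectionLost. They are not Twisted APIs. For real unit tests, Twisted ships a `StringTransport` test helper for the same purpose (in `twisted.internet.testing` in recent releases).

```python
class FakeTransport:
    """Hypothetical transport stand-in that just records what the protocol writes."""

    def __init__(self):
        self.written = []

    def write(self, data):
        self.written.append(data)


class ToyBaseProtocol:
    """Mirrors the shape of BaseProtocol/Protocol: hooks with empty default bodies."""

    connected = 0
    transport = None

    def makeConnection(self, transport):
        # same steps as BaseProtocol.makeConnection: store transport, then fire connectionMade
        self.connected = 1
        self.transport = transport
        self.connectionMade()

    def connectionMade(self):
        pass

    def dataReceived(self, data):
        pass

    def connectionLost(self, reason):
        pass


class Echo(ToyBaseProtocol):
    def __init__(self):
        self.events = []

    def connectionMade(self):
        self.events.append("connectionMade")
        self.transport.write(b"welcome\r\n")  # greetings belong in connectionMade

    def dataReceived(self, data):
        self.events.append(("dataReceived", data))
        self.transport.write(data)  # echo the bytes back

    def connectionLost(self, reason):
        self.events.append(("connectionLost", reason))


def drive(protocol, chunks):
    """Hypothetical driver: fires the hooks in the order a reactor would."""
    transport = FakeTransport()
    protocol.makeConnection(transport)
    for chunk in chunks:
        protocol.dataReceived(chunk)
    # a real reactor passes a Failure describing why the connection ended; a string stands in here
    protocol.connectionLost("connection done")
    return transport


p = Echo()
t = drive(p, [b"ping", b"pong"])
print(p.events)
print(t.written)
```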
Besides subclassing Protocol directly, you can subclass other ready-made protocols, such as LineReceiver in twisted.protocols.basic or IRCClient in twisted.words.protocols.irc.

```python
from twisted.protocols.basic import LineReceiver


class Answer(LineReceiver):
    # lines arrive as bytes, so the keys and replies are bytes too
    answers = {b"How are you?": b"Fine", None: b"I don't know what you mean"}

    def lineReceived(self, line):
        if line in self.answers:
            self.sendLine(self.answers[line])
        else:
            self.sendLine(self.answers[None])
```
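LineReceiver, which Answer uses, spares you the buffering that the Protocol.dataReceived docstring warns about. TCP delivers a byte stream, so one dataReceived() call may contain half a line or several lines. The toy class below is a simplified sketch of that buffering idea, not LineReceiver's actual source. `ToyLineReceiver` is a hypothetical name; the `b"\r\n"` delimiter matches LineReceiver's default.

```python
class ToyLineReceiver:
    """A stripped-down line splitter in the spirit of LineReceiver (not Twisted's code)."""

    delimiter = b"\r\n"

    def __init__(self):
        self._buffer = b""
        self.lines = []

    def dataReceived(self, data):
        # accumulate bytes until at least one full line is available
        self._buffer += data
        while self.delimiter in self._buffer:
            line, self._buffer = self._buffer.split(self.delimiter, 1)
            self.lineReceived(line)

    def lineReceived(self, line):
        # subclasses would override this, as Answer overrides LineReceiver.lineReceived
        self.lines.append(line)


r = ToyLineReceiver()
# one message split across calls, plus a delimiter split across two chunks
for chunk in (b"How a", b"re you?\r\nbye\r", b"\n", b"partial"):
    r.dataReceived(chunk)
print(r.lines)    # [b'How are you?', b'bye']
print(r._buffer)  # b'partial'
```

Feeding the parser chunks of different sizes, as the Protocol docstring recommends, is the quickest way to catch framing bugs.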

```python
from twisted.internet.protocol import Factory
from twisted.protocols.basic import LineReceiver
from twisted.internet import reactor


class Chat(LineReceiver):
    def __init__(self, users):
        self.users = users  # dict shared by all connections (owned by the factory)
        self.name = None
        self.state = "GETNAME"

    def connectionMade(self):
        self.sendLine(b"What's your name?")

    def connectionLost(self, reason):
        if self.name in self.users:
            del self.users[self.name]

    def lineReceived(self, line):
        if self.state == "GETNAME":
            self.handle_GETNAME(line)
        else:
            self.handle_CHAT(line)

    def handle_GETNAME(self, name):
        if name in self.users:
            self.sendLine(b"Name taken, please choose another.")
            return
        self.sendLine(b"Welcome, %s!" % (name,))
        self.name = name
        self.users[name] = self
        self.state = "CHAT"

    def handle_CHAT(self, message):
        message = b"<%s> %s" % (self.name, message)
        for name, protocol in self.users.items():
            if protocol != self:
                protocol.sendLine(message)


class ChatFactory(Factory):
    def __init__(self):
        self.users = {}  # maps user names to Chat instances

    def buildProtocol(self, addr):
        return Chat(self.users)


reactor.listenTCP(8123, ChatFactory())
reactor.run()
```

```python
import time

from twisted.words.protocols import irc
from twisted.internet import protocol


class LogBot(irc.IRCClient):
    def connectionMade(self):
        irc.IRCClient.connectionMade(self)
        # MessageLogger is defined in the full example below
        self.logger = MessageLogger(open(self.factory.filename, "a"))
        self.logger.log("[connected at %s]" %
                        time.asctime(time.localtime(time.time())))

    def signedOn(self):
        self.join(self.factory.channel)


class LogBotFactory(protocol.ClientFactory):
    def __init__(self, channel, filename):
        self.channel = channel
        self.filename = filename

    def buildProtocol(self, addr):
        p = LogBot()
        p.factory = self
        return p
```

```python
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.


"""
An example IRC log bot - logs a channel's events to a file.

If someone says the bot's name in the channel followed by a ':',
e.g.

    <foo> logbot: hello!

the bot will reply:

    <logbot> foo: I am a log bot

Run this script with two arguments, the channel name the bot should
connect to, and file to log to, e.g.:

    $ python test test.log

will log channel #test to the file 'test.log'.

To run the script:

    $ python <channel> <file>
"""


from __future__ import print_function

# twisted imports
from twisted.words.protocols import irc
from twisted.internet import reactor, protocol
from twisted.python import log

# system imports
import time, sys


class MessageLogger:
    """
    An independent logger class (because separation of application
    and protocol logic is a good thing).
    """
    def __init__(self, file):
        self.file = file

    def log(self, message):
        """Write a message to the file."""
        timestamp = time.strftime("[%H:%M:%S]", time.localtime(time.time()))
        self.file.write('%s %s\n' % (timestamp, message))
        self.file.flush()

    def close(self):
        self.file.close()


class LogBot(irc.IRCClient):
    """A logging IRC bot."""

    nickname = "twistedbot"

    def connectionMade(self):
        irc.IRCClient.connectionMade(self)
        self.logger = MessageLogger(open(self.factory.filename, "a"))
        self.logger.log("[connected at %s]" %
                        time.asctime(time.localtime(time.time())))

    def connectionLost(self, reason):
        irc.IRCClient.connectionLost(self, reason)
        self.logger.log("[disconnected at %s]" %
                        time.asctime(time.localtime(time.time())))
        self.logger.close()

    # callbacks for events

    def signedOn(self):
        """Called when bot has successfully signed on to server."""
        self.join(self.factory.channel)

    def joined(self, channel):
        """This will get called when the bot joins the channel."""
        self.logger.log("[I have joined %s]" % channel)

    def privmsg(self, user, channel, msg):
        """This will get called when the bot receives a message."""
        user = user.split('!', 1)[0]
        self.logger.log("<%s> %s" % (user, msg))

        # Check to see if they're sending me a private message
        if channel == self.nickname:
            msg = "It isn't nice to whisper! Play nice with the group."
            self.msg(user, msg)
            return

        # Otherwise check to see if it is a message directed at me
        if msg.startswith(self.nickname + ":"):
            msg = "%s: I am a log bot" % user
            self.msg(channel, msg)
            self.logger.log("<%s> %s" % (self.nickname, msg))

    def action(self, user, channel, msg):
        """This will get called when the bot sees someone do an action."""
        user = user.split('!', 1)[0]
        self.logger.log("* %s %s" % (user, msg))

    # irc callbacks

    def irc_NICK(self, prefix, params):
        """Called when an IRC user changes their nickname."""
        old_nick = prefix.split('!')[0]
        new_nick = params[0]
        self.logger.log("%s is now known as %s" % (old_nick, new_nick))

    # For fun, override the method that determines how a nickname is changed on
    # collisions. The default method appends an underscore.
    def alterCollidedNick(self, nickname):
        """
        Generate an altered version of a nickname that caused a collision in an
        effort to create an unused related name for subsequent registration.
        """
        return nickname + '^'


class LogBotFactory(protocol.ClientFactory):
    """A factory for LogBots.

    A new protocol instance will be created each time we connect to the server.
    """

    def __init__(self, channel, filename):
        self.channel = channel
        self.filename = filename

    def buildProtocol(self, addr):
        p = LogBot()
        p.factory = self
        return p

    def clientConnectionLost(self, connector, reason):
        """If we get disconnected, reconnect to server."""
        connector.connect()

    def clientConnectionFailed(self, connector, reason):
        print("connection failed:", reason)
        reactor.stop()


if __name__ == '__main__':
    # initialize logging
    log.startLogging(sys.stdout)

    # create factory protocol and application
    f = LogBotFactory(sys.argv[1], sys.argv[2])

    # connect factory to this host and port (put the IRC server's hostname in the empty string)
    reactor.connectTCP("", 6667, f)

    # run bot
    reactor.run()
```
3 Deferred and DeferredList
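```python
from twisted.internet import reactor
from twisted.internet.defer import Deferred


def down_page(url):
    print('downloading page', url)
    return 'page content'  # passed on to the next callback


def down_failed(failure):
    # errbacks receive a twisted.python.failure.Failure
    print('download failed, running down_failed():', failure.getErrorMessage())


def save_page(page):
    print('saving page:', page)


def close_task(result):
    reactor.stop()


d = Deferred()
# One stage holding both functions. Similar to addCallback(down_page) plus addErrback(down_failed),
# except that here down_failed will NOT see exceptions raised by down_page itself.
d.addCallbacks(down_page, down_failed)
# addCallback puts a pass-through in the errback slot, so a failure is forwarded unchanged
d.addCallback(save_page)
d.addBoth(close_task)  # equivalent to d.addCallbacks(close_task, close_task)
reactor.callWhenRunning(d.callback, '')
reactor.run()
```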
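A small pure-Python model shows how a Deferred chooses between callbacks and errbacks. `MiniDeferred` and `Failure` below are hypothetical stand-ins. The real classes live in `twisted.internet.defer` and `twisted.python.failure` and do much more, for example chaining Deferreds returned by callbacks and logging unhandled errors.

The model follows four rules:

* Each stage runs exactly one of its two functions.
* The callback runs if the current result is a normal value; the errback runs if it is a Failure.
* An exception raised in a stage turns the result into a Failure.
* An errback that returns normally switches the chain back to the callback path.

The demo shows why `addCallbacks(f, g)` differs from `addCallback(f).addErrback(g)`.

```python
class Failure:
    """Hypothetical minimal stand-in for twisted.python.failure.Failure."""

    def __init__(self, exc):
        self.value = exc


def passthru(arg):
    return arg


class MiniDeferred:
    """Toy model of how a Deferred walks its callback chain (not Twisted's code)."""

    def __init__(self):
        self.stages = []  # list of (callback, errback) pairs, one pair per stage
        self.result = None
        self.called = False

    def addCallbacks(self, callback, errback=passthru):
        self.stages.append((callback, errback))
        if self.called:  # already fired: run the new stage immediately
            self._run()
        return self

    def addCallback(self, callback):
        return self.addCallbacks(callback, passthru)

    def addErrback(self, errback):
        return self.addCallbacks(passthru, errback)

    def addBoth(self, fn):
        return self.addCallbacks(fn, fn)

    def callback(self, result):
        self.result, self.called = result, True
        self._run()

    def errback(self, exc):
        self.result, self.called = Failure(exc), True
        self._run()

    def _run(self):
        while self.stages:
            cb, eb = self.stages.pop(0)
            # pick the errback if the current result is a failure, else the callback
            fn = eb if isinstance(self.result, Failure) else cb
            try:
                self.result = fn(self.result)
            except Exception as e:
                # an exception switches the chain onto the errback path
                self.result = Failure(e)


log = []


def failing_download(url):
    raise IOError("download failed")


def handle_error(failure):
    log.append("handled: %s" % failure.value)
    return "recovered"  # returning normally switches back to the callback path


d1 = MiniDeferred()
d1.addCallbacks(failing_download, handle_error)  # same stage: handle_error is skipped
d1.callback("")
print(isinstance(d1.result, Failure))  # True

d2 = MiniDeferred()
d2.addCallback(failing_download).addErrback(handle_error)  # two stages
d2.callback("")
print(d2.result)  # recovered
```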

```python
class Malfunction(Exception):
    pass


def my_generator():
    print('starting up')

    val = yield 1
    print('got:', val)

    val = yield 2
    print('got:', val)

    try:
        yield 3
    except Malfunction:
        print('malfunction!')

    yield 4

    print('done')


# The caller pushes values into my_generator with send() and throw(): each one becomes the
# value of the paused yield expression (assigned to val). my_generator hands values back with yield.
gen = my_generator()

print(next(gen), 'from yield')                 # start the generator
print(gen.send(10), 'from yield')              # send the value 10
print(gen.send(20), 'from yield')              # send the value 20
print(gen.throw(Malfunction()), 'from yield')  # raise an exception inside the generator

try:
    next(gen)
except StopIteration:
    pass
```

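```python
def func1():
    print('running callback 1')
    return 'result1'


def func2():
    print('running callback 2')
    return 'result2'


def my_generator():
    print('generator starts')
    val1 = yield 1
    print('callback 1 finished, result:', val1)
    val2 = yield 2
    print('callback 2 finished, result:', val2)
    yield 3
    print('generator ends')


gen = my_generator()
next(gen)     # run up to the first yield (a fresh generator cannot receive a non-None send)
t1 = func1()
gen.send(t1)  # hand callback 1's result back into the generator
t2 = func2()
gen.send(t2)  # hand callback 2's result back into the generator
try:
    next(gen)
except StopIteration:
    pass
```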
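The @inlineCallbacks decorator must be applied to a generator function, that is, one that uses yield. It takes over the job of calling next(), send() and throw() for you. Each time the generator yields, the decorator obtains the corresponding result and hands it back to the generator:

* If a plain value (not a Deferred) is yielded, inlineCallbacks sends it straight back, and the generator continues immediately.
* If a Deferred is yielded, the generator is suspended until that Deferred's callbacks have run. The result is then sent back in. If the Deferred failed, the exception is raised at the yield expression and propagates out of the generator if it is not caught.

Calling the decorated function itself returns a Deferred. The code below walks through the process: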

```python
from twisted.internet.defer import inlineCallbacks, Deferred


@inlineCallbacks
def my_callbacks():
    from twisted.internet import reactor

    print('first callback')
    result = yield 1  # yielded values that aren't deferred come right back

    print('second callback got', result)
    d = Deferred()
    # d fires only after 5 seconds, so "yield d" makes the generator wait until then
    reactor.callLater(5, d.callback, 2)
    result = yield d  # yielded deferreds will pause the generator

    print('third callback got', result)  # the result of the deferred

    d = Deferred()
    reactor.callLater(5, d.errback, Exception(3))

    try:
        yield d
    except Exception as e:
        result = e

    print('fourth callback got', repr(result))  # the exception from the deferred

    reactor.stop()


from twisted.internet import reactor

reactor.callWhenRunning(my_callbacks)
reactor.run()
```

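```python
from twisted.internet.defer import inlineCallbacks, Deferred
from twisted.internet import reactor


@inlineCallbacks
def my_generator():
    yield 1
    d = Deferred()
    reactor.callLater(5, d.callback, 2)
    yield d
    yield 2


# Calling an @inlineCallbacks function runs it up to its first pending Deferred
# and immediately returns a Deferred for its eventual result.
d = my_generator()
print(d, type(d))
```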
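A small generator trampoline can imitate what @inlineCallbacks does with next(), send() and throw(). The sketch below is a simplified toy, not Twisted's implementation: `Pending` stands in for a Deferred, and `inline_callbacks`, `resolve()`, `fail()` and `add_waiter()` are hypothetical names. Plain yielded values are sent straight back. A yielded `Pending` parks the generator until it is resolved (its value is sent in) or failed (its exception is thrown in). The generator's return value becomes the result of the object handed back to the caller.

```python
class Pending:
    """Hypothetical stand-in for a Deferred whose result arrives later."""

    def __init__(self):
        self._waiters = []
        self.done = False
        self.result = None
        self.is_error = False

    def add_waiter(self, fn):
        if self.done:
            fn(self.result, self.is_error)  # already finished: call back immediately
        else:
            self._waiters.append(fn)

    def resolve(self, value):
        self._finish(value, False)

    def fail(self, exc):
        self._finish(exc, True)

    def _finish(self, value, is_error):
        self.done, self.result, self.is_error = True, value, is_error
        waiters, self._waiters = self._waiters, []
        for fn in waiters:
            fn(value, is_error)


def inline_callbacks(gen_func):
    """Toy version of the @inlineCallbacks idea: drive a generator with send()/throw()."""

    def wrapper(*args, **kwargs):
        gen = gen_func(*args, **kwargs)
        outcome = Pending()  # handed to the caller, like the Deferred from @inlineCallbacks

        def step(value, is_error):
            try:
                # resume the generator: a normal result goes in via send(), an error via throw()
                yielded = gen.throw(value) if is_error else gen.send(value)
            except StopIteration as stop:
                outcome.resolve(stop.value)  # generator finished; its return value is the result
                return
            except Exception as exc:
                outcome.fail(exc)  # an uncaught exception escaped the generator
                return
            if isinstance(yielded, Pending):
                yielded.add_waiter(step)  # suspend until the pending result arrives
            else:
                step(yielded, False)  # plain values are sent straight back

        step(None, False)  # the first send(None) starts the generator
        return outcome

    return wrapper


@inline_callbacks
def my_callbacks(d1, d2):
    seen = []
    result = yield 1  # plain value: comes right back
    seen.append(result)
    result = yield d1  # suspends until d1.resolve(...)
    seen.append(result)
    try:
        yield d2  # d2.fail(...) is re-raised here
    except ValueError as e:
        seen.append(repr(e))
    return seen


d1, d2 = Pending(), Pending()
out = my_callbacks(d1, d2)
print(out.done)    # False: the generator is parked on d1
d1.resolve(2)
print(out.done)    # False: now parked on d2
d2.fail(ValueError(3))
print(out.result)  # [1, 2, 'ValueError(3)']
```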