Twisted源码分析2

Posted 幻觉czw

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Twisted源码分析2相关的知识,希望对你有一定的参考价值。

在上一篇博文中,我们借一个小例子简要讲解了创建事件监听循环,与客户端建立连接。这次,我们将继续按照上一篇开头的例子来探究负责底层传输的transport类

# /twisted/internet/tcp.py

def doRead(self):
    ...
    protocol = self.factory.buildProtocol(self._buildAddr(addr))
    if protocol is None:
        skt.close()
        continue
    s = self.sessionno
    self.sessionno = s+1
    transport = self.transport(skt, protocol, addr, self, s, self.reactor)
    protocol.makeConnection(transport)

当有客服端链接服务器时,reactor会调用负责监听客户端链接的Port对象的doRead方法,在doRead方法里面创建了transport对象,这个transport对象与相应的protocol对象绑定,在上一篇博客开头的例子中,调用了transport的write方法向客户端中写入数据,然后断开与客户端的链接:

class PoetryProtocol(Protocol):

    def connectionMade(self):
        self.transport.write(self.factory.poem)
        self.transport.loseConnection()

transport对象的类对象是Server,我们在上一篇博客中重点关注了相对简单的doRead方法,即从客户端读取数据,而这次我们讲关注write方法。

# /twisted/internet/abstract.py

class FileDescriptor(_ConsumerMixin, _LogOwner):
    ...
    def write(self, data):
        if isinstance(data, unicode): # no, really, I mean it
            # 不支持unicode字符串的直接传输,为啥呢。。
            raise TypeError("Data must not be unicode")
        if not self.connected or self._writeDisconnected:
            #当连接断开或者连接处于写关闭的半打开状态,那么直接返回 
            return
        if data:
            self._tempDataBuffer.append(data)
            # 向临时缓冲去中添加数据
            self._tempDataLen += len(data)
            self._maybePauseProducer()
            # 如果缓冲区数据已经满了的话,通知生产者暂停向其提供数据
            self.startWriting()
            # 将其添加到reactor循环的写监听列表中

write方法是在Server的抽象基类FileDescriptor中实现的,当有数据需要写入的话,该方法将会保证数据被写入到客户端。

# /twisted/internet/abstract.py
def doWrite(self):
    if len(self.dataBuffer) - self.offset < self.SEND_LIMIT:
    # 当缓冲区的数据长度小于最大传输数据长度的限制时,将零时缓冲区中的数据添加到缓冲区中,并且移除已经发送的数据,offset之前的数据   
        self.dataBuffer = _concatenate(
        self.dataBuffer, self.offset, self._tempDataBuffer)
        self.offset = 0
        self._tempDataBuffer = []
        self._tempDataLen = 0

        # 尽可能发送数据
        if self.offset:
            l = self.writeSomeData(lazyByteSlice(self.dataBuffer, self.offset))
        else:
            l = self.writeSomeData(self.dataBuffer)
        # 如果返回的l是个异常,证明传输失败直接返回
        if isinstance(l, Exception) or l < 0:
            return l
        self.offset += l
        # 如果已经没有数据可以发送了
        if self.offset == len(self.dataBuffer) and not self._tempDataLen:
            self.dataBuffer = b""
            self.offset = 0
            # stop writing.
            self.stopWriting()
            # 如果有一个向我们提供数据的producer,通知其为我们提供更多的数据
            if self.producer is not None and ((not self.streamingProducer)
                                              or self.producerPaused):
                self.producerPaused = False
                self.producer.resumeProducing()
            elif self.disconnecting:
                # 如果之前有要求关闭连接,那么在数据全书写入客户端之后断开连接
                return self._postLoseConnection()
            elif self._writeDisconnecting:
                # 如果之前有要求关闭连接的写入,即半关闭。socke.shutdown(1),那么现在也断开连接
                self._writeDisconnected = True
                result = self._closeWriteConnection()
                return result
        return None

def _postLoseConnection(self):
        """通知reactor连接断开"""
        return main.CONNECTION_DONE

def _closeWriteConnection(self):
        try:
            self.socket.shutdown(1)
            # 半关闭连接,后面的任何写操作都将不被允许
        except socket.error:
            pass
        p = interfaces.IHalfCloseableProtocol(self.protocol, None)
        if p:
            try:
                p.writeConnectionLost()
            except:
                f = failure.Failure()
                log.err()
                self.connectionLost(f)

# /twisted/internet/tcp.py 
"""
该方法在基类Connection类中实现
"""
def writeSomeData(self, data):
        """
        尽可能多的写入数据
        当连接断开时,将抛出异常否则成功写入的数据的数量将被返回
        """
        limitedData = lazyByteSlice(data, 0, self.SEND_LIMIT)

        try:
            return untilConcludes(self.socket.send, limitedData)
        except socket.error as se:
            if se.args[0] in (EWOULDBLOCK, ENOBUFS):
                return 0
            else:
                return main.CONNECTION_LOST

# /twisted/python/util.py 
def untilConcludes(f, *a, **kw):
    while True:
        try:
            return f(*a, **kw)
        except (IOError, OSError) as e:
            if e.args[0] == errno.EINTR:
            # 系统调用被打断
                continue
            raise

writeSomeData负责将适当的数据写入,untilConcludes方法确保数据被传送,同时有异常则抛出。当连接关闭时,loseConnectio方法将被调用来处理掉相应的socket以及produce

# /twisted/internet/abstract.py
# FileDescriptor类的方法
def loseConnection(self, _connDone=failure.Failure(main.CONNECTION_DONE)):
        if self.connected and not self.disconnecting:
            if self._writeDisconnected:
                如果已经半关闭连接
                self.stopReading()
                self.stopWriting()
                # 直接断开连接
                self.connectionLost(_connDone)
            else:
                self.stopReading()
                # 添加到写监听列表中,使得在连接关闭之前,将缓冲区中的所有数据都发送出去
                self.startWriting()
                # 将disconnecting置为1,当数据全部发送之后断开连接
                self.disconnecting = 1

# twisted/internet/tcp.py 
# Connection类的方法
def connectionLost(self, reason):
    if not hasattr(self, "socket"):
        return
    abstract.FileDescriptor.connectionLost(self, reason)
    # 调用基类的connectionLost方法,负责将该对象从reactor的读写事件监听列表中移除
    self._closeSocket(not reason.check(error.ConnectionAborted))
    # 负责关闭socket
    protocol = self.protocol
    del self.protocol
    del self.socket
    del self.fileno
    protocol.connectionLost(reason)
    # 回调对应protocol的connectionLost方法,该方法由用户实现,用于做一些必要清理工作

# /twisted/internet/abstract.py
# FileDescriptor类的方法
def connectionLost(self, reason):
    self.disconnected = 1
    self.connected = 0
    # producer对象停止提供数据,移除对producer对象的引用
    if self.producer is not None:
        self.producer.stopProducing()
        self.producer = None
    self.stopReading()
    self.stopWriting()

# /twisted/internet/tcp.py
# _SocketCloser是Connection类的基类
class _SocketCloser(object):

    _shouldShutdown = True

    def _closeSocket(self, orderly):
        skt = self.socket
        try:
            if orderly:
                if self._shouldShutdown:
                    skt.shutdown(2)
                    # 未来读写都将被禁止
            else:
            #SO_LINER设置当socket被关闭时,socket缓冲区缓存中残留的数据的处理,这里设置为1 0,表明将放弃这些数据并且发送一个RST给对方
                 self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER,
                                       struct.pack("ii", 1, 0))

        except socket.error:
            pass
        try:
            skt.close() # 关闭套接字
        except socket.error:
            pass

在调用loseConnection方法之前,将会优先把缓存中的数据全部发送掉然后才会关闭。

transport封装了底层的传输机制,确保了数据能够被方便及时的读取或者写入,transport一般不需要用户的重写,用户只需要重写与transport绑定的protocol,在procotol的相应回调方法中调用transport的相应方法完成数据的传输和读取,以及连接的断开

以上是关于Twisted源码分析2的主要内容,如果未能解决你的问题,请参考以下文章

Twisted源码分析3

Twisted源码分析1

Twisted源码分析1

Twisted源码分析3

Twisted源码分析3

Twisted源码分析4--Deferred