Twisted源码分析2
Posted 幻觉czw
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Twisted源码分析2相关的知识,希望对你有一定的参考价值。
在上一篇博文中,我们借一个小例子简要讲解了创建事件监听循环,与客户端建立连接。这次,我们将继续按照上一篇开头的例子来探究负责底层传输的transport类
# /twisted/internet/tcp.py
def doRead(self):
...
protocol = self.factory.buildProtocol(self._buildAddr(addr))
if protocol is None:
skt.close()
continue
s = self.sessionno
self.sessionno = s+1
transport = self.transport(skt, protocol, addr, self, s, self.reactor)
protocol.makeConnection(transport)
当有客服端链接服务器时,reactor会调用负责监听客户端链接的Port对象的doRead方法,在doRead方法里面创建了transport对象,这个transport对象与相应的protocol对象绑定,在上一篇博客开头的例子中,调用了transport的write方法向客户端中写入数据,然后断开与客户端的链接:
class PoetryProtocol(Protocol):
def connectionMade(self):
self.transport.write(self.factory.poem)
self.transport.loseConnection()
transport对象的类对象是Server,我们在上一篇博客中重点关注了相对简单的doRead方法,即从客户端读取数据,而这次我们讲关注write方法。
# /twisted/internet/abstract.py
class FileDescriptor(_ConsumerMixin, _LogOwner):
...
def write(self, data):
if isinstance(data, unicode): # no, really, I mean it
# 不支持unicode字符串的直接传输,为啥呢。。
raise TypeError("Data must not be unicode")
if not self.connected or self._writeDisconnected:
#当连接断开或者连接处于写关闭的半打开状态,那么直接返回
return
if data:
self._tempDataBuffer.append(data)
# 向临时缓冲去中添加数据
self._tempDataLen += len(data)
self._maybePauseProducer()
# 如果缓冲区数据已经满了的话,通知生产者暂停向其提供数据
self.startWriting()
# 将其添加到reactor循环的写监听列表中
write方法是在Server的抽象基类FileDescriptor中实现的,当有数据需要写入的话,该方法将会保证数据被写入到客户端。
# /twisted/internet/abstract.py
def doWrite(self):
if len(self.dataBuffer) - self.offset < self.SEND_LIMIT:
# 当缓冲区的数据长度小于最大传输数据长度的限制时,将零时缓冲区中的数据添加到缓冲区中,并且移除已经发送的数据,offset之前的数据
self.dataBuffer = _concatenate(
self.dataBuffer, self.offset, self._tempDataBuffer)
self.offset = 0
self._tempDataBuffer = []
self._tempDataLen = 0
# 尽可能发送数据
if self.offset:
l = self.writeSomeData(lazyByteSlice(self.dataBuffer, self.offset))
else:
l = self.writeSomeData(self.dataBuffer)
# 如果返回的l是个异常,证明传输失败直接返回
if isinstance(l, Exception) or l < 0:
return l
self.offset += l
# 如果已经没有数据可以发送了
if self.offset == len(self.dataBuffer) and not self._tempDataLen:
self.dataBuffer = b""
self.offset = 0
# stop writing.
self.stopWriting()
# 如果有一个向我们提供数据的producer,通知其为我们提供更多的数据
if self.producer is not None and ((not self.streamingProducer)
or self.producerPaused):
self.producerPaused = False
self.producer.resumeProducing()
elif self.disconnecting:
# 如果之前有要求关闭连接,那么在数据全书写入客户端之后断开连接
return self._postLoseConnection()
elif self._writeDisconnecting:
# 如果之前有要求关闭连接的写入,即半关闭。socke.shutdown(1),那么现在也断开连接
self._writeDisconnected = True
result = self._closeWriteConnection()
return result
return None
def _postLoseConnection(self):
"""通知reactor连接断开"""
return main.CONNECTION_DONE
def _closeWriteConnection(self):
try:
self.socket.shutdown(1)
# 半关闭连接,后面的任何写操作都将不被允许
except socket.error:
pass
p = interfaces.IHalfCloseableProtocol(self.protocol, None)
if p:
try:
p.writeConnectionLost()
except:
f = failure.Failure()
log.err()
self.connectionLost(f)
# /twisted/internet/tcp.py
"""
该方法在基类Connection类中实现
"""
def writeSomeData(self, data):
"""
尽可能多的写入数据
当连接断开时,将抛出异常否则成功写入的数据的数量将被返回
"""
limitedData = lazyByteSlice(data, 0, self.SEND_LIMIT)
try:
return untilConcludes(self.socket.send, limitedData)
except socket.error as se:
if se.args[0] in (EWOULDBLOCK, ENOBUFS):
return 0
else:
return main.CONNECTION_LOST
# /twisted/python/util.py
def untilConcludes(f, *a, **kw):
while True:
try:
return f(*a, **kw)
except (IOError, OSError) as e:
if e.args[0] == errno.EINTR:
# 系统调用被打断
continue
raise
writeSomeData负责将适当的数据写入,untilConcludes方法确保数据被传送,同时有异常则抛出。当连接关闭时,loseConnectio方法将被调用来处理掉相应的socket以及produce
# /twisted/internet/abstract.py
# FileDescriptor类的方法
def loseConnection(self, _connDone=failure.Failure(main.CONNECTION_DONE)):
if self.connected and not self.disconnecting:
if self._writeDisconnected:
如果已经半关闭连接
self.stopReading()
self.stopWriting()
# 直接断开连接
self.connectionLost(_connDone)
else:
self.stopReading()
# 添加到写监听列表中,使得在连接关闭之前,将缓冲区中的所有数据都发送出去
self.startWriting()
# 将disconnecting置为1,当数据全部发送之后断开连接
self.disconnecting = 1
# twisted/internet/tcp.py
# Connection类的方法
def connectionLost(self, reason):
if not hasattr(self, "socket"):
return
abstract.FileDescriptor.connectionLost(self, reason)
# 调用基类的connectionLost方法,负责将该对象从reactor的读写事件监听列表中移除
self._closeSocket(not reason.check(error.ConnectionAborted))
# 负责关闭socket
protocol = self.protocol
del self.protocol
del self.socket
del self.fileno
protocol.connectionLost(reason)
# 回调对应protocol的connectionLost方法,该方法由用户实现,用于做一些必要清理工作
# /twisted/internet/abstract.py
# FileDescriptor类的方法
def connectionLost(self, reason):
self.disconnected = 1
self.connected = 0
# producer对象停止提供数据,移除对producer对象的引用
if self.producer is not None:
self.producer.stopProducing()
self.producer = None
self.stopReading()
self.stopWriting()
# /twisted/internet/tcp.py
# _SocketCloser是Connection类的基类
class _SocketCloser(object):
_shouldShutdown = True
def _closeSocket(self, orderly):
skt = self.socket
try:
if orderly:
if self._shouldShutdown:
skt.shutdown(2)
# 未来读写都将被禁止
else:
#SO_LINER设置当socket被关闭时,socket缓冲区缓存中残留的数据的处理,这里设置为1 0,表明将放弃这些数据并且发送一个RST给对方
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER,
struct.pack("ii", 1, 0))
except socket.error:
pass
try:
skt.close() # 关闭套接字
except socket.error:
pass
在调用loseConnection方法之前,将会优先把缓存中的数据全部发送掉然后才会关闭。
transport封装了底层的传输机制,确保了数据能够被方便及时的读取或者写入,transport一般不需要用户的重写,用户只需要重写与transport绑定的protocol,在procotol的相应回调方法中调用transport的相应方法完成数据的传输和读取,以及连接的断开
以上是关于Twisted源码分析2的主要内容,如果未能解决你的问题,请参考以下文章