我应该如何处理 twisted.application.internet.ClientService 中的重新连接?

Posted

技术标签:

【中文标题】我应该如何处理 twisted.application.internet.ClientService 中的重新连接?【英文标题】:How should I handle reconnections in twisted.application.internet.ClientService? 【发布时间】:2016-08-31 21:40:24 【问题描述】:

我正在尝试在一个扭曲的应用程序中使用recently introduced twisted.application.internet.ClientService 类,该应用程序使用pymodbus 进行简单的modbus-tcp 轮询。我觉得我的问题与我使用的 modbus 协议无关,因为我使用较低级别的扭曲 API 创建了很多其他工作原型;但是这个新的ClientService 看起来完全符合我的需求,因此应该减少我的代码占用空间并保持它的整洁,如果我可以让它工作的话。

我的测试显示ClientService 可以按预期处理重新连接,并且我可以轻松访问第一个连接Protocol。我遇到的问题是获取后续Protocol 对象以进行重新连接。这是我遇到问题的代码的简化版本:

from twisted.application import internet, service
from twisted.internet.protocol import ClientFactory
from twisted.internet import reactor, endpoints
from pymodbus.client.async import ModbusClientProtocol

class ModbusPollingService(internet.ClientService):
    def __init__(self, addrstr, numregs=5):
        self.numregs=numregs
        internet.ClientService.__init__(self,
            endpoints.clientFromString(reactor, addrstr),
            ClientFactory.forProtocol(ModbusClientProtocol))

    def startService(self):
        internet.ClientService.startService(self)
        self._pollWhenConnected()

    def _pollWhenConnected(self):
        d = self.whenConnected()
        d.addCallback(self._connected)
        d.addErrback(self._connfail)

    def _connected(self, p):
        self._log.debug("connected: p", p=p)
        self._mbp = p
        self._poll()
        return True

    def _connfail(self, failstat):
        self._log.failure('connection failure', failure=failstat)
        self._mbp = None
        self._pollWhenConnected()

    def _poll(self):
        self._log.debug("poll: n", n=self.numregs)
        d = self._mbp.read_holding_registers(0, self.numregs)
        d.addCallback(self._regs)
        d.addErrback(self._connfail)

    def _regs(self, res):
        self._log.debug("regs: r", r=res.registers)
        # Do real work of dealing storing registers here
        reactor.callLater(1, self._poll)
        return res

application = service.Application("ModBus Polling Test")
mbpollsvc = ModbusPollingService('tcp:127.0.0.1:502')
mbpollsvc.setServiceParent(application)

当连接失败(无论出于何种原因)时,从read_holding_registers() 返回的deferrederrback 被调用,目的是我的服务可以放弃该Protocol 并返回等待状态新连接Protocol 将由whenConnected() 回调返回...但是似乎正在发生的事情是ClientService 尚未意识到连接已失效并返回相同的断开连接协议,给我一个完整的日志:

2016-05-05 17:28:25-0400 [-] connected: <pymodbus.client.async.ModbusClientProtocol object at 0x000000000227b558>
2016-05-05 17:28:25-0400 [-] poll: 5
2016-05-05 17:28:25-0400 [-] connection failure
    Traceback (most recent call last):
    Failure: pymodbus.exceptions.ConnectionException: Modbus Error: [Connection] Client is not connected

2016-05-05 17:28:25-0400 [-] connected: <pymodbus.client.async.ModbusClientProtocol object at 0x000000000227b558>
2016-05-05 17:28:25-0400 [-] poll: 5
2016-05-05 17:28:25-0400 [-] connection failure
    Traceback (most recent call last):
    Failure: pymodbus.exceptions.ConnectionException: Modbus Error: [Connection] Client is not connected

或者很相似,注意重复的ModbusClientProtocol对象地址。

我很确定我可能只是为这个 API 选择了一个糟糕的模式,但我已经迭代了一些不同的可能性,例如基于 @987654339 创建我自己的 ProtocolFactory @ 并完全在该类中处理轮询机制;但是通过持久性配置和机制以这种方式存储轮询数据感觉有点混乱,似乎在 ClientService 级别或更高级别处理这个是一种更清洁的方法,但我无法找到跟踪的最佳方法当前连接的协议。我想我真正想要的是在扩展轮询情况下使用ClientService 类的最佳实践建议。

【问题讨论】:

【参考方案1】:

你没有在任何我能看到的地方调用 self.transport.loseConnection() 来响应你的投票,所以就twisted 可以看出,你实际上并没有断开连接。可能会在以后,当你停止在旧交通工具上做任何事情时,但到那时你已经忘记了事情。

【讨论】:

【参考方案2】:

这是一个老问题。但是,希望它能对其他人有所帮助。

我遇到的问题是获取后续协议对象以进行重新连接。

提供prepareConnection 可调用的ClientService 构造函数。它将提供电流连接。

在下面的示例中,MyService 将自身附加到 MyFactory。这样做的主要原因是MyFactory 可以让MyService 知道ClientService 何时断开连接。这是可能的,因为ClientService 在断开连接时会调用Factory.stopFactory

下次ClientService 重新连接时,它将调用其prepareConnection 提供当前协议实例。

(重新连接)ClientService:

# clientservice.py
# twistd -y clientservice.py

from twisted.application import service, internet
from twisted.internet.protocol import Factory
from twisted.internet import endpoints, reactor
from twisted.protocols import basic
from twisted.logger import Logger


class MyProtocol(basic.Int16StringReceiver):
    _log = Logger()

    def stringReceived(self, data):
        self._log.info('Received data from peer, data=data',
                       peer=self.transport.getPeer(),
                       data=data)


class MyFactory(Factory):
    _log = Logger()
    protocol = MyProtocol

    def stopFactory(self):
        # Let service know that its current connection is stale
        self.service.on_connection_lost()


class MyService(internet.ClientService):
    def __init__(self, endpoint, factory):
        internet.ClientService.__init__(self,
            endpoint,
            factory,
            prepareConnection=self.on_prepare_connection)

        factory.service = self # Attach this service to factory
        self.connection = None # Future protocol instance

    def on_prepare_connection(self, connection):
        self.connection = connection # Attach protocol to service
        self._log.info('Connected to peer',
                       peer=self.connection.transport.getPeer())
        self.send_message('Hello from prepare connection!')

    def on_connection_lost(self):
        if self.connection is None:
            return

        self._log.info('Disconnected from peer',
                       peer=self.connection.transport.getPeer())
        self.connection = None

    def send_message(self, message):
        if self.connection is None:
            raise Exception('Service is not available')

        self.connection.sendString(bytes(message, 'utf-8'))


application = service.Application('MyApplication')
my_endpoint = endpoints.clientFromString(reactor, 'tcp:localhost:22222')
my_factory = MyFactory()
my_service = MyService(my_endpoint, my_factory)
my_service.setServiceParent(application)

从扭曲的示例中稍微修改回显服务器:

#!/usr/bin/env python
# echoserv.py
# python echoserv.py

# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

from twisted.internet.protocol import Protocol, Factory
from twisted.internet import reactor
from twisted.protocols import basic

### Protocol Implementation

# This is just about the simplest possible protocol
class Echo(basic.Int16StringReceiver):
    def stringReceived(self, data):
        """
        As soon as any data is received, write it back.
        """
        print("Received:", data.decode('utf-8'))
        self.sendString(data)


def main():
    f = Factory()
    f.protocol = Echo
    reactor.listenTCP(22222, f)
    reactor.run()

if __name__ == '__main__':
    main()

【讨论】:

以上是关于我应该如何处理 twisted.application.internet.ClientService 中的重新连接?的主要内容,如果未能解决你的问题,请参考以下文章

关于颠覆,我应该如何处理供应商目录?

我应该如何处理 RESTful API 中的对象层次结构?

我应该如何处理包中的头文件?

你应该如何处理 UIAlertAction 的闭包参数

证书吊销后,我应该如何处理 fastlane match?

嵌套 UICollectionViews - 我应该如何处理来自核心数据的数据?