使用 Python Twisted 和 Autobahn 从 Matlab 通过 WebSocket 发送 JSON 数据

Posted

技术标签:

【中文标题】使用 Python Twisted 和 Autobahn 从 Matlab 通过 WebSocket 发送 JSON 数据【英文标题】:Sending JSON data over WebSocket from Matlab using Python Twisted and Autobahn 【发布时间】:2016-03-25 07:14:30 【问题描述】:

我正在尝试从 Matlab 创建一个连接,以通过 WebSocket 流式传输 JSON 帧。我已经测试了我的高速公路 python 安装并使用以下方法进行了扭曲。

工作示例

Matlab 代码

使用JSONlab 工具箱将Matlab 数据转换为JSON 格式然后我compress 和Base64 对数据进行编码的示例驱动程序代码。由于我还没有让 RPC 工作,所以我使用需要压缩和 Base64 编码的命令行来避免行长和 shell 转义问题。

clear all
close all

python = '/usr/local/bin/python'
bc = '/Users/palmerc/broadcast_client.py'
i = uint32(1)

encoder = org.apache.commons.codec.binary.Base64
while true
    tic;
    packet = rand(100, 100);
    json_packet = uint8(savejson('', packet));
    compressed = CompressLib.compress(json_packet);
    b64 = char(encoder.encode(compressed));
    message = sprintf('%s %s %s', python, bc, b64);
    status = system(message);

    i = i + 1;
    toc;
end

广播客户端代码

客户端代码有两种调用方式。您可以通过命令行传递消息或创建 BroadcastClient 实例并调用 sendMessage。

#!/usr/bin/env python

import sys
from twisted.internet import reactor
from txjsonrpc.web.jsonrpc import Proxy


class BroadcastClient():

    def __init__(self, server=None):
        self.proxy = Proxy(server)

    def errorMessage(self, value):
        print 'Error ', value

    def sendMessage(self, message):
        rc = self.proxy.callRemote('broadcastMessage', message).addCallback(lambda _: reactor.stop())
        rc.addErrback(self.errorMessage)


def main(cli_arguments):
    if len(cli_arguments) > 1:
        message = cli_arguments[1]
        broadcastClient = BroadcastClient('http://127.0.0.1:7080/')
        broadcastClient.sendMessage(message)
        reactor.run()

if __name__ == '__main__':
    main(sys.argv)

广播服务器代码

服务器使用 TXJSONRPC、Twisted 和 Autobahn 在 7080 上提供 RPC 客户端、在 8080 上提供 Web 客户端和在 9080 上提供 WebSocket。 Autobahn Web Client 对调试很有用,应与服务器代码放在同一目录中。

#!/usr/bin/env python

import sys

from twisted.internet import reactor
from twisted.python import log
from twisted.web.server import Site
from twisted.web.static import File
from txjsonrpc.web import jsonrpc

from autobahn.twisted.websocket import WebSocketServerFactory, \
    WebSocketServerProtocol, \
    listenWS


class BroadcastServerProtocol(WebSocketServerProtocol):

    def onOpen(self):
        self.factory.registerClient(self)

    def onMessage(self, payload, isBinary):
        if not isBinary:
            message = " from ".format(payload.decode('utf8'), self.peer)
            self.factory.broadcastMessage(message)

    def connectionLost(self, reason):
        WebSocketServerProtocol.connectionLost(self, reason)
        self.factory.unregisterClient(self)


class BroadcastServerFactory(WebSocketServerFactory):

    """
    Simple broadcast server broadcasting any message it receives to all
    currently connected clients.
    """

    def __init__(self, url, debug=False, debugCodePaths=False):
        WebSocketServerFactory.__init__(self, url, debug=debug, debugCodePaths=debugCodePaths)
        self.clients = []

    def registerClient(self, client):
        if client not in self.clients:
            print("registered client ".format(client.peer))
            self.clients.append(client)

    def unregisterClient(self, client):
        if client in self.clients:
            print("unregistered client ".format(client.peer))
            self.clients.remove(client)

    def broadcastMessage(self, message):
        print("broadcasting message '' ..".format(message))
        for client in self.clients:
            client.sendMessage(message.encode('utf8'))
            print("message sent to ".format(client.peer))


class BroadcastPreparedServerFactory(BroadcastServerFactory):

    """
    Functionally same as above, but optimized broadcast using
    prepareMessage and sendPreparedMessage.
    """

    def broadcastMessage(self, message):
        print("broadcasting prepared message '' ..".format(message))
        preparedMessage = self.prepareMessage(message.encode('utf8'), isBinary=False)
        for client in self.clients:
            client.sendPreparedMessage(preparedMessage)
            print("prepared message sent to ".format(client.peer))


class MatlabClient(jsonrpc.JSONRPC):

    factory = None

    def jsonrpc_broadcastMessage(self, message):
        if self.factory is not None:
            print self.factory.broadcastMessage(message)


if __name__ == '__main__':

    if len(sys.argv) > 1 and sys.argv[1] == 'debug':
        log.startLogging(sys.stdout)
        debug = True
    else:
        debug = False
    factory = BroadcastPreparedServerFactory(u"ws://127.0.0.1:9000",
                                             debug=debug,
                                             debugCodePaths=debug)

    factory.protocol = BroadcastServerProtocol
    listenWS(factory)

    matlab = MatlabClient()
    matlab.factory = factory
    reactor.listenTCP(7080, Site(matlab))

    webdir = File(".")
    web = Site(webdir)
    reactor.listenTCP(8080, web)

    reactor.run()

问题 - 尝试失败

首先请注意,如果您在从 Matlab 中运行 python 时遇到问题,您需要确保使用 pyversion 命令在您的系统上指向正确版本的 Python,并且您可以使用 pyversion('/path/to/python') 更正它

Matlab 无法运行反应器

clear all
close all

i = uint32(1)

while true
    tic;
    packet = rand(100, 100);
    json_packet = uint8(savejson('', packet));
    compressed = CompressLib.compress(json_packet);
    b64 = char(encoder.encode(compressed));
    bc.sendMessage(py.str(b64.'));
    py.twisted.internet.reactor.run % This won't work.

    i = i + 1;
    toc;
end

Matlab POST

另一个尝试涉及使用 Matlab 的 webwrite POST 到服务器。结果webwrite 只需传递正确的weboptions 即可将数据转换为JSON。

options = weboptions('MediaType', 'application/json');
data = struct('Matrix', rand(100, 100));
webwrite(server, data, options);

这行得通,但结果证明每条消息很慢(约 0.1 秒)。我应该提到,矩阵不是我发送的真实数据,真实数据序列化为每条消息大约 280000 个字节,但这提供了一个合理的近似值。

我怎样才能调用bc.sendMessage,以便它正确地设法让反应器运行或以另一种更快的方式解决这个问题?

【问题讨论】:

【参考方案1】:

使用 Python 和 Matlab 设置 WebSocket

检查 Matlab 是否指向正确的 python 版本

首先,您需要确保您使用的是正确的 Python 二进制文件。在 Mac 上,您可能使用的是系统标准版本,而不是 Homebrew 安装的版本。使用以下命令检查 python 安装的位置:

pyversion

您可以使用以下命令将 Matlab 指向正确的版本:

pyversion('path/to/python')

这可能需要你重新启动 python。

如上所述,我使用 Twisted 将我的 Matlab 数据多路复用到 WebSocket 客户端。我发现解决此问题的最佳方法是简单地创建一个处理 POSTS 的服务器,然后将其传递给 WebSocket 客户端。压缩只是减慢了速度,所以我每个请求发送 280 kBytes 的 JSON,每条消息大约需要 0.05 秒。我希望这更快,0.01 秒,但这是一个好的开始。

Matlab 代码

server = 'http://127.0.0.1:7080/update.json';
headers = py.dict(pyargs('Charset','UTF-8','Content-Type','application/json'));
while true
    tic;
    packet = rand(100, 100);
    json_packet = savejson('', packet);
    r = py.requests.post(server, pyargs('data', json_packet, 'headers', headers));
    toc;
end

我本可以使用 Matlab webwrite 函数,但通常我发现调用 python 更灵活。

Python WebSocket-WebClient 服务器

import sys

from twisted.internet import reactor
from twisted.python import log
from twisted.web.resource import Resource
from twisted.web.server import Site
from twisted.web.static import File

from autobahn.twisted.websocket import WebSocketServerFactory, \
    WebSocketServerProtocol, \
    listenWS


class BroadcastServerProtocol(WebSocketServerProtocol):

    def onOpen(self):
        self.factory.registerClient(self)

    def onMessage(self, payload, isBinary):
        if not isBinary:
            message = " from ".format(payload.decode('utf8'), self.peer)
            self.factory.broadcastMessage(message)

    def connectionLost(self, reason):
        WebSocketServerProtocol.connectionLost(self, reason)
        self.factory.unregisterClient(self)


class BroadcastServerFactory(WebSocketServerFactory):

    def __init__(self, url, debug=False, debugCodePaths=False):
        WebSocketServerFactory.__init__(self, url, debug=debug, debugCodePaths=debugCodePaths)
        self.clients = []

    def registerClient(self, client):
        if client not in self.clients:
            print("registered client ".format(client.peer))
            self.clients.append(client)

    def unregisterClient(self, client):
        if client in self.clients:
            print("unregistered client ".format(client.peer))
            self.clients.remove(client)

    def broadcastMessage(self, message):
        for client in self.clients:
            client.sendMessage(message.encode('utf8'))


class BroadcastPreparedServerFactory(BroadcastServerFactory):

    def broadcastMessage(self, message, isBinary=False):
        if isBinary is True:
            message = message.encode('utf8')
        preparedMessage = self.prepareMessage(message, isBinary=isBinary)
        for client in self.clients:
            client.sendPreparedMessage(preparedMessage)


class WebClient(Resource):

    webSocket = None

    def render_POST(self, request):
        self.webSocket.broadcastMessage(request.content.read())

        return 'OK'


if __name__ == '__main__':

    if len(sys.argv) > 1 and sys.argv[1] == 'debug':
        log.startLogging(sys.stdout)
        debug = True
    else:
        debug = False
    factory = BroadcastPreparedServerFactory(u"ws://127.0.0.1:9000",
                                             debug=debug,
                                             debugCodePaths=debug)

    factory.protocol = BroadcastServerProtocol
    listenWS(factory)

    root = Resource()
    webClient = WebClient()
    webClient.webSocket = factory
    root.putChild('update.json', webClient)
    webFactory = Site(root)
    reactor.listenTCP(7080, webFactory)

    webdir = File(".")
    web = Site(webdir)
    reactor.listenTCP(8080, web)

    reactor.run()

我摆脱了 RPC 尝试,直接进行了 POST。仍然有很多提高性能的机会。

【讨论】:

以上是关于使用 Python Twisted 和 Autobahn 从 Matlab 通过 WebSocket 发送 JSON 数据的主要内容,如果未能解决你的问题,请参考以下文章

python3下的twisted

使用 Python Twisted 和 Autobahn 从 Matlab 通过 WebSocket 发送 JSON 数据

Python:如何使用 Twisted 作为 SUDS 的传输方式?

Python爬虫编程思想(27):Twisted框架基础

Python Twisted介绍

Python3 Twisted Mysql错误