ZMQ Python PUB/SUB 有效,但我的 C++ 订阅者与 Python Publisher 无效

Posted

技术标签:

【中文标题】ZMQ Python PUB/SUB 有效,但我的 C++ 订阅者与 Python Publisher 无效【英文标题】:ZMQ Python PUB/SUB works but not my C++ Subscriber with Python Publisher 【发布时间】:2019-04-05 19:04:29 【问题描述】:

我使用的是 Ubuntu 18.04、Qt 5.12 和 libzmq.so.5.1.5。我有 2 个非常简单的 Python 3.6 脚本,使用 pyzmq 18.0.1,一个实现 ZMQ PUB,一个实现 ZMQ SUB。这些 Python 脚本工作查找和数据在本地主机环回网络上从一个传输到另一个。我正在尝试在 Qt 5.12 中实现订阅者,当我使用相同的 Python 发布者脚本发送相同的数据时,订阅者总是阻塞接收。我在下面粘贴了完整的代码 - 我如何使用 Qt 5.12 获取 ZMQ 以实际接收数据?谢谢。

我的 C++ 应用程序的输出看起来很成功:

"Server IP Address determined to be: 127.0.0.1"
"Connecting to: tcp://127.0.0.1:5555"
"Attempted Connect: 0"
"SetSockOpt: 0"
"GetSockOpt: 0"
Done setting up socket
Waiting

Python 3 PUB:

#!/usr/bin/python3

import time
import zmq
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://127.0.0.1:5555")

# Allow clients to connect before sending data
time.sleep(3)
while True:
    socket.send_pyobj(1:[1,2,3])
    time.sleep(1)

Python 3 订阅:

#!/usr/bin/python3

import zmq
context = zmq.Context()
socket = context.socket(zmq.SUB)
# We can connect to several endpoints if we desire, and receive from all.
socket.connect("tcp://127.0.0.1:5555")

# We must declare the socket as of type SUBSCRIBER, and pass a prefix filter.
# Here, the filter is the empty string, wich means we receive all messages.
# We may subscribe to several filters, thus receiving from all.
socket.setsockopt(zmq.SUBSCRIBE, b'')

while True:
    message = socket.recv_pyobj()
    print(message.get(1)[2])

Qt 5.12 订阅者代码:

#include <QCoreApplication>

// Std includes
#include <stdio.h>
#include <unistd.h>
#include <assert.h>

// Qt
#include <QDebug>
#include <QFile>

// ZeroMQ Includes
#include <zmq.h>

int main(int argc, char *argv[])

    QCoreApplication a(argc, argv);

    //QString m_ServerIP = QString("*");
    QString m_ServerIP = QString("127.0.0.1");
    QString m_ServerPort = QString("5555");

    qDebug() << QString("Server IP Address determined to be: %1").arg(m_ServerIP);

    void* m_Context = zmq_ctx_new();
    assert (m_Context);
    void* m_Subscriber = zmq_socket (m_Context, ZMQ_SUB);
    assert (m_Subscriber);
    int rc = -1;
    unsigned int fd = 0;
    do 

        const char *filter = std::string("").c_str();
        QString ipAndPort = QString("tcp://%1:%2").arg(m_ServerIP).arg(m_ServerPort);

        qDebug() << QString("Connecting to: %1").arg(ipAndPort);

        rc = zmq_connect(m_Subscriber, ipAndPort.toStdString().c_str());

        qDebug() << QString("Attempted Connect: %1").arg(rc);

        rc = zmq_setsockopt(m_Subscriber, ZMQ_SUBSCRIBE,filter, strlen (filter));

        qDebug() << QString("SetSockOpt: %1").arg(rc);

        size_t fd_size = sizeof(fd);

        rc = zmq_getsockopt(m_Subscriber,ZMQ_FD,&fd,&fd_size);

        qDebug() << QString("GetSockOpt: %1").arg(rc);

    
    while ( rc < 0 );

    qDebug() << "Done setting up socket";

    while ( true) 

        zmq_msg_t message;
        zmq_msg_init(&message);
        qDebug() << "Waiting";
        zmq_recvmsg(m_Subscriber, &message, 0);
        size_t size = zmq_msg_size (&message);

        qDebug() << QString("Message Size: %1").arg(size);

        char *string = static_cast<char*>(malloc(size + 1));
        memcpy (string, zmq_msg_data(&message), size);
        zmq_msg_close (&message);
        string [size] = 0;
        if (string) 
            QByteArray frame = QByteArray::fromBase64(QByteArray(string));
            free(string);

            qDebug() << QString("Debug RX Frame Size: %1").arg(frame.size());

            QFile output("/tmp/abcd.jpeg");
            if ( output.open(QIODevice::WriteOnly) ) 
                output.write(frame);
                output.close();
            

        

    

    return a.exec();

【问题讨论】:

【参考方案1】:

您有以下错误:

您不需要 QCoreApplication,因为您没有使用需要事件循环的信号、插槽、事件等。

您使用的是zmq的C api,有一个兼容C++的api,要使用它,请使用标头&lt;zmq.hpp&gt;

当您使用send_pyobj() python 时,它使用pickle 来传输数据,但在 C++ 中没有方法可以恢复数据。相反,您应该使用来自 python 的send_json() 发送数据。

综合以上,解决办法是:

*.py

#!/usr/bin/python3

import time
import zmq
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://127.0.0.1:5555")

# Allow clients to connect before sending data
time.sleep(3)
while True:
    socket.send_json(1:[1,2,3])
    time.sleep(1)

ma​​in.cpp

#include <QString>
#include <QDebug>
#include <QUrl>
#include <QThread>

#include <zmq.hpp>

int main(void) 

    QString m_ServerIP = QString("127.0.0.1");
    int m_ServerPort = 5555;

    qDebug() << QString("Server IP Address determined to be: %1").arg(m_ServerIP);

    QUrl url;
    url.setScheme("tcp");
    url.setHost(m_ServerIP);
    url.setPort(m_ServerPort);

    zmq::context_t context(1);
    zmq::socket_t subscriber (context, ZMQ_SUB);
    subscriber.connect(url.toString().toStdString());
    subscriber.setsockopt( ZMQ_SUBSCRIBE, "", 0);

    while(true) 
        zmq_msg_t in_msg;
        zmq_msg_init(&in_msg);
        zmq::message_t  message;
        subscriber.recv(&message);
        qDebug() << QString("Message Size: %1").arg(message.size());
        QByteArray ba(static_cast<char*>(message.data()), message.size());
        qDebug()<< "message" << ba;
        QThread::sleep(1);
    

输出:

"Server IP Address determined to be: 127.0.0.1"
"Message Size: 13"
message "\"1\":[1,2,3]"
"Message Size: 13"
message "\"1\":[1,2,3]"
"Message Size: 13"
message "\"1\":[1,2,3]"
"Message Size: 13"
message "\"1\":[1,2,3]"
"Message Size: 13"
message "\"1\":[1,2,3]"

您可以使用QJsonDocument 解码数据。我还建议您查看以下examples in C++ of zmq。


如果你使用socket.send_pyobj(1:[1,2,3]),你会得到以下是pickle的结果:

"Server IP Address determined to be: 127.0.0.1"
"Message Size: 20"
message "\x80\x03q\x00K\x01]q\x01(K\x01K\x02K\x03""es."
"Message Size: 20"
message "\x80\x03q\x00K\x01]q\x01(K\x01K\x02K\x03""es."
"Message Size: 20"
message "\x80\x03q\x00K\x01]q\x01(K\x01K\x02K\x03""es."
"Message Size: 20"
message "\x80\x03q\x00K\x01]q\x01(K\x01K\x02K\x03""es."
"Message Size: 20"
message "\x80\x03q\x00K\x01]q\x01(K\x01K\x02K\x03""es."

【讨论】:

感谢您的帮助!

以上是关于ZMQ Python PUB/SUB 有效,但我的 C++ 订阅者与 Python Publisher 无效的主要内容,如果未能解决你的问题,请参考以下文章

ZeroMQ PUB/SUB 绑定订阅者

如何从 Dataflow 批量(有效)发布到 Pub/Sub?

在我的端点上看不到 Google Pub/Sub 推送通知

如何使用 python 在云 pub/sub 中访问 grpc?

Pub\Sub Python 客户端 - 优雅地关闭订阅者

我怎样才能使用 Python 从 Google Pub/Sub 中以足够快的速度阅读