ZeroMQ PUB/SUB 绑定订阅者

Posted

技术标签:

【中文标题】ZeroMQ PUB/SUB 绑定订阅者【英文标题】:ZeroMQ PUB/SUB bind subscriber 【发布时间】:2016-07-08 07:22:28 【问题描述】:

我正在和自己一起研究 ZeroMQ。

我测试了 PUB 作为服务器(绑定),SUB 作为客户端(连接)并且工作正常。相反(PUB 作为客户端(连接),SUB 作为服务器(绑定))也可以正常工作。 当我作为客户端连接另一个 SUB 套接字时,出现问题,没有任何异常或错误。

这是我的示例代码。

#include <zmq.hpp>
#include <string>
#include <iostream>
#include <unistd.h>
#include <thread>

class ZMQSock

public:
    ZMQSock(const char* addr)
    
        if (addr != NULL)
        
            mctx = new zmq::context_t(1);
            mszAddr = new char[strlen(addr) + 1];
            snprintf(mszAddr, strlen(addr) + 1, "%s", addr);
        
    

    virtual ~ZMQSock()
    
        if (msock != nullptr)
            delete msock;
        if (mctx != nullptr)
            delete mctx;
        if (mszAddr != nullptr)
            delete [] mszAddr;
    

    int zbind()
    
        if (msock != nullptr)
            msock->bind(mszAddr);
        else return -1;
        return 0;
    

    int zconnect()
    
        if (msock != nullptr)
            msock->connect(mszAddr);
        else return -1;
            return 0;
    

    void start()
    
        if (mbthread != false)
            return ;

        mbthread = true;
        mhthread = std::thread(std::bind(&ZMQSock::run, this));
    

    virtual void stop()
    
        if (mbthread == false)
            return ;

        mbthread = false;
        if (mhthread.joinable())
            mhthread.join();
    

    virtual void run() = 0;

protected:
    char* mszAddrnullptr;
    zmq::context_t* mctxnullptr;
    zmq::socket_t* msocknullptr;
    bool mbthreadfalse;
    std::thread mhthread;
;

class ZPublisher : public ZMQSock

public:

    ZPublisher(const char* addr) : ZMQSock(addr)
    
        if (msock == nullptr)
        
            msock = new zmq::socket_t(*mctx, ZMQ_PUB);
        

    

    virtual ~ZPublisher()
    
    

    bool zsend(const char* data, const unsigned int length, bool sendmore=false)
    
        zmq::message_t msg(length);
        memcpy(msg.data(), data, length);
        if (sendmore)
            return msock->send(msg, ZMQ_SNDMORE);
        return msock->send(msg);
    

    void run()
    
        if (mszAddr == nullptr)
            return ;
        if (strlen(mszAddr) < 6)
            return ;

        const char* fdelim = "1";
        const char* first = "it sends to first. two can not recv this sentence!\0";

        const char* sdelim = "2";
        const char* second = "it sends to second. one can not recv this sentence!\0";

        while (mbthread)
        
            zsend(fdelim, 1, true);
            zsend(first, strlen(first));

            zsend(sdelim, 1, true);
            zsend(second, strlen(second));

            usleep(1000 * 1000);
        
    

;

class ZSubscriber : public ZMQSock

public:

    ZSubscriber(const char* addr) : ZMQSock(addr)
    
        if (msock == nullptr)
        
            msock = new zmq::socket_t(*mctx, ZMQ_SUB);
        
    

    virtual ~ZSubscriber()
    
    

    void setScriberDelim(const char* delim, const int length)
    
        msock->setsockopt(ZMQ_SUBSCRIBE, delim, length);
        mdelim = std::string(delim, length);
    

    std::string zrecv()
    
        zmq::message_t msg;
        msock->recv(&msg);
        return std::string(static_cast<char*>(msg.data()), msg.size());
    

    void run()
    
        if (mszAddr == nullptr)
            return ;
        if (strlen(mszAddr) < 6)
            return ;

        while (mbthread)
        
            std::cout << "MY DELIM IS [" << mdelim << "]  -  MSG : ";
            std::cout << zrecv() << std::endl;

            usleep(1000 * 1000);
        
    

private:
    std::string mdelim;
;

int main ()

    ZPublisher pub("tcp://localhost:5252");
    ZSubscriber sub1("tcp://localhost:5252");
    ZSubscriber sub2("tcp://*:5252");

    pub.zconnect();
    sub1.zconnect();
    sub2.zbind();
    sub1.setScriberDelim("1", 1);
    sub2.setScriberDelim("2", 1);

    pub.start();
    std::cout << "PUB Server has been started.." << std::endl;

    usleep(1000 * 1000);

    sub1.start();
    std::cout << "SUB1 Start." << std::endl;

    sub2.start();
    std::cout << "SUB2 Start." << std::endl;

    int i = 0;
    std::cout << "< Press any key to exit program. >" << std::endl;
    std::cin >> i;

    std::cout << "SUB1 STOP START" << std::endl;
    sub1.stop();
    std::cout << "SUB2 STOP START" << std::endl;
    sub2.stop();
    std::cout << "PUB  STOP START" << std::endl;
    pub.stop();
    std::cout << "ALL DONE" << std::endl;
    return 0;

这是什么原因造成的?还是我非法使用 PUB/SUB?

【问题讨论】:

【参考方案1】:

您正在将一个 SUB 套接字连接到一个 SUB 套接字,这是一个无效的连接。在您的情况下,PUB 应该绑定并且 SUB 应该连接。

【讨论】:

以上是关于ZeroMQ PUB/SUB 绑定订阅者的主要内容,如果未能解决你的问题,请参考以下文章

ZeroMQ 反转 PUB/SUB 的问题

ZeroMq PUB/SUB 模式无法正常工作

GCP Pub/Sub 消息发送到创建主题之前存在的订阅

ZeroMQ/NanoMsg 发布/订阅与多播

如何在 ZeroMQ 中以正确的方式中止 context.socket.recv()?

列出 Pub/Sub 订阅使用者