ZeroMQ PUB/SUB 绑定订阅者
Posted
技术标签:
【中文标题】ZeroMQ PUB/SUB 绑定订阅者【英文标题】:ZeroMQ PUB/SUB bind subscriber 【发布时间】:2016-07-08 07:22:28 【问题描述】:我正在和自己一起研究 ZeroMQ。
我测试了 PUB 作为服务器(绑定),SUB 作为客户端(连接)并且工作正常。相反(PUB 作为客户端(连接),SUB 作为服务器(绑定))也可以正常工作。 当我作为客户端连接另一个 SUB 套接字时,出现问题,没有任何异常或错误。
这是我的示例代码。
#include <zmq.hpp>
#include <string>
#include <iostream>
#include <unistd.h>
#include <thread>
class ZMQSock
public:
ZMQSock(const char* addr)
if (addr != NULL)
mctx = new zmq::context_t(1);
mszAddr = new char[strlen(addr) + 1];
snprintf(mszAddr, strlen(addr) + 1, "%s", addr);
virtual ~ZMQSock()
if (msock != nullptr)
delete msock;
if (mctx != nullptr)
delete mctx;
if (mszAddr != nullptr)
delete [] mszAddr;
int zbind()
if (msock != nullptr)
msock->bind(mszAddr);
else return -1;
return 0;
int zconnect()
if (msock != nullptr)
msock->connect(mszAddr);
else return -1;
return 0;
void start()
if (mbthread != false)
return ;
mbthread = true;
mhthread = std::thread(std::bind(&ZMQSock::run, this));
virtual void stop()
if (mbthread == false)
return ;
mbthread = false;
if (mhthread.joinable())
mhthread.join();
virtual void run() = 0;
protected:
char* mszAddrnullptr;
zmq::context_t* mctxnullptr;
zmq::socket_t* msocknullptr;
bool mbthreadfalse;
std::thread mhthread;
;
class ZPublisher : public ZMQSock
public:
ZPublisher(const char* addr) : ZMQSock(addr)
if (msock == nullptr)
msock = new zmq::socket_t(*mctx, ZMQ_PUB);
virtual ~ZPublisher()
bool zsend(const char* data, const unsigned int length, bool sendmore=false)
zmq::message_t msg(length);
memcpy(msg.data(), data, length);
if (sendmore)
return msock->send(msg, ZMQ_SNDMORE);
return msock->send(msg);
void run()
if (mszAddr == nullptr)
return ;
if (strlen(mszAddr) < 6)
return ;
const char* fdelim = "1";
const char* first = "it sends to first. two can not recv this sentence!\0";
const char* sdelim = "2";
const char* second = "it sends to second. one can not recv this sentence!\0";
while (mbthread)
zsend(fdelim, 1, true);
zsend(first, strlen(first));
zsend(sdelim, 1, true);
zsend(second, strlen(second));
usleep(1000 * 1000);
;
class ZSubscriber : public ZMQSock
public:
ZSubscriber(const char* addr) : ZMQSock(addr)
if (msock == nullptr)
msock = new zmq::socket_t(*mctx, ZMQ_SUB);
virtual ~ZSubscriber()
void setScriberDelim(const char* delim, const int length)
msock->setsockopt(ZMQ_SUBSCRIBE, delim, length);
mdelim = std::string(delim, length);
std::string zrecv()
zmq::message_t msg;
msock->recv(&msg);
return std::string(static_cast<char*>(msg.data()), msg.size());
void run()
if (mszAddr == nullptr)
return ;
if (strlen(mszAddr) < 6)
return ;
while (mbthread)
std::cout << "MY DELIM IS [" << mdelim << "] - MSG : ";
std::cout << zrecv() << std::endl;
usleep(1000 * 1000);
private:
std::string mdelim;
;
int main ()
ZPublisher pub("tcp://localhost:5252");
ZSubscriber sub1("tcp://localhost:5252");
ZSubscriber sub2("tcp://*:5252");
pub.zconnect();
sub1.zconnect();
sub2.zbind();
sub1.setScriberDelim("1", 1);
sub2.setScriberDelim("2", 1);
pub.start();
std::cout << "PUB Server has been started.." << std::endl;
usleep(1000 * 1000);
sub1.start();
std::cout << "SUB1 Start." << std::endl;
sub2.start();
std::cout << "SUB2 Start." << std::endl;
int i = 0;
std::cout << "< Press any key to exit program. >" << std::endl;
std::cin >> i;
std::cout << "SUB1 STOP START" << std::endl;
sub1.stop();
std::cout << "SUB2 STOP START" << std::endl;
sub2.stop();
std::cout << "PUB STOP START" << std::endl;
pub.stop();
std::cout << "ALL DONE" << std::endl;
return 0;
这是什么原因造成的?还是我非法使用 PUB/SUB?
【问题讨论】:
【参考方案1】:您正在将一个 SUB 套接字连接到一个 SUB 套接字,这是一个无效的连接。在您的情况下,PUB 应该绑定并且 SUB 应该连接。
【讨论】:
以上是关于ZeroMQ PUB/SUB 绑定订阅者的主要内容,如果未能解决你的问题,请参考以下文章