ZeroMq PUB/SUB 模式无法正常工作

Posted

技术标签:

【中文标题】ZeroMq PUB/SUB 模式无法正常工作【英文标题】:ZeroMq PUB/SUB pattern not working properly 【发布时间】:2014-12-02 07:30:49 【问题描述】:

我的要求

    高吞吐量,每秒至少 5000 条消息 交货顺序不重要 很明显,发布者不应该等待响应,也不应该关心订阅者是否正在监听

背景

我正在为每条消息创建一个新线程,因为如果我不这样做,消息生成部分将超过发送线程的速度并且消息会丢失,因此每条消息的线程似乎是正确的方法

问题:

问题在于以某种方式开始发送 zMQ 消息的线程没有被终止(没有退出/完成)。下面这行好像有问题:

s_send(*client, request.str());

因为如果我删除它,那么线程会正常终止,所以可能是它导致问题的这一行,我的第一个猜测是线程正在等待响应,但是 zmq_PUB 是否等待响应?

这是我的代码:

void *SendHello(void *threadid) 
    long tid;
    tid = (long) threadid;
    //cout << "Hello World! Thread ID, " << tid << endl;
    std::stringstream request;
    //writing the hex as request to be sent to the server
    request << tid;
    s_send(*client, request.str());
    pthread_exit(NULL);


int main() 
    int sequence = 0;

    int NUM_THREADS = 1000;
    while (1) 
        pthread_t threads[NUM_THREADS];
        int rc;
        int i;
        for (i = 0; i < NUM_THREADS; i++) 
            cout << "main() : creating thread, " << i << endl;
            rc = pthread_create(&threads[i], NULL, SendHello, (void *) i);
        pthread_detach(threads[i]);
        sched_yield();

            if (rc) 
                cout << "Error:unable to create thread," << rc << endl;
                exit(-1);
            
        
        //usleep(1000);
        sleep(1);
    
    pthread_exit(NULL);
    //delete client;
    return 0;

我的问题:

我是否需要调整 zMQ 套接字以便 PUB 不等待回复我做错了什么?

编辑:

添加客户端定义:

static zmq::socket_t * s_client_socket(zmq::context_t & context) 
    std::cout << "I: connecting to server." << std::endl;
    zmq::socket_t * client = new zmq::socket_t(context, ZMQ_SUB);
    client->connect("tcp://localhost:5555");

    // Configure socket to not wait at close time
    int linger = 0;
    client->setsockopt(ZMQ_LINGER, &linger, sizeof (linger));
    return client;


zmq::context_t context(1);
zmq::socket_t * client = s_client_socket(context);

【问题讨论】:

客户端定义在哪里?你确定这完全有效 ZMQ_PAIR 在您的套接字调用中?那不应该是ZMQ_SUB吗? @DavidBrabant 你是对的,实际上它是 ZMQ_SUB,现在更正它 【参考方案1】:

但是 zmq_PUB 会等待响应吗?

不,如果您的插座不是 PUB 插座并且您达到了高水位线,则可能会出现这种情况,但事实并非如此。消息是否被发送?

【讨论】:

是的,他们有,哦你的意思是说如果他们没有收到,线程就不会完成?所以 PUBLISHER 依赖于 SUBSCRIBER?它保存消息直到任何 SUBSCRIBER 消费它? 不,PUB 套接字永远不会阻塞。我已经尝试过这种情况,即使没有订阅者,发布者也会继续发布,这是有道理的。一定是哪里出了问题……

以上是关于ZeroMq PUB/SUB 模式无法正常工作的主要内容,如果未能解决你的问题,请参考以下文章

使用 ZeroMQ 使用 PUB/SUB 实现 Master/Worker 模式

zeromq pub sub 上丢失的消息

ZeroMQ 反转 PUB/SUB 的问题

zeromq使用模式实验总结

消息队列、EventBus 和 Pub/Sub 之间的区别?

Google Cloud 上使用 Pub/Sub 的主/从模式