zeromq中的懒惰发布/订阅,只获取最后一条消息

Posted

技术标签:

【中文标题】zeromq中的懒惰发布/订阅,只获取最后一条消息【英文标题】:Lazy pub/sub in zeromq, only get last message 【发布时间】:2014-10-15 09:46:24 【问题描述】:

我正在尝试通过示例 wuclient/wuserver 在 zeromq 上实现一个惰性订阅者。 客户端比服务器慢得多,所以它必须只获取服务器最后发送的消息。

到目前为止,我发现这样做的唯一方法是连接/断开客户端,但每次连接当然会产生不必要的成本,大约 3 毫秒:

server.cxx

int main () 
    //  Prepare our context and publisher
    zmq::context_t context (1);
    zmq::socket_t publisher (context, ZMQ_PUB);
    publisher.bind("tcp://*:5556");
    int counter = 0;
    while (1) 
        counter++;

        //  Send message to all subscribers
        zmq::message_t message(20);
        snprintf ((char *) message.data(), 20 ,
                  "%d", counter);
        publisher.send(message);
        std::cout     << counter <<  std::endl;
        usleep(100000);
      
      return 0;
    

client.cxx

int main (int argc, char *argv[])

  zmq::context_t context (1);
  zmq::socket_t subscriber (context, ZMQ_SUB);
  while(1)

    zmq::message_t update;
    int counter;

    subscriber.connect("tcp://localhost:5556"); // This call take some milliseconds
    subscriber.setsockopt(ZMQ_SUBSCRIBE, "", 0); 
    subscriber.recv(&update);
    subscriber.disconnect("tcp://localhost:5556");

    std::istringstream iss(static_cast<char*>(update.data()));
    iss >> counter;

    std::cout     << counter <<  std::endl;
    usleep(1000000);
  
  return 0;

服务器输出: 1 2 3 4 5 6 7 8 9 ...

客户端输出: 4 14 24 ...

我尝试在没有 co/deco 的情况下使用高水位标记来做到这一点,但它不起作用。 即使使用这种代码,只有在缓冲区达到至少数百条消息时才会开始丢弃帧。 :

int high_water_mark = 1;
socket.setsockopt(ZMQ_RCVHWM, &high_water_mark, sizeof(high_water_mark) );
socket.setsockopt(ZMQ_SNDHWM, &high_water_mark, sizeof(high_water_mark) );

zeromq-dev 中还有 this post 密切相关,但提供的解决方案(使用另一个线程选择最后一条消息是不可接受的,我无法通过网络传输大量消息,这不会以后使用。

【问题讨论】:

【参考方案1】:

解决方案是像这样使用ZMQ_CONFLATE(仅适用于非多部分消息):

client.cxx

#include <zmq.hpp>
#include <iostream>
#include <sstream>
#include <unistd.h>

int main (int argc, char *argv[])

  zmq::context_t context (1);

  zmq::socket_t subscriber (context, ZMQ_SUB);

  int conflate = 1;
  subscriber.setsockopt(ZMQ_CONFLATE, &conflate, sizeof(conflate) );
  subscriber.connect("tcp://localhost:5556");
  subscriber.setsockopt(ZMQ_SUBSCRIBE, "", 0); 

  while(1)

    zmq::message_t update;
    int counter;

    subscriber.recv(&update);

    std::istringstream iss(static_cast<char*>(update.data()));
    iss >> counter;

    std::cout     << counter <<  std::endl;
    usleep(1000000);

  
  return 0;

【讨论】:

以上是关于zeromq中的懒惰发布/订阅,只获取最后一条消息的主要内容,如果未能解决你的问题,请参考以下文章

MQTT系列-保留消息

如何在Paho中获取最后五条消息?

zeromq学习记录订阅发布消息封装

zeromq pub sub 上丢失的消息

zeromq怎样获取已经绑定的socket节点

ZeroMQ_04 发布订阅模式