ZeroMQ_04 发布订阅模式

Posted vczf

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ZeroMQ_04 发布订阅模式相关的知识,希望对你有一定的参考价值。

简单来说,就是服务端不断发布消息,客户端订阅了就会收到消息。

下面我们看个简单的实力:

Server:

#include <stdlib.h> 
#include <zmq.h>
#include <string.h>
#include <unistd.h>
#include <time.h> 

#define buffersize 4096
#define randof(num)  (int) ((float) (num) * random () / (RAND_MAX + 1.0))

int main(int argc, char* argv[])
{
    // [0]创建对象
    void* ctx = zmq_ctx_new();
    void* publisher = zmq_socket(ctx, ZMQ_PUB);
    // [1]绑定到5566端口
    zmq_bind(publisher, "tcp://*:5566");

     //  初始化随机数生成器
    srandom ((unsigned) time (NULL));
    while (1) {
       int zipcode, temperature, relhumidity;
        zipcode     = randof (100000);
        temperature = randof (215) - 80;
        relhumidity = randof (50) + 10;

        //  Send message to all subscribers
        char update [20];
        sprintf (update, "%05d %d %d", zipcode, temperature, relhumidity);
        printf("server send: %s
", update);
        //s_send (publisher, update);
        zmq_send (publisher, update, strlen (update), 0);
        sleep(1);
    }
    zmq_close(publisher);
    zmq_ctx_destroy(ctx);
    return 0;
}

Client:

#include <stdlib.h> 
#include <zmq.h>
#include <string.h>
#include <unistd.h>
#include <time.h> 
#include <assert.h>

static char *s_recv (void *socket) {
    char buffer [256];
    int size = zmq_recv (socket, buffer, 255, 0);
    if (size == -1)
        return NULL;
    buffer[size] = ;

    return strndup (buffer, sizeof(buffer) - 1);
}

int main (int argc, char *argv [])
{
    //  [0]创建对象,连接到5566端口
    printf ("Collecting updates from weather server...
");
    void *context = zmq_ctx_new ();
    void *subscriber = zmq_socket (context, ZMQ_SUB);
    int rc = zmq_connect (subscriber, "tcp://localhost:5566");
    assert (rc == 0);

    //  [1]设置过滤条件,设置为空,表示全订阅,这里“1”表示匹配开头为“1”的数据
    const char *filter =  "1";
    rc = zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE,
                         filter, strlen (filter));
    assert (rc == 0);
    //  [2]接受数据
    int update_nbr;
    long total_temp = 0;
    for (update_nbr = 0; update_nbr < 100; update_nbr++) {
        
        char *string = s_recv (subscriber);
        printf ("client: %s
", string);
        int zipcode, temperature, relhumidity;
        sscanf (string, "%d %d %d",
            &zipcode, &temperature, &relhumidity);
        total_temp += temperature;
        free (string);
    }
    printf ("Average temperature for zipcode ‘%s‘ was %dF
",
        filter, (int) (total_temp / update_nbr));

    zmq_close (subscriber);
    zmq_ctx_destroy (context);
    return 0;
}

 

out:

// server
server send: 43345 -41 19
server send: 44203 110 59
server send: 78038 2 25
server send: 55377 59 18
server send: 40135 -65 36
server send: 37950 43 10

// client
zf@eappsvr-0:~/ds/zmq/test/pub_sub> ./client
Collecting updates from weather server...
client....
client: 10057 67 11
client: 16839 94 25

技术图片

 

注意: 

需要注意的是,在使用SUB套接字时,必须使用zmq_setsockopt()方法来设置订阅的内容。如果你不设置订阅内容,那将什么消息都收不到,新手很容易犯这个错误。订阅信息可以是任何字符串,可以设置多次。只要消息满足其中一条订阅信息,SUB套接字就会收到。订阅者可以选择不接收某类消息,也是通过zmq_setsockopt()方法实现的。

PUB-SUB套接字组合是异步的。客户端在一个循环体中使用zmq_recv()接收消息,如果向SUB套接字发送消息则会报错;类似地,服务端可以不断地使用zmq_send()发送消息,但不能在PUB套接字上使用zmq_recv()。

关于PUB-SUB套接字,还有一点需要注意:你无法得知SUB是何时开始接收消息的。就算你先打开了SUB套接字,后打开PUB发送消息,这时SUB还是会丢失一些消息的,因为建立连接是需要一些时间的。很少,但并不是零。

以上是关于ZeroMQ_04 发布订阅模式的主要内容,如果未能解决你的问题,请参考以下文章

zeromq学习记录订阅发布消息封装

ZeroMQ发布-订阅模式(套接字类型ZMQ_PUBZMQ_SUBZMQ_XPUB等)

zeromq pub sub 上丢失的消息

RbbitMQ04_Spring整合RabbitMQ实现发布与订阅模式路由模式通配符模式

ZeroMQ PUB/SUB 绑定订阅者

ZeroMq PUB/SUB 模式无法正常工作