ZMQ/ZeroMQ的三种消息模式

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ZMQ/ZeroMQ的三种消息模式相关的知识,希望对你有一定的参考价值。


一、 Reuqest-Reply(请求-应答模式)

        1、使用Request-Reply模式,需要遵循一定的规律。

        2、客户端必要先发送消息,在接收消息;服务端必须先进行接收客户端发送过来的消息,在发送应答给客户端,如此循环

        3、服务端和客户端谁先启动,效果都是一样的。

        4、服务端在收到消息之前,会一直阻塞,等待客户端连上来。

ZMQ/ZeroMQ的三种消息模式_网络协议

        创建一个客户端和服务端,客户端发送消息给服务端,服务端返回消息给客户端,客户端和服务器谁先启动都可以。

        server.cpp

#include <zmq.hpp>
#include <string>
#include <iostream>
#ifndef _WIN32
#include <unistd.h>
#else
#include <windows.h>

#define sleep(n) Sleep(n)
#endif

int main ()
// Prepare our context and socket
zmq::context_t context (2);
zmq::socket_t socket (context, zmq::socket_type::rep);
socket.bind ("tcp://*:5555");

while (true)
zmq::message_t request;

// Wait for next request from client
socket.recv (request, zmq::recv_flags::none);
std::cout << "Received Hello" << std::endl;

// Do some work
sleep(1);

// Send reply back to client
zmq::message_t reply (5);
memcpy (reply.data (), "World", 5);
socket.send (reply, zmq::send_flags::none);

return 0;

        client.cpp

#include <zmq.hpp>
#include <string>
#include <iostream>

int main ()

// Prepare our context and socket
zmq::context_t context (1);
zmq::socket_t socket (context, zmq::socket_type::req);

std::cout << "Connecting to hello world server..." << std::endl;
socket.connect ("tcp://localhost:5555");

// Do 10 requests, waiting each time for a response
for (int request_nbr = 0; request_nbr != 10; request_nbr++)
zmq::message_t request (5);
memcpy (request.data (), "Hello", 5);
std::cout << "Sending Hello " << request_nbr << "..." << std::endl;
socket.send (request, zmq::send_flags::none);

// Get the reply.
zmq::message_t reply;
socket.recv (reply, zmq::recv_flags::none);
std::cout << "Received World " << request_nbr << std::endl;

return 0;

二、Publisher-Subscriber(发布-订阅模式)

        Publisher-Subscriber模式,消息是单向流动的,发布者只能发布消息,不能接受消息;订阅者只能接受消息,不能发送消息。

        服务端发布消息的过程中,如果有订阅者退出,不影响发布者继续发布消息,当订阅者再次连接上来,收到的消息是后来发布的消息

        比较晚加入的订阅者,或者中途离开的订阅者,必然会丢掉一部分信息

        如果发布者停止,所有的订阅者会阻塞,等发布者再次上线的时候回继续接受消息。

        "慢连接": 我们不知道订阅者是何时开始接受消息的,就算启动"订阅者",在启动"发布者", "订阅者"还是会缺失一部分的消息,因为建立连接是需要时间的,虽然时间很短,但不是零。ZMQ在后台是进行异步的IO传输,在建立TCP连接的短短的时间段内,ZMQ就可以发送很多消息了。

       

ZMQ/ZeroMQ的三种消息模式_网络协议_02

        publisher.cpp

#include <zmq.hpp>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>

#if (defined (WIN32))
#include <zhelpers.hpp>
#endif

#define within(num) (int) ((float) num * random () / (RAND_MAX + 1.0))

int main ()

// Prepare our context and publisher
zmq::context_t context (1);
zmq::socket_t publisher (context, zmq::socket_type::pub);
publisher.bind("tcp://*:5556");
publisher.bind("ipc://weather.ipc"); // Not usable on Windows.

// Initialize random number generator
srandom ((unsigned) time (NULL));
while (1)

int zipcode, temperature, relhumidity;

// Get values that will fool the boss
zipcode = within (100000);
temperature = within (215) - 80;
relhumidity = within (50) + 10;

// Send message to all subscribers
zmq::message_t message(20);
snprintf ((char *) message.data(), 20 ,
"%05d %d %d", zipcode, temperature, relhumidity);
publisher.send(message, zmq::send_flags::none);


return 0;

         subscriber.cpp

#include <zmq.hpp>
#include <iostream>
#include <sstream>

int main (int argc, char *argv[])

zmq::context_t context (1);

// Socket to talk to server
std::cout << "Collecting updates from weather server...\\n" << std::endl;
zmq::socket_t subscriber (context, zmq::socket_type::sub);
subscriber.connect("tcp://localhost:5556");

// Subscribe to zipcode, default is NYC, 10001
const char *filter = (argc > 1)? argv [1]: "10001 ";
subscriber.setsockopt(ZMQ_SUBSCRIBE, filter, strlen (filter));

// Process 100 updates
int update_nbr;
long total_temp = 0;
for (update_nbr = 0; update_nbr < 100; update_nbr++)

zmq::message_t update;
int zipcode, temperature, relhumidity;

subscriber.recv(update, zmq::recv_flags::none);

std::istringstream iss(static_cast<char*>(update.data()));
iss >> zipcode >> temperature >> relhumidity ;

total_temp += temperature;

std::cout << "Average temperature for zipcode "<< filter
<<" was "<<(int) (total_temp / update_nbr) <<"F"
<< std::endl;
return 0;

 三、Push-Pull(平行管道模式/分布式处理)

        1、Ventilator:任务发布器会生成大量可以并行运算的任务。

        2、Worker:有一组worker会处理这些任务。

        3、Sink:结果接收器会在末端接收所有的Worker的处理结果,进行汇总。

        4、Worker上游和"任务发布器"相连,下游和"结果接收器"相连。

        5、"任务发布器" 和 "结果接收器"是这个网路结构中比较稳定的部分,由他们绑定至端点。

        6、Worker只是连接两个端点。

        7、需要等Worker全部启动后,在进行任务分发。Socket的连接会消耗一定时间(慢连接), 如果不尽兴同步的话,第一个Worker启动。

        8、会一下子接收很多任务。

        9、"任务分发器" 会向Worker均匀的分发任务(负载均衡机制)。

        10、"结果接收器" 会均匀地从Worker处收集消息(公平队列机制)。

ZMQ/ZeroMQ的三种消息模式_网络协议_03

         taskvent.cpp

#include <zmq.hpp>
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <iostream>

#define within(num) (int) ((float) num * random () / (RAND_MAX + 1.0))

int main (int argc, char *argv[])

zmq::context_t context (1);

// Socket to send messages on
zmq::socket_t sender(context, ZMQ_PUSH);
sender.bind("tcp://*:5557");

std::cout << "Press Enter when the workers are ready: " << std::endl;
getchar ();
std::cout << "Sending tasks to workers...\\n" << std::endl;

// The first message is "0" and signals start of batch
zmq::socket_t sink(context, ZMQ_PUSH);
sink.connect("tcp://localhost:5558");
zmq::message_t message(2);
memcpy(message.data(), "0", 1);
sink.send(message);

// Initialize random number generator
srandom ((unsigned) time (NULL));

// Send 100 tasks
int task_nbr;
int total_msec = 0; // Total expected cost in msecs
for (task_nbr = 0; task_nbr < 100; task_nbr++)
int workload;
// Random workload from 1 to 100msecs
workload = within (100) + 1;
total_msec += workload;

message.rebuild(10);
memset(message.data(), \\0, 10);
sprintf ((char *) message.data(), "%d", workload);
sender.send(message);

std::cout << "Total expected cost: " << total_msec << " msec" << std::endl;
sleep (1); // Give 0MQ time to deliver

return 0;

         taskwork.cpp

#include "zhelpers.hpp"
#include <string>

int main (int argc, char *argv[])

zmq::context_t context(1);

// Socket to receive messages on
zmq::socket_t receiver(context, ZMQ_PULL);
receiver.connect("tcp://localhost:5557");

// Socket to send messages to
zmq::socket_t sender(context, ZMQ_PUSH);
sender.connect("tcp://localhost:5558");

// Process tasks forever
while (1)

zmq::message_t message;
int workload; // Workload in msecs

receiver.recv(&message);
std::string smessage(static_cast<char*>(message.data()), message.size());

std::istringstream iss(smessage);
iss >> workload;

// Do the work
s_sleep(workload);

// Send results to sink
message.rebuild();
sender.send(message);

// Simple progress indicator for the viewer
std::cout << "." << std::flush;

return 0;

        tasksink.cpp

#include <zmq.hpp>
#include <time.h>
#include <sys/time.h>
#include <iostream>

int main (int argc, char *argv[])

// Prepare our context and socket
zmq::context_t context(1);
zmq::socket_t receiver(context,ZMQ_PULL);
receiver.bind("tcp://*:5558");

// Wait for start of batch
zmq::message_t message;
receiver.recv(&message);

// Start our clock now
struct timeval tstart;
gettimeofday (&tstart, NULL);

// Process 100 confirmations
int task_nbr;
int total_msec = 0; // Total calculated cost in msecs
for (task_nbr = 0; task_nbr < 100; task_nbr++)

receiver.recv(&message);
if (task_nbr % 10 == 0)
std::cout << ":" << std::flush;
else
std::cout << "." << std::flush;

// Calculate and report duration of batch
struct timeval tend, tdiff;
gettimeofday (&tend, NULL);

if (tend.tv_usec < tstart.tv_usec)
tdiff.tv_sec = tend.tv_sec - tstart.tv_sec - 1;
tdiff.tv_usec = 1000000 + tend.tv_usec - tstart.tv_usec;

else
tdiff.tv_sec = tend.tv_sec - tstart.tv_sec;
tdiff.tv_usec = tend.tv_usec - tstart.tv_usec;

total_msec = tdiff.tv_sec * 1000 + tdiff.tv_usec / 1000;
std::cout << "\\nTotal elapsed time: " << total_msec << " msec\\n" << std::endl;
return 0;

以上是关于ZMQ/ZeroMQ的三种消息模式的主要内容,如果未能解决你的问题,请参考以下文章

计算机网络的三种通讯模式(单播,广播,组播)

请求合并的三种方式,大大提高接口性能!

kafka传递消息的三种方式

vlan 的三种接口

redis的三种集群方式

redis的三种集群方式