ZeroMQ_05 管道模式
Posted axianzZ
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ZeroMQ_05 管道模式相关的知识,希望对你有一定的参考价值。
下面一个示例程序中,我们将使用ZMQ进行超级计算,也就是并行处理模型:
- 任务分发器会生成大量可以并行计算的任务;
- 有一组worker会处理这些任务;
- 结果收集器会在末端接收所有worker的处理结果,进行汇总。
taskvent:
#include <stdlib.h> #include <zmq.h> #include <string.h> #include <unistd.h> #include <time.h> #include <assert.h> static int s_send (void *socket, char *string) { int size = zmq_send (socket, string, strlen (string), 0); return size; } #define randof(num) (int) ((float) (num) * random () / (RAND_MAX + 1.0)) int main (void) { // [0]创建对象 void *context = zmq_ctx_new (); // [1]发送消息的嵌套字 void *sender = zmq_socket (context, ZMQ_PUSH); zmq_bind (sender, "tcp://*:5557"); // [2]分发消息的嵌套字 void *sink = zmq_socket (context, ZMQ_PUSH); zmq_connect (sink, "tcp://localhost:5558"); printf ("Press Enter when the workers are ready: "); getchar (); printf ("Sending tasks to workers...\\n"); // [3]发送开始信号 s_send (sink, "0"); // [4]初始化随机数 srandom ((unsigned) time (NULL)); // [5]发送100个任务 int task_nbr; int total_msec = 0; // 预计执行时间(毫秒) for (task_nbr = 0; task_nbr < 100; task_nbr++) { int workload; // 随机产生1-100毫秒 workload = randof (100) + 1; total_msec += workload; char string [10]; sprintf (string, "%d", workload); s_send (sender, string); } printf ("Total expected cost: %d msec\\n", total_msec); zmq_close (sink); zmq_close (sender); zmq_ctx_destroy (context); return 0; }
taskwork:
#include <stdlib.h> #include <zmq.h> #include <string.h> #include <unistd.h> #include <time.h> #include <assert.h> void s_sleep (int msecs) { struct timespec t; t.tv_sec = msecs / 1000; t.tv_nsec = (msecs % 1000) * 1000000; nanosleep (&t, NULL); } int s_send (void *socket, char *string) { int size = zmq_send (socket, string, strlen (string), 0); return size; } char *s_recv (void *socket) { char buffer [256]; int size = zmq_recv (socket, buffer, 255, 0); if (size == -1) return NULL; buffer[size] = \'\\0\'; return strndup (buffer, sizeof(buffer) - 1); } int main (void) { // [0]创建对象 void *context = zmq_ctx_new (); // [1]发送消息的嵌套字 void *receiver = zmq_socket (context, ZMQ_PULL); zmq_connect (receiver, "tcp://localhost:5557"); // [2]分发消息的嵌套字 void *sender = zmq_socket (context, ZMQ_PUSH); zmq_connect (sender, "tcp://localhost:5558"); // [3]循环处理任务 while (1) { char *string = s_recv (receiver); printf ("%s.", string); // Show progress fflush (stdout); s_sleep (atoi (string)); // Do the work free (string); s_send (sender, ""); // Send results to sink } zmq_close (receiver); zmq_close (sender); zmq_ctx_destroy (context); return 0; }
tasksink:
#include <stdlib.h> #include <zmq.h> #include <string.h> #include <unistd.h> #include <time.h> #include <assert.h> #include <sys/time.h> int s_send (void *socket, char *string) { int size = zmq_send (socket, string, strlen (string), 0); return size; } char *s_recv (void *socket) { char buffer [256]; int size = zmq_recv (socket, buffer, 255, 0); if (size == -1) return NULL; buffer[size] = \'\\0\'; return strndup (buffer, sizeof(buffer) - 1); } int64_t s_clock (void) { struct timeval tv; gettimeofday (&tv, NULL); return (int64_t) (tv.tv_sec * 1000 + tv.tv_usec / 1000); } int main (void) { // [0]准备上下文和套接字 void *context = zmq_ctx_new (); void *receiver = zmq_socket (context, ZMQ_PULL); zmq_bind (receiver, "tcp://*:5558"); // [1]等待开始信号 char *string = s_recv (receiver); free (string); // [2]开始计时 int64_t start_time = s_clock (); // [3]确定任务处理 int task_nbr; for (task_nbr = 0; task_nbr < 100; task_nbr++) { char *string = s_recv (receiver); free (string); if (task_nbr % 10 == 0) printf (":"); else printf ("."); fflush (stdout); } // Calculate and report duration of batch printf ("Total elapsed time: %d msec\\n", (int) (s_clock () - start_time)); zmq_close (receiver); zmq_ctx_destroy (context); return 0; }
一组任务的平均执行时间在5秒左右,以下是分别开始1个、2个、4个worker时的执行结果:
# 1 worker Total elapsed time: 5034 msec # 2 workers Total elapsed time: 2421 msec # 4 workers Total elapsed time: 1018 msec
代码的流程图如下所示,vent发布大量的任务,work会均衡的分担处理这些任务,sink监听任务。
关于这段代码的几个细节:
-
worker上游和任务分发器相连,下游和结果收集器相连,这就意味着你可以开启任意多个worker。但若worker是绑定至端点的,而非连接至端点,那我们就需要准备更多的端点,并配置任务分发器和结果收集器。所以说,任务分发器和结果收集器是这个网络结构中较为稳定的部分,因此应该由它们绑定至端点,而非worker,因为它们较为动态。
-
我们需要做一些同步的工作,等待worker全部启动之后再分发任务。这点在ZMQ中很重要,且不易解决。连接套接字的动作会耗费一定的时间,因此当第一个worker连接成功时,它会一下收到很多任务。所以说,如果我们不进行同步,那这些任务根本就不会被并行地执行。你可以自己试验一下。
-
任务分发器使用PUSH套接字向worker均匀地分发任务(假设所有的worker都已经连接上了),这种机制称为_负载均衡_,以后我们会见得更多。
-
结果收集器的PULL套接字会均匀地从worker处收集消息,这种机制称为_公平队列_:
以上是关于ZeroMQ_05 管道模式的主要内容,如果未能解决你的问题,请参考以下文章