ZeroMQ 作业分配
Posted
技术标签:
【中文标题】ZeroMQ 作业分配【英文标题】:ZeroMQ Job Distribution 【发布时间】:2015-06-15 17:02:42 【问题描述】:我有以下设置: 有一个客户端、多个工作人员和一个接收器。 工作人员通过 ZeroMQ 消息接收来自客户端的作业请求。他们处理输入,并将答案发送到另一个进程(接收器)。处理一条消息大约需要 1 毫秒,我们需要处理大约 50,000 条消息/秒 - 这意味着我们需要 50 多个工作人员来处理负载。
我尝试了一个简单的设置,其中客户端创建一个 ZeroMQ PUSH 套接字,所有工作人员都连接到该套接字(通过 PULL)。类似地,sink 会创建一个 PULL 套接字,所有工作人员通过 PUSH 套接字连接到该套接字。
IIUC,ZeroMQ 使用“循环”将消息发送给工作人员 - 每次另一个工作人员获得工作。这种设置似乎可以在大约 10 个工作人员(和适当的负载)的情况下足够有效地工作。但是,当进一步增加工作人员的数量和负载时,这会很快中断并且系统开始累积延迟。
我知道有几种模式可以解决负载平衡问题,但是它们面向多个客户端并且需要在其间使用路由器,这意味着额外的代码 + cpu 周期。问题是:
1) 在单个客户端、多个工作器、单个接收器的情况下,最好的模式是什么?
2) 是否可以在客户端和工作人员之间没有路由器的情况下通过在客户端进行路由来执行此操作?
3) 应该使用什么样的 ZeroMQ 套接字?
谢谢!
编辑: 添加代码。
客户:
void *context = zmq_ctx_new ();
// Socket to send messages on
void *sender = zmq_socket (context, ZMQ_PUSH);
zmq_bind (sender, "tcp://*:5557");
// Socket to send start of batch message on
void *sink = zmq_socket (context, ZMQ_PUSH);
zmq_connect (sink, "tcp://localhost:5558");
printf ("Press Enter when the workers are ready: ");
getchar ();
printf ("Sending tasks to workers\n");
// The first message is "0" and signals start of batch
s_send (sink, "0");
unsigned long i;
const int nmsgs = atoi(argv[1]);
const int nmsgs_sec = atoi(argv[2]);
const int buff_size = 1024; // 1KB msgs
unsigned long t, t_start;
t_start = timestamp();
for (i = 0; i < nmsgs; i++)
t = timestamp();
// Pace the sending according to nmsgs_sec
while( i * 1000000 / (t+1-t_start) > nmsgs_sec)
// busy wait
t = timestamp();
char buffer [buff_size];
// Write current timestamp in the packet beginning
sprintf (buffer, "%lu", t);
zmq_send (sender, buffer, buff_size, 0);
printf("Total time: %lu ms Planned time: %d ms\n", (timestamp() - t_start)/1000, nmsgs * 1000 / nmsgs_sec);
zmq_close (sink);
zmq_close (sender);
zmq_ctx_destroy (context);
工人:
// Socket to receive messages on
void *context = zmq_ctx_new ();
void *receiver = zmq_socket (context, ZMQ_PULL);
zmq_connect (receiver, receiver_addr);
// Socket to send messages to
void *sender = zmq_socket (context, ZMQ_PUSH);
zmq_connect (sender, sender_addr);
// Process tasks forever
const int buff_size = 1024;
char buffer[buff_size];
while (1)
zmq_recv (receiver, buffer, buff_size, 0);
s_send (sender, buffer);
zmq_close (receiver);
zmq_close (sender);
zmq_ctx_destroy (context);
水槽:
// Prepare our context and socket
void *context = zmq_ctx_new ();
void *receiver = zmq_socket (context, ZMQ_PULL);
zmq_bind (receiver, "tcp://*:5558");
// Wait for start of batch
char *string = s_recv (receiver);
free (string);
unsigned long t1;
unsigned long maxdt = 0;
unsigned long sumdt = 0;
int task_nbr;
int nmsgs = atoi(argv[1]);
printf("nmsgs = %d\n", nmsgs);
for (task_nbr = 0; task_nbr < nmsgs; task_nbr++)
char *string = s_recv (receiver);
t1 = timestamp();
unsigned long t0 = atoll(string);
free (string);
unsigned long dt = t1-t0;
maxdt = (maxdt > dt ? maxdt : dt);
sumdt += dt;
if(task_nbr % 10000 == 0)
printf("%d %lu\n", task_nbr, dt);
printf("Average time: %lu usec\tMax time: %lu usec\n", sumdt/nmsgs, maxdt);
zmq_close (receiver);
zmq_ctx_destroy (context);
【问题讨论】:
【参考方案1】:您有多种选择,具体取决于当前设置中实际错误出现的位置(从您提供的信息中无法判断)。
您绝对不需要另一个“中间”节点。
如果问题是连接量 (1->50) 是您当前设置中的问题,您可以在客户端上设置多个 PUSH 套接字,每个套接字都有一部分工作线程,并且只是在客户端内部进行负载平衡。
如果问题出在 PUSH 插座本身,您可以在“推”端使用 DEALER 插座,在“拉”端使用 ROUTER 插座。但我不认为这是问题所在。
一般来说,我希望您当前的设置是“正确的”,并且您的实现中可能存在错误。你知道错误是在哪里引入的吗? Client -> Worker 或 Worker -> Sink?或者其他地方?
【讨论】:
好吧,我的问题实际上是在客户端上进行负载平衡的正确模式(假设有一个,我可能不是第一个遇到此问题的模式)。当前实现中没有错误,但是在客户端 -> 工作人员或工作人员 -> 接收端方面存在一些低效率 - 实际上并不容易验证。如果有人知道问题可能是什么,那也算是一个答案。 正如我所说,你的模式很好(the ZMQ guide 中有多种模式可供选择。你的是this one。Another)。如果没有一些代码可以查看以及对您实际遇到的错误的一些描述,没有人可以猜测问题可能出在哪里。 太好了,所以我已经完全实现了您引用的分而治之模式,正如您在我提供的代码中看到的那样。问题是,当增加工作人员的数量时,我能够在少数工作人员中获得亚毫秒延迟,而延迟也在增加(对于 50 名工作人员,延迟时间约为 15 毫秒)。我的应用程序需要极低的延迟,所以我想知道还能做什么。以上是关于ZeroMQ 作业分配的主要内容,如果未能解决你的问题,请参考以下文章