ZMQ: Service-Oriented Reliable Queuing (Majordomo Pattern)

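Preface: This article was compiled by the editors of 小常识网 (cha138.com). It introduces ZMQ's service-oriented reliable queuing (the Majordomo pattern); we hope you find it useful.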


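        The Majordomo Protocol (MDP) extends the Paranoid Pirate Protocol (PPP) in one interesting way: every request a client sends carries a "service name", and when a worker registers with the queue device it must say which service it provides. The nice thing about MDP is that it grew out of working code: the protocol is simple and easy to improve on.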

        Adding service names is a small extension to the Paranoid Pirate queue, and the result is a service-oriented broker.
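As a rough illustration of what the service name adds, the sketch below lays out the frames of a client request in plain C, without ZeroMQ: a protocol header, then the service name, then the request body. This is the order that `mdcli_send()` produces with its two `zmsg_pushstr()` calls. The header value "MDPC01" is assumed from the guide's `mdp.h` (`MDPC_CLIENT`); the `mdp_frames_t` type and `mdp_client_request()` function are hypothetical helpers for this example only.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define MDPC_CLIENT "MDPC01"   /* Assumed value, taken from the guide's mdp.h */
#define MAX_FRAMES  8

/* Hypothetical stand-in for a multipart message: an ordered frame list. */
typedef struct {
    const char *frames[MAX_FRAMES];
    size_t      count;
} mdp_frames_t;

/* Lay out a client request: header, service name, then body frames.
 * Returns 0 on success, -1 if the frames would not fit. */
static int
mdp_client_request (mdp_frames_t *msg, const char *service,
                    const char *const *body, size_t body_len)
{
    assert (msg && service && (body || body_len == 0));
    if (body_len > MAX_FRAMES - 2)
        return -1;
    msg->count = 0;
    msg->frames[msg->count++] = MDPC_CLIENT;   /* Frame 1: protocol header */
    msg->frames[msg->count++] = service;       /* Frame 2: service name */
    for (size_t i = 0; i < body_len; i++)      /* Frames 3+: request body */
        msg->frames[msg->count++] = body[i];
    return 0;
}
```

On the broker side, the ROUTER socket sees two more frames in front of these (the client's address and the empty delimiter frame added by the REQ socket), which is why the broker's main loop pops the sender, the empty frame and the header before deciding whether a message came from a client or a worker.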

[Figure: the Majordomo pattern]

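        Before implementing Majordomo, we need to write a framework for clients and workers. If programmers can use the pattern through a simple API, there is no need for them to learn the details of the Majordomo protocol or how it is implemented.
        So our first protocol (the Majordomo protocol itself) defines how the nodes of the distributed architecture talk to each other, and the second defines how applications use that protocol through the framework.
        Majordomo has two ends, a client side and a worker side. Since we write a framework for both clients and workers, we need two APIs. Here is a first sketch of the client API, using a simple object-oriented design in C with the ZFL library.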

mdcli_t *mdcli_new     (char *broker, int verbose);
void     mdcli_destroy (mdcli_t **self_p);
zmsg_t  *mdcli_send    (mdcli_t *self, char *service, zmsg_t **request_p);

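        That's it. We open a session to the broker, send a request and get the reply back, and finally close the connection. Here is a first sketch of the worker API.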

mdwrk_t *mdwrk_new     (char *broker, char *service, int verbose);
void     mdwrk_destroy (mdwrk_t **self_p);
zmsg_t  *mdwrk_recv    (mdwrk_t *self, zmsg_t **reply_p);

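        The two look much alike, but the worker API is slightly different. The first time a worker calls recv(), it passes a null reply; on every later call it passes its current reply and gets back a new request.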
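The calling convention is easiest to see with the network taken out. The sketch below is a hypothetical, ZeroMQ-free model of the worker loop: `worker_recv()` enforces the same rule as `mdwrk_recv()` (`assert (reply || !self->expect_reply)`), so only the very first call may pass a NULL reply. The `worker_state_t` type and the fake request source are assumptions made for this example.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical worker state; mirrors the expect_reply flag in mdwrk_t. */
typedef struct {
    int         expect_reply;   /* Zero only before the first call */
    const char *last_sent;      /* Last reply handed to the "broker" */
    size_t      next;           /* Index of the next fake request */
} worker_state_t;

/* Fake requests standing in for messages from the broker. */
static const char *fake_requests[] = { "req-1", "req-2", "req-3" };
#define FAKE_COUNT (sizeof fake_requests / sizeof fake_requests[0])

/* Send the reply (if any), then return the next request, or NULL when
 * there is nothing more to do (the real API returns NULL on interrupt). */
static const char *
worker_recv (worker_state_t *self, const char *reply)
{
    assert (self);
    /* Only the very first call may omit the reply */
    assert (reply || !self->expect_reply);
    if (reply)
        self->last_sent = reply;
    self->expect_reply = 1;
    if (self->next >= FAKE_COUNT)
        return NULL;
    return fake_requests[self->next++];
}
```

A caller drives it the same way as the echo worker later in this article: start with `reply = NULL`, then feed each request back in as the next reply.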

        Both APIs are easy to build: they only need small changes to the Paranoid Pirate code. Here is the client API:

        mdcliapi: Majordomo client API in C

/*  =====================================================================
mdcliapi.c

Majordomo Protocol Client API
Implements the MDP/Client spec at http://rfc.zeromq.org/spec:7.

---------------------------------------------------------------------
Copyright (c) 1991-2011 iMatix Corporation <www.imatix.com>
Copyright other contributors as noted in the AUTHORS file.

This file is part of the ZeroMQ Guide: http://zguide.zeromq.org

This is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or (at
your option) any later version.

This software is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.

You should have received a copy of the GNU Lesser General Public
License along with this program. If not, see
<http://www.gnu.org/licenses/>.
=====================================================================
*/

#include "mdcliapi.h"

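// Structure of our class
// We access these properties only via class methods

struct _mdcli_t {
    zctx_t *ctx;                // Our context
    char *broker;               // Broker endpoint
    void *client;               // Socket to broker
    int verbose;                // Print activity to stdout
    int timeout;                // Request timeout, msecs
    int retries;                // Request retries
};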


// ---------------------------------------------------------------------
// Connect or reconnect to broker

static void
s_mdcli_connect_to_broker (mdcli_t *self)
{
    if (self->client)
        zsocket_destroy (self->ctx, self->client);
    self->client = zsocket_new (self->ctx, ZMQ_REQ);
    zmq_connect (self->client, self->broker);
    if (self->verbose)
        zclock_log ("I: connecting to broker at %s...", self->broker);
}



// ---------------------------------------------------------------------
// Constructor

mdcli_t *
mdcli_new (char *broker, int verbose)
{
    assert (broker);

    mdcli_t *self = (mdcli_t *) zmalloc (sizeof (mdcli_t));
    self->ctx = zctx_new ();
    self->broker = strdup (broker);
    self->verbose = verbose;
    self->timeout = 2500;       // msecs
    self->retries = 3;          // Attempts before we give up

    s_mdcli_connect_to_broker (self);
    return self;
}



// ---------------------------------------------------------------------
// Destructor

void
mdcli_destroy (mdcli_t **self_p)
{
    assert (self_p);
    if (*self_p) {
        mdcli_t *self = *self_p;
        zctx_destroy (&self->ctx);
        free (self->broker);
        free (self);
        *self_p = NULL;
    }
}




// ---------------------------------------------------------------------
// Set request timeout (msecs)

void
mdcli_set_timeout (mdcli_t *self, int timeout)
{
    assert (self);
    self->timeout = timeout;
}


// ---------------------------------------------------------------------
// Set request retries

void
mdcli_set_retries (mdcli_t *self, int retries)
{
    assert (self);
    self->retries = retries;
}



// ---------------------------------------------------------------------
// Send a request to the broker and try to get a reply, retrying as needed.
// Takes ownership of the request message and destroys it when done.
// Returns the reply message, or NULL if there was no reply.

zmsg_t *
mdcli_send (mdcli_t *self, char *service, zmsg_t **request_p)
{
    assert (self);
    assert (request_p);
    zmsg_t *request = *request_p;
    *request_p = NULL;          // We own the request from now on

    // Prefix the request with protocol frames
    // Frame 1: "MDPCxy" (six bytes, MDP/Client x.y)
    // Frame 2: Service name (printable string)
    zmsg_pushstr (request, service);
    zmsg_pushstr (request, MDPC_CLIENT);
    if (self->verbose) {
        zclock_log ("I: send request to '%s' service:", service);
        zmsg_dump (request);
    }

    int retries_left = self->retries;
    while (retries_left && !zctx_interrupted) {
        zmsg_t *msg = zmsg_dup (request);
        zmsg_send (&msg, self->client);

        while (TRUE) {
            // Poll socket for a reply, with timeout
            zmq_pollitem_t items [] = {
                { self->client, 0, ZMQ_POLLIN, 0 } };
            int rc = zmq_poll (items, 1, self->timeout * ZMQ_POLL_MSEC);
            if (rc == -1)
                break;          // Interrupted

            // If we got a reply, process it
            if (items [0].revents & ZMQ_POLLIN) {
                zmsg_t *reply = zmsg_recv (self->client);
                if (self->verbose) {
                    zclock_log ("I: received reply:");
                    zmsg_dump (reply);
                }
                // Don't try to handle errors, just assert noisily
                assert (zmsg_size (reply) >= 3);

                zframe_t *header = zmsg_pop (reply);
                assert (zframe_streq (header, MDPC_CLIENT));
                zframe_destroy (&header);

                zframe_t *reply_service = zmsg_pop (reply);
                assert (zframe_streq (reply_service, service));
                zframe_destroy (&reply_service);

                zmsg_destroy (&request);
                return reply;   // Success
            }
            else
            if (--retries_left) {
                if (self->verbose)
                    zclock_log ("W: no reply, reconnecting...");
                // Reconnect, and resend message
                s_mdcli_connect_to_broker (self);
                zmsg_t *msg = zmsg_dup (request);
                zmsg_send (&msg, self->client);
            }
            else {
                if (self->verbose)
                    zclock_log ("W: permanent error, abandoning request");
                break;          // Give up
            }
        }
    }
    if (zctx_interrupted)
        printf ("W: interrupt received, killing client...\n");
    zmsg_destroy (&request);
    return NULL;
}
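The retry logic in `mdcli_send()` follows the reconnect-and-resend strategy of the earlier Pirate clients: send, wait up to `timeout` for a reply, and on silence either reconnect and resend or give up once the retries are used up. Here is a minimal sketch of just that control flow, with the socket replaced by a hypothetical callback (`try_request_fn`) so it runs without ZeroMQ. With `retries` set to 3 it makes at most three attempts and two reconnects, as `mdcli_send()` does.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical transport: returns true if a reply arrived within the
 * timeout on attempt number `attempt` (1-based). */
typedef bool (*try_request_fn) (int attempt, void *arg);

/* Returns the attempt that got a reply, or 0 if we gave up.
 * *reconnects receives the number of reconnect-and-resend cycles. */
static int
send_with_retries (int retries, try_request_fn try_once, void *arg,
                   int *reconnects)
{
    assert (try_once && reconnects);
    *reconnects = 0;
    if (retries <= 0)
        return 0;                   /* Nothing to do, as in mdcli_send() */
    int retries_left = retries;
    int attempt = 1;                /* First send */
    while (true) {
        if (try_once (attempt, arg))
            return attempt;         /* Got a reply */
        if (--retries_left) {
            (*reconnects)++;        /* New socket, then resend */
            attempt++;
        }
        else
            return 0;               /* Give up */
    }
}

/* Example transport: the reply arrives only on attempt *(int *) arg. */
static bool
reply_on_attempt (int attempt, void *arg)
{
    return attempt == *(int *) arg;
}
```

Reconnecting (rather than just resending) matters because a REQ socket that has sent a request cannot send again until it receives a reply; opening a fresh socket is the simple way out.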

        The following test program performs 100,000 request-reply cycles:

        mdclient: Majordomo client application in C

//
// Majordomo Protocol client example
// Uses the mdcli API to hide all MDP aspects
//

// Lets us build this source without creating a library
#include "mdcliapi.c"

int main (int argc, char *argv [])
{
    int verbose = (argc > 1 && streq (argv [1], "-v"));
    mdcli_t *session = mdcli_new ("tcp://localhost:5555", verbose);

    int count;
    for (count = 0; count < 100000; count++) {
        zmsg_t *request = zmsg_new ();
        zmsg_pushstr (request, "Hello world");
        zmsg_t *reply = mdcli_send (session, "echo", &request);
        if (reply)
            zmsg_destroy (&reply);
        else
            break;              // Interrupt or failure
    }
    printf ("%d requests/replies processed\n", count);
    mdcli_destroy (&session);
    return 0;
}

        Here is the worker API:

        mdwrkapi: Majordomo worker API in C

/*  =====================================================================
mdwrkapi.c

Majordomo Protocol Worker API
Implements the MDP/Worker spec at http://rfc.zeromq.org/spec:7.

---------------------------------------------------------------------
Copyright (c) 1991-2011 iMatix Corporation <www.imatix.com>
Copyright other contributors as noted in the AUTHORS file.

This file is part of the ZeroMQ Guide: http://zguide.zeromq.org

This is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or (at
your option) any later version.

This software is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.

You should have received a copy of the GNU Lesser General Public
License along with this program. If not, see
<http://www.gnu.org/licenses/>.
=====================================================================
*/

#include "mdwrkapi.h"

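// Reliability parameters
#define HEARTBEAT_LIVENESS  3       // 3-5 is reasonable

// Structure of our class
// We access these properties only via class methods

struct _mdwrk_t {
    zctx_t *ctx;                // Our context
    char *broker;
    char *service;
    void *worker;               // Socket to broker
    int verbose;                // Print activity to stdout

    // Heartbeat management
    uint64_t heartbeat_at;      // When to send HEARTBEAT
    size_t liveness;            // How many attempts left
    int heartbeat;              // Heartbeat delay, msecs
    int reconnect;              // Reconnect delay, msecs

    // Internal state
    int expect_reply;           // Zero only at start

    // Return address, if any
    zframe_t *reply_to;
};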


// ---------------------------------------------------------------------
// Send message to broker
// If no msg is provided, creates one internally; otherwise sends a copy,
// and the caller keeps ownership of msg

static void
s_mdwrk_send_to_broker (mdwrk_t *self, char *command, char *option,
                        zmsg_t *msg)
{
    msg = msg? zmsg_dup (msg): zmsg_new ();

    // Stack protocol envelope to start of message
    if (option)
        zmsg_pushstr (msg, option);
    zmsg_pushstr (msg, command);
    zmsg_pushstr (msg, MDPW_WORKER);
    zmsg_pushstr (msg, "");

    if (self->verbose) {
        zclock_log ("I: sending %s to broker",
            mdps_commands [(int) *command]);
        zmsg_dump (msg);
    }
    zmsg_send (&msg, self->worker);
}



// ---------------------------------------------------------------------
// Connect or reconnect to broker

static void
s_mdwrk_connect_to_broker (mdwrk_t *self)
{
    if (self->worker)
        zsocket_destroy (self->ctx, self->worker);
    self->worker = zsocket_new (self->ctx, ZMQ_DEALER);
    zmq_connect (self->worker, self->broker);
    if (self->verbose)
        zclock_log ("I: connecting to broker at %s...", self->broker);

    // Register service with broker
    s_mdwrk_send_to_broker (self, MDPW_READY, self->service, NULL);

    // If liveness hits zero, we consider the broker disconnected
    self->liveness = HEARTBEAT_LIVENESS;
    self->heartbeat_at = zclock_time () + self->heartbeat;
}



// ---------------------------------------------------------------------
// Constructor

mdwrk_t *
mdwrk_new (char *broker, char *service, int verbose)
{
    assert (broker);
    assert (service);

    mdwrk_t *self = (mdwrk_t *) zmalloc (sizeof (mdwrk_t));
    self->ctx = zctx_new ();
    self->broker = strdup (broker);
    self->service = strdup (service);
    self->verbose = verbose;
    self->heartbeat = 2500;     // msecs
    self->reconnect = 2500;     // msecs

    s_mdwrk_connect_to_broker (self);
    return self;
}



// ---------------------------------------------------------------------
// Destructor

void
mdwrk_destroy (mdwrk_t **self_p)
{
    assert (self_p);
    if (*self_p) {
        mdwrk_t *self = *self_p;
        zctx_destroy (&self->ctx);
        free (self->broker);
        free (self->service);
        free (self);
        *self_p = NULL;
    }
}




// ---------------------------------------------------------------------
// Set heartbeat delay (msecs)

void
mdwrk_set_heartbeat (mdwrk_t *self, int heartbeat)
{
    self->heartbeat = heartbeat;
}


// ---------------------------------------------------------------------
// Set reconnect delay (msecs)

void
mdwrk_set_reconnect (mdwrk_t *self, int reconnect)
{
    self->reconnect = reconnect;
}



// ---------------------------------------------------------------------
// Send reply, if any, to broker and wait for the next request

zmsg_t *
mdwrk_recv (mdwrk_t *self, zmsg_t **reply_p)
{
    // Format and send the reply if we were given one
    assert (reply_p);
    zmsg_t *reply = *reply_p;
    assert (reply || !self->expect_reply);
    if (reply) {
        assert (self->reply_to);
        zmsg_wrap (reply, self->reply_to);
        self->reply_to = NULL;  // The frame now belongs to the reply
        s_mdwrk_send_to_broker (self, MDPW_REPLY, NULL, reply);
        zmsg_destroy (reply_p);
    }
    self->expect_reply = 1;

    while (TRUE) {
        zmq_pollitem_t items [] = {
            { self->worker, 0, ZMQ_POLLIN, 0 } };
        int rc = zmq_poll (items, 1, self->heartbeat * ZMQ_POLL_MSEC);
        if (rc == -1)
            break;              // Interrupted

        if (items [0].revents & ZMQ_POLLIN) {
            zmsg_t *msg = zmsg_recv (self->worker);
            if (!msg)
                break;          // Interrupted
            if (self->verbose) {
                zclock_log ("I: received message from broker:");
                zmsg_dump (msg);
            }
            self->liveness = HEARTBEAT_LIVENESS;

            // Don't try to handle errors, just assert noisily
            assert (zmsg_size (msg) >= 3);

            zframe_t *empty = zmsg_pop (msg);
            assert (zframe_streq (empty, ""));
            zframe_destroy (&empty);

            zframe_t *header = zmsg_pop (msg);
            assert (zframe_streq (header, MDPW_WORKER));
            zframe_destroy (&header);

            zframe_t *command = zmsg_pop (msg);
            if (zframe_streq (command, MDPW_REQUEST)) {
                // We should save every address up to the empty frame,
                // but for now we just save one
                self->reply_to = zmsg_unwrap (msg);
                zframe_destroy (&command);
                return msg;     // We have a request to process
            }
            else
            if (zframe_streq (command, MDPW_HEARTBEAT))
                ;               // Do nothing for heartbeats
            else
            if (zframe_streq (command, MDPW_DISCONNECT))
                s_mdwrk_connect_to_broker (self);
            else {
                zclock_log ("E: invalid input message");
                zmsg_dump (msg);
            }
            zframe_destroy (&command);
            zmsg_destroy (&msg);
        }
        else
        if (--self->liveness == 0) {
            if (self->verbose)
                zclock_log ("W: disconnected from broker - retrying...");
            zclock_sleep (self->reconnect);
            s_mdwrk_connect_to_broker (self);
        }
        // Send HEARTBEAT if it's time
        if (zclock_time () > self->heartbeat_at) {
            s_mdwrk_send_to_broker (self, MDPW_HEARTBEAT, NULL, NULL);
            self->heartbeat_at = zclock_time () + self->heartbeat;
        }
    }
    if (zctx_interrupted)
        printf ("W: interrupt received, killing worker...\n");
    return NULL;
}

        The following test program implements a service named echo:

        mdworker: Majordomo worker application in C

//
// Majordomo Protocol worker example
// Uses the mdwrk API to hide all MDP aspects
//

// Lets us build this source without creating a library
#include "mdwrkapi.c"

int main (int argc, char *argv [])
{
    int verbose = (argc > 1 && streq (argv [1], "-v"));
    mdwrk_t *session = mdwrk_new (
        "tcp://localhost:5555", "echo", verbose);

    zmsg_t *reply = NULL;
    while (1) {
        zmsg_t *request = mdwrk_recv (session, &reply);
        if (request == NULL)
            break;              // Worker was interrupted
        reply = request;        // Echo is complex... :-)
    }
    mdwrk_destroy (&session);
    return 0;
}

        Some notes:

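                1. The API is single-threaded, so the worker does not send heartbeats in the background. This is exactly what we want: if the worker application stops, its heartbeats stop too, and the broker stops sending it new requests.

                2. The worker API does not implement a back-off algorithm for reconnecting; the extra complexity isn't worth it here.

                3. The API does no error reporting. If something goes wrong, it simply asserts (or raises an exception, depending on the language). That is handy for experimental code, because you see the problem immediately, but in real code the API should be robust and handle invalid messages gracefully.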
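Because heartbeats ride on the same single-threaded loop, a worker only notices a dead broker by counting silent poll intervals. The sketch below models the liveness counter in `mdwrk_recv()`: any input resets it to `HEARTBEAT_LIVENESS`, each silent interval decrements it, and when it reaches zero the worker sleeps for the reconnect delay and reconnects, and `s_mdwrk_connect_to_broker()` resets the counter. With the API's defaults (a 2500 msec heartbeat and a liveness of 3) that is roughly 7.5 seconds of silence. The `liveness_t` type and `liveness_tick()` function are hypothetical names for this example.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define HEARTBEAT_LIVENESS 3    /* Same value as the worker API */

/* Hypothetical model of the worker's view of the broker. */
typedef struct {
    size_t liveness;    /* Silent intervals left before reconnecting */
    int    reconnects;  /* Number of reconnects so far */
} liveness_t;

/* Call once per poll interval; got_input is true if anything at all
 * (a request or a heartbeat) arrived from the broker in that interval. */
static void
liveness_tick (liveness_t *self, bool got_input)
{
    assert (self && self->liveness > 0);
    if (got_input)
        self->liveness = HEARTBEAT_LIVENESS;
    else
    if (--self->liveness == 0) {
        /* Real code: sleep for the reconnect delay, open a new socket,
         * send READY; s_mdwrk_connect_to_broker() also resets liveness. */
        self->reconnects++;
        self->liveness = HEARTBEAT_LIVENESS;
    }
}
```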

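        You may wonder why the worker API closes its socket and opens a new one, given that ZMQ reconnects automatically when a peer comes back. Look back at the Simple Pirate and Paranoid Pirate workers to see why. ZMQ does reconnect automatically, but if the broker dies and comes back up, the worker does not register again. There are two ways to solve this. The simpler one, used here, is for the worker to close its socket and start from scratch when it decides the broker is dead. The other is for the broker, when it gets a heartbeat from a worker it does not know, to ask that worker to register its service; that would mean adding this rule to the protocol.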
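The broker later in this article also covers the restart case from its own side: a HEARTBEAT or REPLY from a worker it does not know makes it send DISCONNECT, and the worker API answers DISCONNECT by reconnecting and sending READY again, so the worker ends up re-registered. The sketch below restates the decision table of `s_worker_process()` in plain C. The enum names are hypothetical, the command numbering is assumed to match the guide's `mdp.h`, and `reserved_service` stands for a READY whose service name starts with "mmi.".

```c
#include <assert.h>
#include <stdbool.h>

/* MDP/Worker commands; numbering assumed to match mdp.h ("\001".."\005"). */
typedef enum {
    CMD_READY = 1, CMD_REQUEST, CMD_REPLY, CMD_HEARTBEAT, CMD_DISCONNECT
} mdp_cmd_t;

/* What the broker does in response (hypothetical names). */
typedef enum {
    ACT_REGISTER,       /* Attach to service, mark idle */
    ACT_FORWARD_REPLY,  /* Route reply to client, mark idle again */
    ACT_REFRESH,        /* Push back the worker's expiry */
    ACT_DROP,           /* Forget the worker, send nothing */
    ACT_DISCONNECT,     /* Send DISCONNECT, then forget the worker */
    ACT_LOG_ERROR       /* Invalid command: log it */
} broker_action_t;

/* known: the broker already had this worker before the message arrived. */
static broker_action_t
broker_on_worker_command (mdp_cmd_t command, bool known, bool reserved_service)
{
    switch (command) {
        case CMD_READY:
            return (known || reserved_service)? ACT_DISCONNECT: ACT_REGISTER;
        case CMD_REPLY:
            return known? ACT_FORWARD_REPLY: ACT_DISCONNECT;
        case CMD_HEARTBEAT:
            return known? ACT_REFRESH: ACT_DISCONNECT;
        case CMD_DISCONNECT:
            return ACT_DROP;
        default:
            return ACT_LOG_ERROR;   /* e.g. REQUEST, which only brokers send */
    }
}
```

A broker restart then plays out as HEARTBEAT (unknown worker) leading to DISCONNECT, followed by READY leading to REGISTER.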

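        Now let's design the Majordomo broker. Its core is a set of queues, one per service. We create a queue when a worker for that service appears (we should delete it when the workers disappear, but we ignore that for now). In addition, we keep a queue of workers for each service.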

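        To make the C code easier to read and write, I use the hash and list containers from the ZFL project, zhash and zlist. In a modern language you would of course use its built-in containers.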
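Stripped of ZeroMQ and of the hash and list containers, the per-service bookkeeping is two FIFOs per service (pending requests and idle workers) plus a dispatch step that pairs them off while both are non-empty, which is what `s_service_dispatch()` does. This is a hypothetical fixed-size sketch: the `toy_service_t` type and helper names are illustrative, and the real broker also keeps a broker-wide list of waiting workers for heartbeating and purging.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define MAX_QUEUE 16

/* Hypothetical, simplified service entry. */
typedef struct {
    const char *name;                 /* Service name */
    const char *requests[MAX_QUEUE];  /* Pending requests, oldest first */
    size_t      nreq;
    const char *waiting[MAX_QUEUE];   /* Idle workers, oldest first */
    size_t      nwait;
} toy_service_t;

/* Remove and return the head of a FIFO stored in an array. */
static const char *
fifo_pop (const char **items, size_t *count)
{
    assert (*count > 0);
    const char *head = items[0];
    memmove (items, items + 1, (*count - 1) * sizeof items[0]);
    (*count)--;
    return head;
}

/* Queue the request (if any), then hand out requests while there is both
 * work and an idle worker. Each worker/request pair goes into sent[];
 * returns how many pairs were dispatched. */
static size_t
service_dispatch (toy_service_t *service, const char *request,
                  const char *sent[][2], size_t max_sent)
{
    assert (service);
    if (request) {
        assert (service->nreq < MAX_QUEUE);
        service->requests[service->nreq++] = request;
    }
    size_t n = 0;
    while (service->nwait > 0 && service->nreq > 0 && n < max_sent) {
        sent[n][0] = fifo_pop (service->waiting, &service->nwait);
        sent[n][1] = fifo_pop (service->requests, &service->nreq);
        n++;
    }
    return n;
}

/* Like s_worker_waiting(): queue the worker, then try to dispatch. */
static size_t
service_worker_ready (toy_service_t *service, const char *worker,
                      const char *sent[][2], size_t max_sent)
{
    assert (service && worker);
    assert (service->nwait < MAX_QUEUE);
    service->waiting[service->nwait++] = worker;
    return service_dispatch (service, NULL, sent, max_sent);
}
```

Requests simply accumulate while a service has no idle workers, and the oldest request goes to the next worker that reports READY or finishes a REPLY.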

//
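// Majordomo Protocol broker
// A minimal implementation of http://rfc.zeromq.org/spec:7 and spec:8
//
#include "czmq.h"
#include "mdp.h"

// We'd normally pull these from config data

#define HEARTBEAT_LIVENESS  3       // 3-5 is reasonable
#define HEARTBEAT_INTERVAL  2500    // msecs
#define HEARTBEAT_EXPIRY    (HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS)

// The broker class defines a single broker instance
typedef struct {
    zctx_t *ctx;                // Our context
    void *socket;               // Socket for clients & workers
    int verbose;                // Print activity to stdout
    char *endpoint;             // Broker binds to this endpoint
    zhash_t *services;          // Hash of known services
    zhash_t *workers;           // Hash of known workers
    zlist_t *waiting;           // List of waiting workers
    uint64_t heartbeat_at;      // When to send HEARTBEAT
} broker_t;

// The service class defines a single service instance
typedef struct {
    char *name;                 // Service name
    zlist_t *requests;          // List of client requests
    zlist_t *waiting;           // List of waiting workers
    size_t workers;             // Number of registered workers
} service_t;

// The worker class defines a single worker, idle or busy
typedef struct {
    char *identity;             // Identity of worker
    zframe_t *address;          // Address frame to route to
    service_t *service;         // Owning service, if known
    int64_t expiry;             // Expiry time; pushed back by each heartbeat
} worker_t;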


// ---------------------------------------------------------------------
// Broker functions
static broker_t *
s_broker_new (int verbose);
static void
s_broker_destroy (broker_t **self_p);
static void
s_broker_bind (broker_t *self, char *endpoint);
static void
s_broker_purge_workers (broker_t *self);

// Service functions
static service_t *
s_service_require (broker_t *self, zframe_t *service_frame);
static void
s_service_destroy (void *argument);
static void
s_service_dispatch (broker_t *self, service_t *service, zmsg_t *msg);
static void
s_service_internal (broker_t *self, zframe_t *service_frame, zmsg_t *msg);

// Worker functions
static worker_t *
s_worker_require (broker_t *self, zframe_t *address);
static void
s_worker_delete (broker_t *self, worker_t *worker, int disconnect);
static void
s_worker_destroy (void *argument);
static void
s_worker_process (broker_t *self, zframe_t *sender, zmsg_t *msg);
static void
s_worker_send (broker_t *self, worker_t *worker, char *command,
char *option, zmsg_t *msg);
static void
s_worker_waiting (broker_t *self, worker_t *worker);

// Client functions
static void
s_client_process (broker_t *self, zframe_t *sender, zmsg_t *msg);


// ---------------------------------------------------------------------
// Main broker work happens here

int main (int argc, char *argv [])
{
    int verbose = (argc > 1 && streq (argv [1], "-v"));

    broker_t *self = s_broker_new (verbose);
    s_broker_bind (self, "tcp://*:5555");

    // Get and process messages forever or until interrupted
    while (TRUE) {
        zmq_pollitem_t items [] = {
            { self->socket, 0, ZMQ_POLLIN, 0 } };
        int rc = zmq_poll (items, 1, HEARTBEAT_INTERVAL * ZMQ_POLL_MSEC);
        if (rc == -1)
            break;              // Interrupted

        // Process next input message, if any
        if (items [0].revents & ZMQ_POLLIN) {
            zmsg_t *msg = zmsg_recv (self->socket);
            if (!msg)
                break;          // Interrupted
            if (self->verbose) {
                zclock_log ("I: received message:");
                zmsg_dump (msg);
            }
            zframe_t *sender = zmsg_pop (msg);
            zframe_t *empty  = zmsg_pop (msg);
            zframe_t *header = zmsg_pop (msg);

            if (zframe_streq (header, MDPC_CLIENT))
                s_client_process (self, sender, msg);
            else
            if (zframe_streq (header, MDPW_WORKER))
                s_worker_process (self, sender, msg);
            else {
                zclock_log ("E: invalid message:");
                zmsg_dump (msg);
                zmsg_destroy (&msg);
            }
            zframe_destroy (&sender);
            zframe_destroy (&empty);
            zframe_destroy (&header);
        }
        // Disconnect and delete any expired workers
        // Send heartbeats to idle workers if it's time
        if (zclock_time () > self->heartbeat_at) {
            s_broker_purge_workers (self);
            worker_t *worker = (worker_t *) zlist_first (self->waiting);
            while (worker) {
                s_worker_send (self, worker, MDPW_HEARTBEAT, NULL, NULL);
                worker = (worker_t *) zlist_next (self->waiting);
            }
            self->heartbeat_at = zclock_time () + HEARTBEAT_INTERVAL;
        }
    }
    if (zctx_interrupted)
        printf ("W: interrupt received, shutting down...\n");

    s_broker_destroy (&self);
    return 0;
}



// ---------------------------------------------------------------------
// Constructor for broker object

static broker_t *
s_broker_new (int verbose)
{
    broker_t *self = (broker_t *) zmalloc (sizeof (broker_t));

    // Initialize broker state
    self->ctx = zctx_new ();
    self->socket = zsocket_new (self->ctx, ZMQ_ROUTER);
    self->verbose = verbose;
    self->services = zhash_new ();
    self->workers = zhash_new ();
    self->waiting = zlist_new ();
    self->heartbeat_at = zclock_time () + HEARTBEAT_INTERVAL;
    return self;
}


// ---------------------------------------------------------------------
// Destructor for broker object

static void
s_broker_destroy (broker_t **self_p)
{
    assert (self_p);
    if (*self_p) {
        broker_t *self = *self_p;
        zctx_destroy (&self->ctx);
        zhash_destroy (&self->services);
        zhash_destroy (&self->workers);
        zlist_destroy (&self->waiting);
        free (self);
        *self_p = NULL;
    }
}



// ---------------------------------------------------------------------
// Bind broker to endpoint; can be called multiple times
// We use a single socket for both clients and workers

static void
s_broker_bind (broker_t *self, char *endpoint)
{
    zsocket_bind (self->socket, endpoint);
    zclock_log ("I: MDP broker/0.1.1 is active at %s", endpoint);
}


// ---------------------------------------------------------------------
// Delete any idle workers that have expired
// The waiting list is oldest first, so we stop at the first live worker

static void
s_broker_purge_workers (broker_t *self)
{
    worker_t *worker = (worker_t *) zlist_first (self->waiting);
    while (worker) {
        if (zclock_time () < worker->expiry)
            break;              // Worker is alive, we're done here
        if (self->verbose)
            zclock_log ("I: deleting expired worker: %s",
                worker->identity);

        s_worker_delete (self, worker, 0);
        worker = (worker_t *) zlist_first (self->waiting);
    }
}



// ---------------------------------------------------------------------
// Look up a service by name, creating it if necessary

static service_t *
s_service_require (broker_t *self, zframe_t *service_frame)
{
    assert (service_frame);
    char *name = zframe_strdup (service_frame);

    service_t *service =
        (service_t *) zhash_lookup (self->services, name);
    if (service == NULL) {
        service = (service_t *) zmalloc (sizeof (service_t));
        service->name = name;
        service->requests = zlist_new ();
        service->waiting = zlist_new ();
        zhash_insert (self->services, name, service);
        zhash_freefn (self->services, name, s_service_destroy);
        if (self->verbose)
            zclock_log ("I: added service: %s", name);
    }
    else
        free (name);

    return service;
}


// ---------------------------------------------------------------------
// Destroy a service object when it is removed from broker->services

static void
s_service_destroy (void *argument)
{
    service_t *service = (service_t *) argument;
    // Destroy all queued requests
    while (zlist_size (service->requests)) {
        zmsg_t *msg = zlist_pop (service->requests);
        zmsg_destroy (&msg);
    }
    zlist_destroy (&service->requests);
    zlist_destroy (&service->waiting);
    free (service->name);
    free (service);
}


// ---------------------------------------------------------------------
// Dispatch requests to waiting workers, as far as possible

static void
s_service_dispatch (broker_t *self, service_t *service, zmsg_t *msg)
{
    assert (service);
    if (msg)                    // Queue message if any
        zlist_append (service->requests, msg);

    s_broker_purge_workers (self);
    while (zlist_size (service->waiting)
        && zlist_size (service->requests))
    {
        worker_t *worker = zlist_pop (service->waiting);
        zlist_remove (self->waiting, worker);
        zmsg_t *request = zlist_pop (service->requests);
        s_worker_send (self, worker, MDPW_REQUEST, NULL, request);
        zmsg_destroy (&request);
    }
}



// ---------------------------------------------------------------------
// Handle internal services according to the 8/MMI specification

static void
s_service_internal (broker_t *self, zframe_t *service_frame, zmsg_t *msg)
{
    char *return_code;
    if (zframe_streq (service_frame, "mmi.service")) {
        char *name = zframe_strdup (zmsg_last (msg));
        service_t *service =
            (service_t *) zhash_lookup (self->services, name);
        return_code = service && service->workers? "200": "404";
        free (name);
    }
    else
        return_code = "501";

    zframe_reset (zmsg_last (msg), return_code, strlen (return_code));

    // Remove and save the client return envelope, insert the protocol
    // header and service name, then rewrap the envelope
    zframe_t *client = zmsg_unwrap (msg);
    zmsg_push (msg, zframe_dup (service_frame));
    zmsg_pushstr (msg, MDPC_CLIENT);
    zmsg_wrap (msg, client);
    zmsg_send (&msg, self->socket);
}


// ---------------------------------------------------------------------
// Create a worker if necessary

static worker_t *
s_worker_require (broker_t *self, zframe_t *address)
{
    assert (address);

    // self->workers is keyed by worker identity
    char *identity = zframe_strhex (address);
    worker_t *worker =
        (worker_t *) zhash_lookup (self->workers, identity);

    if (worker == NULL) {
        worker = (worker_t *) zmalloc (sizeof (worker_t));
        worker->identity = identity;
        worker->address = zframe_dup (address);
        zhash_insert (self->workers, identity, worker);
        zhash_freefn (self->workers, identity, s_worker_destroy);
        if (self->verbose)
            zclock_log ("I: registering new worker: %s", identity);
    }
    else
        free (identity);
    return worker;
}


// ---------------------------------------------------------------------
// Delete a worker from all data structures, and destroy the worker object

static void
s_worker_delete (broker_t *self, worker_t *worker, int disconnect)
{
    assert (worker);
    if (disconnect)
        s_worker_send (self, worker, MDPW_DISCONNECT, NULL, NULL);

    if (worker->service) {
        zlist_remove (worker->service->waiting, worker);
        worker->service->workers--;
    }
    zlist_remove (self->waiting, worker);
    // This implicitly calls s_worker_destroy()
    zhash_delete (self->workers, worker->identity);
}


// ---------------------------------------------------------------------
// Destroy a worker object when it is removed from broker->workers

static void
s_worker_destroy (void *argument)
{
    worker_t *worker = (worker_t *) argument;
    zframe_destroy (&worker->address);
    free (worker->identity);
    free (worker);
}


// ---------------------------------------------------------------------
// Process a message sent to us by a worker

static void
s_worker_process (broker_t *self, zframe_t *sender, zmsg_t *msg)
{
    assert (zmsg_size (msg) >= 1);     // At least a command frame

    zframe_t *command = zmsg_pop (msg);
    char *identity = zframe_strhex (sender);
    int worker_ready = (zhash_lookup (self->workers, identity) != NULL);
    free (identity);
    worker_t *worker = s_worker_require (self, sender);

    if (zframe_streq (command, MDPW_READY)) {
        zframe_t *service_frame = zmsg_pop (msg);
        // A READY from a worker we already know is not the first
        // command in its session, so delete the worker
        if (worker_ready)
            s_worker_delete (self, worker, 1);
        else
        // Workers may not register for reserved "mmi." service names
        if (zframe_size (service_frame) >= 4
        &&  memcmp (zframe_data (service_frame), "mmi.", 4) == 0)
            s_worker_delete (self, worker, 1);
        else {
            // Attach worker to service and mark as idle
            worker->service = s_service_require (self, service_frame);
            worker->service->workers++;
            s_worker_waiting (self, worker);
        }
        zframe_destroy (&service_frame);
    }
    else
    if (zframe_streq (command, MDPW_REPLY)) {
        if (worker_ready) {
            // Remove and save the client return envelope, insert the
            // protocol header and service name, then rewrap the envelope
            zframe_t *client = zmsg_unwrap (msg);
            zmsg_pushstr (msg, worker->service->name);
            zmsg_pushstr (msg, MDPC_CLIENT);
            zmsg_wrap (msg, client);
            zmsg_send (&msg, self->socket);
            s_worker_waiting (self, worker);
        }
        else
            s_worker_delete (self, worker, 1);
    }
    else
    if (zframe_streq (command, MDPW_HEARTBEAT)) {
        if (worker_ready)
            worker->expiry = zclock_time () + HEARTBEAT_EXPIRY;
        else
            s_worker_delete (self, worker, 1);
    }
    else
    if (zframe_streq (command, MDPW_DISCONNECT))
        s_worker_delete (self, worker, 0);
    else {
        zclock_log ("E: invalid input message");
        zmsg_dump (msg);
    }
    zframe_destroy (&command);
    zmsg_destroy (&msg);
}


// ---------------------------------------------------------------------
// Send a message to a worker
// If a message is provided, sends a copy of it; destroying the original
// is the caller's job

static void
s_worker_send (broker_t *self, worker_t *worker, char *command,
               char *option, zmsg_t *msg)
{
    msg = msg? zmsg_dup (msg): zmsg_new ();

    // Stack protocol envelope to start of message
    if (option)
        zmsg_pushstr (msg, option);
    zmsg_pushstr (msg, command);
    zmsg_pushstr (msg, MDPW_WORKER);

    // Stack routing envelope to start of message
    zmsg_wrap (msg, zframe_dup (worker->address));

    if (self->verbose) {
        zclock_log ("I: sending %s to worker",
            mdps_commands [(int) *command]);
        zmsg_dump (msg);
    }
    zmsg_send (&msg, self->socket);
}


// ---------------------------------------------------------------------
// This worker is now waiting for work

static void
s_worker_waiting (broker_t *self, worker_t *worker)
{
    // Queue the worker on the broker's and the service's waiting lists
    zlist_append (self->waiting, worker);
    zlist_append (worker->service->waiting, worker);
    worker->expiry = zclock_time () + HEARTBEAT_EXPIRY;
    s_service_dispatch (self, worker->service, NULL);
}


// ---------------------------------------------------------------------
// Process a request coming from a client

static void
s_client_process (broker_t *self, zframe_t *sender, zmsg_t *msg)
{
    assert (zmsg_size (msg) >= 2);     // Service name + body

    zframe_t *service_frame = zmsg_pop (msg);
    service_t *service = s_service_require (self, service_frame);

    // Set the reply return address to the client sender
    zmsg_wrap (msg, zframe_dup (sender));
    if (zframe_size (service_frame) >= 4
    &&  memcmp (zframe_data (service_frame), "mmi.", 4) == 0)
        s_service_internal (self, service_frame, msg);
    else
        s_service_dispatch (self, service, msg);
    zframe_destroy (&service_frame);
}
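        This is the most complex example we have seen so far, at about 500 lines of code. Writing it and making it reasonably robust took about two days. Even so, it is only part of a complete service-oriented broker.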

        Some notes:

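                1. The Majordomo protocol lets us handle both clients and workers on a single socket. This makes the broker easier to deploy and manage: it sends and receives on one ZMQ endpoint instead of two.

                2. The broker implements the MDP/0.1 specification properly, including disconnecting workers that send commands out of sequence (such as a HEARTBEAT or REPLY before READY), and heartbeating.

                3. The code can be extended to run as multiple threads, each managing one socket and one set of clients and workers. This could be interesting for splitting up large architectures, and since the C code is already organized around a broker class, it would be easy to do.

                4. It can also be extended to a primary/backup or live/live model for higher reliability. The broker is essentially stateless (all it records is which services are present), so it is up to clients and workers to pick another broker if their first choice is not available.

                5. The examples use a heartbeat interval of 2.5 seconds (HEARTBEAT_INTERVAL is 2500 msecs), mainly to reduce the amount of output while debugging. Real values should be lower, but the retry period should be somewhat longer, for example 10 seconds, so that services have enough time to start.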
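With the broker's defaults the timing works out as follows: idle workers get a heartbeat every `HEARTBEAT_INTERVAL` = 2500 msecs, and a worker that stays silent for `HEARTBEAT_EXPIRY` = 2500 × 3 = 7500 msecs is purged. `s_broker_purge_workers()` walks the waiting list from the front and stops at the first worker that has not expired, relying on idle workers being appended in the order they became idle. Because a heartbeat refreshes a worker's expiry in place, an expired worker behind a live one is only caught on a later pass. A minimal sketch of that purge over a plain array of expiry times (the `purge_expired()` helper is hypothetical):

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define HEARTBEAT_LIVENESS  3
#define HEARTBEAT_INTERVAL  2500    /* msecs */
#define HEARTBEAT_EXPIRY    (HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS)

/* expiry[] holds the expiry times (msecs) of idle workers in waiting-list
 * order. Removes expired workers from the front, stopping at the first
 * live one, and returns the new number of idle workers. */
static size_t
purge_expired (int64_t *expiry, size_t count, int64_t now)
{
    assert (expiry || count == 0);
    size_t expired = 0;
    while (expired < count && now >= expiry[expired])
        expired++;
    /* Shift the surviving workers to the front of the array */
    for (size_t i = expired; i < count; i++)
        expiry[i - expired] = expiry[i];
    return count - expired;
}
```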
