ZMQ之克隆模式的可靠性
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ZMQ之克隆模式的可靠性相关的知识,希望对你有一定的参考价值。
克隆服务器的可靠性
克隆模型1至5相对比较简单,下面我们会探讨一个非常复杂的模型。可以发现,为了构建可靠的消息队列,我们需要花费非常多的精力。所以我们经常会问:有必要这么做吗?如果说你能够接受可靠性不够高的、或者说已经足够好的架构,那恭喜你,你在成本和收益之间找到了平衡。虽然我们会偶尔丢失一些消息,但从经济的角度来说还是合理的。不管怎样,下面我们就来介绍这个复杂的模型。
在模型3中,你会关闭和重启服务,这会导致数据的丢失。任何后续加入的客户端只能得到重启之后的那些数据,而非所有的。下面就让我们想办法让克隆模式能够承担服务器重启的故障。
以下列举我们需要处理的问题:
1、克隆服务器进程崩溃并自动或手工重启。进程丢失了所有数据,所以必须从别处进行恢复。
2、克隆服务器硬件故障,长时间不能恢复。客户端需要切换至另一个可用的服务端。
3、克隆服务器从网络上断开,如交换机发生故障等。它会在某个时点重连,但期间的数据就需要替代的服务器负责处理。
第一步我们需要增加一个服务器。我们可以使用第四章中提到的双子星模式,它是一个反应堆,而我们的程序经过整理后也是一个反应堆,因此可以互相协作。
我们需要保证更新事件在主服务器崩溃时仍能保留,最简单的机制就是同时发送给两台服务器。
备机就可以当做一台客户端来运行,像其他客户端一样从主机获取更新事件。同时它又能从客户端获取更新事件——虽然不应该以此更新数据,但可以先暂存起来。
所以,相较于模型5,模型6中引入了以下特性:
1、客户端发送更新事件改用PUB-SUB套接字,而非PUSH-PULL。原因是PUSH套接字会在没有接收方时阻塞,且会进行负载均衡——我们需要两台服务器都接收到消息。我们会在服务器端绑定SUB套接字,在客户端连接PUB套接字。
2、我们在服务器发送给客户端的更新事件中加入心跳,这样客户端可以知道主机是否已死,然后切换至备机。
3、我们使用双子星模式的bstar反应堆类来创建主机和备机。双子星模式中需要有一个“投票”套接字,来协助判定对方节点是否已死。这里我们使用快照请求来作为“投票”。
4、我们将为所有的更新事件添加UUID属性,它由客户端生成,服务端会将其发布给所有客户端。
5、备机将维护一个“待处理列表”,保存来自客户端、尚未由服务端发布的更新事件;或者反过来,来自服务端、尚未从客户端收到的更新事件。这个列表从旧到新排列,这样就能方便地从顶部删除消息。
我们可以为客户端设计一个有限状态机,它有三种状态:
1、客户端打开并连接了套接字,然后向服务端发送快照请求。为了避免消息风暴,它只会请求两次。
2、客户端等待快照应答,如果获得了则保存它;如果没有获得,则向第二个服务器发送请求。 3、客户端收到快照,便开始等待更新事件。如果在一定时间内没有收到服务端响应,则会连接第二个服务端。
客户端会一直循环下去,可能在程序刚启动时,部分客户端会试图连接主机,部分连接备机,相信双子星模式会很好地处理这一情况的。
我们可以将客户端状态图绘制出来:
故障恢复的步骤如下:
1、客户端检测到主机不再发送心跳,因此转而连接备机,并请求一份新的快照;
2、备机开始接收快照请求,并检测到主机死亡,于是开始作为主机运行;
3、备机将待处理列表中的更新事件写入自身状态中,然后开始处理快照请求。
当主机恢复连接时:
1、启动为slave状态,并作为克隆模式客户端连接备机;
2、同时,使用SUB套接字从客户端接收更新事件。
我们做两点假设:
1、至少有一台主机会继续运行。如果两台主机都崩溃了,那我们将丢失所有的服务端数据,无法恢复。 2、不同的客户端不会同时更新同一个键值对。客户端的更新事件会先后到达两个服务器,因此更新的顺序可能会不一致。单个客户端的更新事件到达两台服务器的顺序是相同的,所以不用担心。
下面是整体架构图:
开始编程之前,我们需要将客户端重构成一个可复用的类。在ZMQ中写异步类有时是为了练习如何写出优雅的代码,但这里我们确实是希望克隆模式可以成为一种易于使用的程序。上述架构的伸缩性来源于客户端的正确行为,因此有必要将其封装成一份API。要在客户端中进行故障恢复还是比较复杂的,试想一下自由者模式和克隆模式结合起来会是什么样的吧。
按照我的习惯,我会先写出一份API的列表,然后加以实现。让我们假想一个名为clone的API,在其基础之上编写克隆模式客户端API。将代码封装为API显然会提升代码的稳定性,就以模型5为例,客户端需要打开三个套接字,端点名称直接写在了代码里。我们可以创建这样一组API:
// 为每个套接字指定端点
clone_subscribe (clone, "tcp://localhost:5556");
clone_snapshot (clone, "tcp://localhost:5557");
clone_updates (clone, "tcp://localhost:5558");
// 由于有两个服务端,因此再执行一次
clone_subscribe (clone, "tcp://localhost:5566");
clone_snapshot (clone, "tcp://localhost:5567");
clone_updates (clone, "tcp://localhost:5568");
但这种写法还是比较啰嗦的,因为没有必要将API内部的一些设计暴露给编程人员。现在我们会使用三个套接字,而将来可能就会使用两个,或者四个。我们不可能让所有的应用程序都相应地修改吧?让我们把这些信息包装到API中:
// 指定主备服务器端点
clone_connect (clone, "tcp://localhost:5551");
clone_connect (clone, "tcp://localhost:5561");
这样一来代码就变得非常简洁,不过也会对现有代码的内部就够造成影响。我们需要从一个端点中推算出三个端点。一种方法是假设客户端和服务端使用三个连续的端点通信,并将这个规则写入协议;另一个方法是向服务器索取缺少的端点信息。我们使用第一种较为简单的方法:
1、服务器状态ROUTER在端点P;
2、服务器更新事件PUB在端点P + 1;
3、服务器更新事件SUB在端点P + 2。
clone类和第四章的flcliapi类很类似,由两部分组成:
1、一个在后台运行的异步克隆模式代理。该代理处理所有的I/O操作,实时地和服务器进行通信;
2、一个在前台应用程序中同步运行的clone类。当你创建了一个clone对象后,它会自动创建后台的clone线程;当你销毁clone对象,该后台线程也会被销毁。
前台的clone类会使用inproc管道和后台的代理进行通信。C语言中,czmq线程会自动为我们创建这个管道。这也是ZMQ多线程编程的常规方式。
如果没有ZMQ,这种异步的设计将很难处理高压工作,而ZMQ会让其变得简单。编写出来额代码会相对比较复杂。我们可以用反应堆的模式来编写,但这会进一步增加复杂度,且影响应用程序的使用。因此,我们的设计的API将更像是一个能够和服务器进行通信的键值表:
clone_t *clone_new (void);
void clone_destroy (clone_t **self_p);
void clone_connect (clone_t *self, char *address, char *service);
void clone_set (clone_t *self, char *key, char *value);
char *clone_get (clone_t *self, char *key);
下面就是克隆模式客户端模型6的代码,因为调用了API,所以非常简短:
clonecli6: Clone client, Model Six in C
//
// 克隆模式 - 客户端 - 模型6
//
// 直接编译,不建类库
#include "clone.c"
#define SUBTREE "/client/"
int main (void)
// 创建分布式哈希表
clone_t *clone = clone_new ();
// 配置
clone_subtree (clone, SUBTREE);
clone_connect (clone, "tcp://localhost", "5556");
clone_connect (clone, "tcp://localhost", "5566");
// 插入随机键值
while (!zctx_interrupted)
// 生成随机值
char key [255];
char value [10];
sprintf (key, "%s%d", SUBTREE, randof (10000));
sprintf (value, "%d", randof (1000000));
clone_set (clone, key, value, randof (30));
sleep (1);
clone_destroy (&clone);
return 0;
以下是clone类的实现:
clone: Clone class in C
/* =====================================================================
clone - client-side Clone Pattern class
---------------------------------------------------------------------
Copyright (c) 1991-2011 iMatix Corporation <www.imatix.com>
Copyright other contributors as noted in the AUTHORS file.
This file is part of the ZeroMQ Guide: http://zguide.zeromq.org
This is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or (at
your option) any later version.
This software is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public
License along with this program. If not, see
<http://www.gnu.org/licenses/>.
=====================================================================
*/
#include "clone.h"
// 请求超时时间
#define GLOBAL_TIMEOUT 4000 // msecs
// 判定服务器死亡的时间
#define SERVER_TTL 5000 // msecs
// 服务器数量
#define SERVER_MAX 2
// =====================================================================
// 同步部分,在应用程序线程中工作
// ---------------------------------------------------------------------
// 类结构
struct _clone_t
zctx_t *ctx; // 上下文
void *pipe; // 和后台代理间的通信套接字
;
// 该线程用于处理真正的clone类
static void clone_agent (void *args, zctx_t *ctx, void *pipe);
// ---------------------------------------------------------------------
// 构造函数
clone_t *
clone_new (void)
clone_t
*self;
self = (clone_t *) zmalloc (sizeof (clone_t));
self->ctx = zctx_new ();
self->pipe = zthread_fork (self->ctx, clone_agent, NULL);
return self;
// ---------------------------------------------------------------------
// 析构函数
void
clone_destroy (clone_t **self_p)
assert (self_p);
if (*self_p)
clone_t *self = *self_p;
zctx_destroy (&self->ctx);
free (self);
*self_p = NULL;
// ---------------------------------------------------------------------
// 在链接之前指定快照和更新事件的子树
// 发送给后台代理的消息内容为[SUBTREE][subtree]
void clone_subtree (clone_t *self, char *subtree)
assert (self);
zmsg_t *msg = zmsg_new ();
zmsg_addstr (msg, "SUBTREE");
zmsg_addstr (msg, subtree);
zmsg_send (&msg, self->pipe);
// ---------------------------------------------------------------------
// 连接至新的服务器端点
// 消息内容:[CONNECT][endpoint][service]
void
clone_connect (clone_t *self, char *address, char *service)
assert (self);
zmsg_t *msg = zmsg_new ();
zmsg_addstr (msg, "CONNECT");
zmsg_addstr (msg, address);
zmsg_addstr (msg, service);
zmsg_send (&msg, self->pipe);
// ---------------------------------------------------------------------
// 设置新值
// 消息内容:[SET][key][value][ttl]
void
clone_set (clone_t *self, char *key, char *value, int ttl)
char ttlstr [10];
sprintf (ttlstr, "%d", ttl);
assert (self);
zmsg_t *msg = zmsg_new ();
zmsg_addstr (msg, "SET");
zmsg_addstr (msg, key);
zmsg_addstr (msg, value);
zmsg_addstr (msg, ttlstr);
zmsg_send (&msg, self->pipe);
// ---------------------------------------------------------------------
// 取值
// 消息内容:[GET][key]
// 如果没有clone可用,会返回NULL
char *
clone_get (clone_t *self, char *key)
assert (self);
assert (key);
zmsg_t *msg = zmsg_new ();
zmsg_addstr (msg, "GET");
zmsg_addstr (msg, key);
zmsg_send (&msg, self->pipe);
zmsg_t *reply = zmsg_recv (self->pipe);
if (reply)
char *value = zmsg_popstr (reply);
zmsg_destroy (&reply);
return value;
return NULL;
// =====================================================================
// 异步部分,在后台运行
// ---------------------------------------------------------------------
// 单个服务端信息
typedef struct
char *address; // 服务端地址
int port; // 端口
void *snapshot; // 快照套接字
void *subscriber; // 接收更新事件的套接字
uint64_t expiry; // 服务器过期时间
uint requests; // 收到的快照请求数
server_t;
static server_t *
server_new (zctx_t *ctx, char *address, int port, char *subtree)
server_t *self = (server_t *) zmalloc (sizeof (server_t));
zclock_log ("I: adding server %s:%d...", address, port);
self->address = strdup (address);
self->port = port;
self->snapshot = zsocket_new (ctx, ZMQ_DEALER);
zsocket_connect (self->snapshot, "%s:%d", address, port);
self->subscriber = zsocket_new (ctx, ZMQ_SUB);
zsocket_connect (self->subscriber, "%s:%d", address, port + 1);
zsockopt_set_subscribe (self->subscriber, subtree);
return self;
static void
server_destroy (server_t **self_p)
assert (self_p);
if (*self_p)
server_t *self = *self_p;
free (self->address);
free (self);
*self_p = NULL;
// ---------------------------------------------------------------------
// 后台代理类
// 状态
#define STATE_INITIAL 0 // 连接之前
#define STATE_SYNCING 1 // 正在同步
#define STATE_ACTIVE 2 // 正在更新
typedef struct
zctx_t *ctx; // 上下文
void *pipe; // 与主线程通信的套接字
zhash_t *kvmap; // 键值表
char *subtree; // 子树
server_t *server [SERVER_MAX];
uint nbr_servers; // 范围:0 - SERVER_MAX
uint state; // 当前状态
uint cur_server; // 当前master,0/1
int64_t sequence; // 键值对编号
void *publisher; // 发布更新事件的套接字
agent_t;
static agent_t *
agent_new (zctx_t *ctx, void *pipe)
agent_t *self = (agent_t *) zmalloc (sizeof (agent_t));
self->ctx = ctx;
self->pipe = pipe;
self->kvmap = zhash_new ();
self->subtree = strdup ("");
self->state = STATE_INITIAL;
self->publisher = zsocket_new (self->ctx, ZMQ_PUB);
return self;
static void
agent_destroy (agent_t **self_p)
assert (self_p);
if (*self_p)
agent_t *self = *self_p;
int server_nbr;
for (server_nbr = 0; server_nbr < self->nbr_servers; server_nbr++)
server_destroy (&self->server [server_nbr]);
zhash_destroy (&self->kvmap);
free (self->subtree);
free (self);
*self_p = NULL;
// 若线程被中断则返回-1
static int
agent_control_message (agent_t *self)
zmsg_t *msg = zmsg_recv (self->pipe);
char *command = zmsg_popstr (msg);
if (command == NULL)
return -1;
if (streq (command, "SUBTREE"))
free (self->subtree);
self->subtree = zmsg_popstr (msg);
else
if (streq (command, "CONNECT"))
char *address = zmsg_popstr (msg);
char *service = zmsg_popstr (msg);
if (self->nbr_servers < SERVER_MAX)
self->server [self->nbr_servers++] = server_new (
self->ctx, address, atoi (service), self->subtree);
// 广播更新事件
zsocket_connect (self->publisher, "%s:%d",
address, atoi (service) + 2);
else
zclock_log ("E: too many servers (max. %d)", SERVER_MAX);
free (address);
free (service);
else
if (streq (command, "SET"))
char *key = zmsg_popstr (msg);
char *value = zmsg_popstr (msg);
char *ttl = zmsg_popstr (msg);
zhash_update (self->kvmap, key, (byte *) value);
zhash_freefn (self->kvmap, key, free);
// 向服务端发送键值对
kvmsg_t *kvmsg = kvmsg_new (0);
kvmsg_set_key (kvmsg, key);
kvmsg_set_uuid (kvmsg);
kvmsg_fmt_body (kvmsg, "%s", value);
kvmsg_set_prop (kvmsg, "ttl", ttl);
kvmsg_send (kvmsg, self->publisher);
kvmsg_destroy (&kvmsg);
puts (key);
free (ttl);
free (key); // 键值对实际由哈希表对象控制
else
if (streq (command, "GET"))
char *key = zmsg_popstr (msg);
char *value = zhash_lookup (self->kvmap, key);
if (value)
zstr_send (self->pipe, value);
else
zstr_send (self->pipe, "");
free (key);
free (value);
free (command);
zmsg_destroy (&msg);
return 0;
// ---------------------------------------------------------------------
// 异步的后台代理会维护一个服务端池,并处理来自应用程序的请求或应答。
static void
clone_agent (void *args, zctx_t *ctx, void *pipe)
agent_t *self = agent_new (ctx, pipe);
while (TRUE)
zmq_pollitem_t poll_set [] =
pipe, 0, ZMQ_POLLIN, 0 ,
0, 0, ZMQ_POLLIN, 0
;
int poll_timer = -1;
int poll_size = 2;
server_t *server = self->server [self->cur_server];
switch (self->state)
case STATE_INITIAL:
// 该状态下,如果有可用服务,会发送快照请求
if (self->nbr_servers > 0)
zclock_log ("I: 正在等待服务器 %s:%d...",
server->address, server->port);
if (server->requests < 2)
zstr_sendm (server->snapshot, "ICANHAZ?");
zstr_send (server->snapshot, self->subtree);
server->requests++;
server->expiry = zclock_time () + SERVER_TTL;
self->state = STATE_SYNCING;
poll_set [1].socket = server->snapshot;
else
poll_size = 1;
break;
case STATE_SYNCING:
// 该状态下我们从服务器端接收快照内容,若失败则尝试其他服务器
poll_set [1].socket = server->snapshot;
break;
case STATE_ACTIVE:
// 该状态下我们从服务器获取更新事件,失败则尝试其他服务器
poll_set [1].socket = server->subscriber;
break;
if (server)
poll_timer = (server->expiry - zclock_time ())
* ZMQ_POLL_MSEC;
if (poll_timer < 0)
poll_timer = 0;
// ------------------------------------------------------------
// poll循环
int rc = zmq_poll (poll_set, poll_size, poll_timer);
if (rc == -1)
break; // 上下文已被关闭
if (poll_set [0].revents & ZMQ_POLLIN)
if (agent_control_message (self))
break; // 中断
else
if (poll_set [1].revents & ZMQ_POLLIN)
kvmsg_t *kvmsg = kvmsg_recv (poll_set [1].socket);
if (!kvmsg)
break; // 中断
// 任何服务端的消息将重置它的过期时间
server->expiry = zclock_time () + SERVER_TTL;
if (self->state == STATE_SYNCING)
// 保存快照内容
server->requests = 0;
if (streq (kvmsg_key (kvmsg), "KTHXBAI"))
self->sequence = kvmsg_sequence (kvmsg);
self->state = STATE_ACTIVE;
zclock_log ("I: received from %s:%d snapshot=%d",
server->address, server->port,
(int) self->sequence);
kvmsg_destroy (&kvmsg);
else
kvmsg_store (&kvmsg, self->kvmap);
else
if (self->state == STATE_ACTIVE)
// 丢弃过期的更新事件
if (kvmsg_sequence (kvmsg) > self->sequence)
self->sequence = kvmsg_sequence (kvmsg);
kvmsg_store (&kvmsg, self->kvmap);
zclock_log ("I: received from %s:%d update=%d",
server->address, server->port,
(int) self->sequence);
else
kvmsg_destroy (&kvmsg);
else
// 服务端已死,尝试其他服务器
zclock_log ("I: 服务器 %s:%d 无响应",
server->address, server->port);
self->cur_server = (self->cur_server + 1) % self->nbr_servers;
self->state = STATE_INITIAL;
agent_destroy (&self);
最后是克隆服务器的模型6代码:
clonesrv6: Clone server, Model Six in C
//
// 克隆模式 - 服务端 - 模型6
//
// 直接编译,不建类库
#include "bstar.c"
#include "kvmsg.c"
// bstar反应堆API
static int s_snapshots (zloop_t *loop, void *socket, void *args);
static int s_collector (zloop_t *loop, void *socket, void *args);
static int s_flush_ttl (zloop_t *loop, void *socket, void *args);
static int s_send_hugz (zloop_t *loop, void *socket, void *args);
static int s_new_master (zloop_t *loop, void *unused, void *args);
static int s_new_slave (zloop_t *loop, void *unused, void *args);
static int s_subscriber (zloop_t *loop, void *socket, void *args);
// 服务端属性
typedef struct
zctx_t *ctx; // 上下文
zhash_t *kvmap; // 存放键值对
bstar_t *bstar; // bstar反应堆核心
int64_t sequence; // 更新事件编号
int port; // 主端口
int peer; // 同伴端口
void *publisher; // 发布更新事件的端口
void *collector; // 接收客户端更新事件的端口
void *subscriber; // 接受同伴更新事件的端口
zlist_t *pending; // 延迟的更新事件
Bool primary; // 是否为主机
Bool master; // 是否为master
Bool slave; // 是否为slave
clonesrv_t;
int main (int argc, char *argv [])
clonesrv_t *self = (clonesrv_t *) zmalloc (sizeof (clonesrv_t));
if (argc == 2 && streq (argv [1], "-p"))
zclock_log ("I: 作为主机master运行,正在等待备机slave连接。");
self->bstar = bstar_new (BSTAR_PRIMARY, "tcp://*:5003",
"tcp://localhost:5004");
bstar_voter (self->bstar, "tcp://*:5556", ZMQ_ROUTER,
s_snapshots, self);
self->port = 5556;
self->peer = 5566;
self->primary = TRUE;
else
if (argc == 2 && streq (argv [1], "-b"))
zclock_log ("I: 作为备机slave运行,正在等待主机master连接。");
self->bstar = bstar_new (BSTAR_BACKUP, "tcp://*:5004",
"tcp://localhost:5003");
bstar_voter (self->bstar, "tcp://*:5566", ZMQ_ROUTER,
s_snapshots, self);
self->port = 5566;
self->peer = 5556;
self->primary = FALSE;
else
printf ("Usage: clonesrv4 -p | -b \\n");
free (self);
exit (0);
// 主机将成为master
if (self->primary)
self->kvmap = zhash_new ();
self->ctx = zctx_new ();
self->pending = zlist_new ();
bstar_set_verbose (self->bstar, TRUE);
// 设置克隆服务端套接字
self->publisher = zsocket_new (self->ctx, ZMQ_PUB);
self->collector = zsocket_new (self->ctx, ZMQ_SUB);
zsocket_bind (self->publisher, "tcp://*:%d", self->port + 1);
zsocket_bind (self->collector, "tcp://*:%d", self->port + 2);
// 作为克隆客户端连接同伴
self->subscriber = zsocket_new (self->ctx, ZMQ_SUB);
zsocket_connect (self->subscriber, "tcp://localhost:%d", self->peer + 1);
// 注册状态事件处理器
bstar_new_master (self->bstar, s_new_master, self);
bstar_new_slave (self->bstar, s_new_slave, self);
// 注册bstar反应堆其他事件处理器
zloop_reader (bstar_zloop (self->bstar), self->collector, s_collector, self);
zloop_timer (bstar_zloop (self->bstar), 1000, 0, s_flush_ttl, self);
zloop_timer (bstar_zloop (self->bstar), 1000, 0, s_send_hugz, self);
// 开启bstar反应堆
bstar_start (self->bstar);
// 中断,终止。
while (zlist_size (self->pending))
kvmsg_t *kvmsg = (kvmsg_t *) zlist_pop (self->pending);
kvmsg_destroy (&kvmsg);
zlist_destroy (&self->pending);
bstar_destroy (&self->bstar);
zhash_destroy (&self->kvmap);
zctx_destroy (&self->ctx);
free (self);
return 0;
// ---------------------------------------------------------------------
// 发送快照内容
static int s_send_single (char *key, void *data, void *args);
// 请求方信息
typedef struct
void *socket; // ROUTER套接字
zframe_t *identity; // 请求放标识
char *subtree; // 子树
kvroute_t;
static int
s_snapshots (zloop_t *loop, void *snapshot, void *args)
clonesrv_t *self = (clonesrv_t *) args;
zframe_t *identity = zframe_recv (snapshot);
if (identity)
// 请求在消息的第二帧中
char *request = zstr_recv (snapshot);
char *subtree = NULL;
if (streq (request, "ICANHAZ?"))
free (request);
subtree = zstr_recv (snapshot);
else
printf ("E: 错误的请求,正在退出……\\n");
if (subtree)
// 发送状态快照
kvroute_t routing = snapshot, identity, subtree ;
zhash_foreach (self->kvmap, s_send_single, &routing);
// 发送终止消息,以及消息编号
zclock_log ("I: 正在发送快照,版本号:%d", (int) self->sequence);
zframe_send (&identity, snapshot, ZFRAME_MORE);
kvmsg_t *kvmsg = kvmsg_new (self->sequence);
kvmsg_set_key (kvmsg, "KTHXBAI");
kvmsg_set_body (kvmsg, (byte *) subtree, 0);
kvmsg_send (kvmsg, snapshot);
kvmsg_destroy (&kvmsg);
free (subtree);
return 0;
// 每次发送一个快照键值对
static int
s_send_single (char *key, void *data, void *args)
kvroute_t *kvroute = (kvroute_t *) args;
kvmsg_t *kvmsg = (kvmsg_t *) data;
if (strlen (kvroute->subtree) <= strlen (kvmsg_key (kvmsg))
&& memcmp (kvroute->subtree,
kvmsg_key (kvmsg), strlen (kvroute->subtree)) == 0)
// 先发送接收方的地址
zframe_send (&kvroute->identity,
kvroute->socket, ZFRAME_MORE + ZFRAME_REUSE);
kvmsg_send (kvmsg, kvroute->socket);
return 0;
// ---------------------------------------------------------------------
// 从客户端收集更新事件
// 如果我们是master,则将该事件写入kvmap对象;
// 如果我们是slave,则将其写入延迟队列
static int s_was_pending (clonesrv_t *self, kvmsg_t *kvmsg);
static int
s_collector (zloop_t *loop, void *collector, void *args)
clonesrv_t *self = (clonesrv_t *) args;
kvmsg_t *kvmsg = kvmsg_recv (collector);
kvmsg_dump (kvmsg);
if (kvmsg)
if (self->master)
kvmsg_set_sequence (kvmsg, ++self->sequence);
kvmsg_send (kvmsg, self->publisher);
int ttl = atoi (kvmsg_get_prop (kvmsg, "ttl"));
if (ttl)
kvmsg_set_prop (kvmsg, "ttl",
"%" PRId64, zclock_time () + ttl * 1000);
kvmsg_store (&kvmsg, self->kvmap);
zclock_log ("I: 正在发布更新事件:%d", (int) self->sequence);
else
// 如果我们已经从master中获得了该事件,则丢弃该消息
if (s_was_pending (self, kvmsg))
kvmsg_destroy (&kvmsg);
else
zlist_append (self->pending, kvmsg);
return 0;
// 如果消息已在延迟队列中,则删除它并返回TRUE
static int
s_was_pending (clonesrv_t *self, kvmsg_t *kvmsg)
kvmsg_t *held = (kvmsg_t *) zlist_first (self->pending);
while (held)
if (memcmp (kvmsg_uuid (kvmsg),
kvmsg_uuid (held), sizeof (uuid_t)) == 0)
zlist_remove (self->pending, held);
return TRUE;
held = (kvmsg_t *) zlist_next (self->pending);
return FALSE;
// ---------------------------------------------------------------------
// 删除带有过期时间的瞬间值
static int s_flush_single (char *key, void *data, void *args);
static int
s_flush_ttl (zloop_t *loop, void *unused, void *args)
clonesrv_t *self = (clonesrv_t *) args;
zhash_foreach (self->kvmap, s_flush_single, args);
return 0;
// 如果键值对过期,则进行删除操作,并广播该事件
static int
s_flush_single (char *key, void *data, void *args)
clonesrv_t *self = (clonesrv_t *) args;
kvmsg_t *kvmsg = (kvmsg_t *) data;
int64_t ttl;
sscanf (kvmsg_get_prop (kvmsg, "ttl"), "%" PRId64, &ttl);
if (ttl && zclock_time () >= ttl)
kvmsg_set_sequence (kvmsg, ++self->sequence);
kvmsg_set_body (kvmsg, (byte *) "", 0);
kvmsg_send (kvmsg, self->publisher);
kvmsg_store (&kvmsg, self->kvmap);
zclock_log ("I: 正在发布删除事件:%d", (int) self->sequence);
return 0;
// ---------------------------------------------------------------------
// 发送心跳
static int
s_send_hugz (zloop_t *loop, void *unused, void *args)
clonesrv_t *self = (clonesrv_t *) args;
kvmsg_t *kvmsg = kvmsg_new (self->sequence);
kvmsg_set_key (kvmsg, "HUGZ");
kvmsg_set_body (kvmsg, (byte *) "", 0);
kvmsg_send (kvmsg, self->publisher);
kvmsg_destroy (&kvmsg);
return 0;
// ---------------------------------------------------------------------
// 状态改变事件处理函数
// 我们将转变为master
//
// 备机先将延迟列表中的事件更新到自己的快照中,
// 并开始接收客户端发来的快照请求。
static int
s_new_master (zloop_t *loop, void *unused, void *args)
clonesrv_t *self = (clonesrv_t *) args;
self->master = TRUE;
self->slave = FALSE;
zloop_cancel (bstar_zloop (self->bstar), self->subscriber);
// 应用延迟列表中的事件
while (zlist_size (self->pending))
kvmsg_t *kvmsg = (kvmsg_t *) zlist_pop (self->pending);
kvmsg_set_sequence (kvmsg, ++self->sequence);
kvmsg_send (kvmsg, self->publisher);
kvmsg_store (&kvmsg, self->kvmap);
zclock_log ("I: 正在发布延迟列表中的更新事件:%d", (int) self->sequence);
return 0;
// ---------------------------------------------------------------------
// 正在切换为slave
static int
s_new_slave (zloop_t *loop, void *unused, void *args)
clonesrv_t *self = (clonesrv_t *) args;
zhash_destroy (&self->kvmap);
self->master = FALSE;
self->slave = TRUE;
zloop_reader (bstar_zloop (self->bstar), self->subscriber,
s_subscriber, self);
return 0;
// ---------------------------------------------------------------------
// 从同伴主机(master)接收更新事件;
// 接收该类更新事件时,我们一定是slave。
static int
s_subscriber (zloop_t *loop, void *subscriber, void *args)
clonesrv_t *self = (clonesrv_t *) args;
// 获取快照,如果需要的话。
if (self->kvmap == NULL)
self->kvmap = zhash_new ();
void *snapshot = zsocket_new (self->ctx, ZMQ_DEALER);
zsocket_connect (snapshot, "tcp://localhost:%d", self->peer);
zclock_log ("I: 正在请求快照:tcp://localhost:%d",
self->peer);
zstr_send (snapshot, "ICANHAZ?");
while (TRUE)
kvmsg_t *kvmsg = kvmsg_recv (snapshot);
if (!kvmsg)
break; // 中断
if (streq (kvmsg_key (kvmsg), "KTHXBAI"))
self->sequence = kvmsg_sequence (kvmsg);
kvmsg_destroy (&kvmsg);
break; // 完成
kvmsg_store (&kvmsg, self->kvmap);
zclock_log ("I: 收到快照,版本号:%d", (int) self->sequence);
zsocket_destroy (self->ctx, snapshot);
// 查找并删除
kvmsg_t *kvmsg = kvmsg_recv (subscriber);
if (!kvmsg)
return 0;
if (strneq (kvmsg_key (kvmsg), "HUGZ"))
if (!s_was_pending (self, kvmsg))
// 如果master的更新事件比客户端的事件早到,则将master的事件存入延迟列表,
// 当收到客户端更新事件时会将其从列表中清除。
zlist_append (self->pending, kvmsg_dup (kvmsg));
// 如果更新事件比kvmap版本高,则应用它
if (kvmsg_sequence (kvmsg) > self->sequence)
self->sequence = kvmsg_sequence (kvmsg);
kvmsg_store (&kvmsg, self->kvmap);
zclock_log ("I: 收到更新事件:%d", (int) self->sequence);
else
kvmsg_destroy (&kvmsg);
else
kvmsg_destroy (&kvmsg);
return 0;
这段程序只有几百行,但还是花了一些时间来进行调通的。这个模型中包含了故障恢复,瞬间值,子树等等。虽然我们前期设计得很完备,但要在多个套接字之间进行调试还是很困难的。以下是我的工作方式:
1、由于使用了反应堆(bstar,建立在zloop之上),我们节省了大量的代码,让程序变得简洁明了。整个服务以一个线程运行,因此不会出现跨线程的问题。只需将结构指针(self)传递给所有的处理器即可。此外,使用发应堆后可以让代码更为模块化,易于重用。
2、我们逐个模块进行调试,只有某个模块能够正常运行时才会进入下一步。由于使用了四五个套接字,因此调试的工作量是很大的。我直接将调试信息输出到了屏幕上,因为实在没有必要专门开一个调试器来工作。
3、因为一直在使用valgrind工具进行测试,因此我能确定程序没有内存泄漏的问题。在C语言中,内存泄漏是我们非常关心的问题,因为没有什么垃圾回收机制可以帮你完成。正确地使用像kvmsg、czmq之类的抽象层可以很好地避免内存泄漏。
这段程序肯定还会存在一些BUG,部分读者可能会帮助我调试和修复,我在此表示感谢。
测试模型6时,先开启主机和备机,再打开一组客户端,顺序随意。随机地中止某个服务进程,如果程序设计得是正确的,那客户端获得的数据应该都是一致的。
以上是关于ZMQ之克隆模式的可靠性的主要内容,如果未能解决你的问题,请参考以下文章