zookeeper c api主备切换例子
Posted fulianzhou
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了zookeeper c api主备切换例子相关的知识,希望对你有一定的参考价值。
步骤:
1.准备3台主机并同步好时间。
2.在主机1启动zookeeper服务,创建节点/services
3.在主机2运行应用程序,连接主机1的zookeeper
4.主机2往zookeeper的/services节点注册临时节点,节点名=IP:端口 节点数据=注册时间us
5.主机2监听/services节点
6.分别在主机1,主机3启动运行应用程序;分别向zookeeper注册临时节点。并监听/services节点。
7.以先启动的应用程序为主机,即节点数据(注册时间)最小的为主机。
8.三台主机运行起来后,只有一台为主机,其余为备机。
zkCli.h
#ifndef _ZKCLIENT_H_
#define _ZKCLIENT_H_
#ifdef _WIN32
#include "zk\\include\\zookeeper.h"
#include "zk\\include\\zookeeper.jute.h"
#include "zk\\include\\zookeeper_log.h"
#include "zk\\include\\zookeeper_version.h"
#else
#include <zookeeper/zookeeper.h>
#include <sys/types.h>
#include <ifaddrs.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <netdb.h>
#include <stdlib.h>
#include <unistd.h>
#include <net/if.h>
#include <errno.h>
#include <string.h>
#include <pthread.h>
#endif
/* Print an INFO line prefixed with the call site (file, line, function).
 * The prefix printf now receives the three arguments its format asks for,
 * and the do/while(0) wrapper makes the macro a single statement so it is
 * safe inside an unbraced if/else. Call it as `INFO("...", ...);`. */
#define INFO(fmt, args...) \
    do { \
        printf("[%s:%d %s][INFO]", __FILE__, __LINE__, __func__); \
        printf(fmt, ##args); \
    } while (0)
/* Callback type accepted by ZkCli::Register: a plain function taking two C strings. */
using FUN = void (*)(const char*, const char*);
/**
 * Wrapper around one ZooKeeper C-client session used for master/standby
 * selection: every instance registers an ephemeral child under a parent
 * znode and the child holding the smallest timestamp is treated as master.
 */
class ZkCli
{
private:
    zhandle_t *m_ZkHandle;      // handle returned by zookeeper_init(), closed in the destructor
    pthread_mutex_t m_mutex;    // mutex paired with m_cond
    pthread_cond_t m_cond;      // signalled when the session watcher reports "connected"
    bool m_Connected;           // last connection state recorded by setConnected()
    char m_servAddr[64];        // "ip:port" of this instance, written by Register()
    char m_zkAddr[64];          // copy of the address passed to zkInit()
    char m_path[64];            // copy of the parent znode path passed to zkInit()
    FUN m_func;                 // callback invoked by ServiceWatch() when this instance is the master
public:
    ZkCli();
    ~ZkCli();

    // Non-copyable: a copy would share the pthread primitives and the
    // ZooKeeper handle, and both destructors would then close/destroy them.
    ZkCli(const ZkCli&) = delete;
    ZkCli& operator=(const ZkCli&) = delete;

    // Opens the session to zkaddr, creates the parent znode `path` if needed and starts watching it.
    bool zkInit(const char* zkaddr, const char* path);
    // Creates a znode at `path` with no data; returns the zoo_create() result code.
    int Create(const char* path);
    // Creates an ephemeral znode at `path` holding `size` bytes of `data`; returns the zoo_create() result code.
    int Set(const char* path, const char* data, int size);
    // Registers this instance as "<listenIp>:<listenPort>" under the parent path and stores `func`.
    void Register(const char* listenIp, const unsigned short listenPort, FUN func);
    // Writes the first address that is not "127.0.0.1" into localIp (at most `len` bytes).
    void getLocalIp(char *localIp, int len);
private:
    // Current wall-clock time in microseconds.
    int64_t getTimeStampUs();
    // Records the connection state and wakes the waiter in zkInit() on connect.
    void setConnected(bool stat);
    // Lists the children of `path`, re-arms the child watch and evaluates which child is master.
    void ServiceWatch(const char* path);
private:
    /*typedef void (*watcher_fn)(zhandle_t *zh, int type, int state, const char *path, void *watcherCtx);*/
    // Session watcher passed to zookeeper_init(); watcherCtx is the owning ZkCli.
    static void init_watcher(zhandle_t *zh, int type, int stat, const char* path, void* watcherCtx);
    // Child watcher passed to zoo_wget_children(); watcherCtx is the owning ZkCli.
    static void require_wachter(zhandle_t *zh, int type, int stat, const char* path, void* watcherCtx);
};
#endif
zkCli.cpp
#include "zkCli.h"
#include <sys/time.h>
#include <unordered_map>
/**
 * Builds an unconnected client.
 *
 * Every member gets a defined value: zkInit() calls ServiceWatch() before
 * Register() has run, and ServiceWatch() reads m_servAddr and m_func, so
 * leaving them uninitialised meant comparing against garbage and possibly
 * calling through a wild function pointer.
 */
ZkCli::ZkCli()
    : m_ZkHandle(NULL),
      m_Connected(false),
      m_func(NULL)
{
    memset(m_servAddr, 0, sizeof(m_servAddr));
    memset(m_zkAddr, 0, sizeof(m_zkAddr));
    memset(m_path, 0, sizeof(m_path));
    pthread_mutex_init(&m_mutex, NULL);
    pthread_cond_init(&m_cond, NULL);
}
/**
 * Closes the ZooKeeper session and then releases the pthread primitives.
 *
 * The session is closed first: until zookeeper_close() returns, the client
 * library may still invoke init_watcher(), which ends up in setConnected()
 * and touches m_cond. Destroying the condition variable before that point
 * would let a late callback signal a destroyed object.
 */
ZkCli::~ZkCli()
{
    if (m_ZkHandle)
    {
        zookeeper_close(m_ZkHandle);
        m_ZkHandle = NULL;
    }
    pthread_cond_destroy(&m_cond);
    pthread_mutex_destroy(&m_mutex);
}
/**
 * Returns the current wall-clock time as microseconds since the Unix epoch.
 *
 * tv_sec is widened to 64 bits before the multiplication; multiplying in
 * time_t width first overflows on platforms where time_t is 32 bits.
 */
int64_t ZkCli::getTimeStampUs()
{
    struct timeval stamp;
    gettimeofday(&stamp, NULL);
    return (int64_t)stamp.tv_sec * 1000000 + (int64_t)stamp.tv_usec;
}
void ZkCli::setConnected(bool stat)
{
if (stat)
{
pthread_cond_signal(&m_cond);
}
m_Connected = stat;
}
/**
 * Session watcher handed to zookeeper_init().
 *
 * Only session events are handled: the connected state marks the owning
 * client as connected, the connecting and not-connected states mark it as
 * disconnected, and every other state is ignored.
 *
 * @param watcherCtx the ZkCli instance that was passed to zookeeper_init().
 */
void ZkCli::init_watcher(zhandle_t *zh, int type, int stat, const char* path, void* watcherCtx)
{
    if (type != ZOO_SESSION_EVENT)
    {
        return;
    }

    ZkCli* const owner = (ZkCli*)watcherCtx;
    if (stat == ZOO_CONNECTED_STATE)
    {
        owner->setConnected(true);
        return;
    }

    const bool lostLink = (stat == ZOO_CONNECTING_STATE) || (stat == ZOO_NOTCONNECTED_STATE);
    if (lostLink)
    {
        owner->setConnected(false);
    }
}
/**
 * Child watcher handed to zoo_wget_children().
 *
 * Logs the event and re-runs ServiceWatch() on the reported path, which
 * both re-arms the watch and re-evaluates which child is the master.
 *
 * Events that carry no path are logged but not forwarded: zookeeper.h
 * documents that the path is not meaningful for session events, and
 * listing the children of an empty path can only fail.
 *
 * @param watherCtx the ZkCli instance passed to zoo_wget_children().
 */
void ZkCli::require_wachter(zhandle_t* zh, int type, int stat, const char* path, void* watherCtx)
{
    printf("require_wachter() : type=%d, stat=%d, path=%s\n", type, stat, path ? path : "(null)");
    ZkCli* pZkCli = (ZkCli*)watherCtx;
    if (pZkCli == NULL || path == NULL || path[0] == '\0')
    {
        return;
    }
    pZkCli->ServiceWatch(path);
}
/**
 * Writes the numeric address of the first broadcast-capable interface
 * whose address is not "127.0.0.1" into localIp.
 *
 * Fixes relative to the previous version:
 *  - `!ifa->ifa_flags & IFF_BROADCAST` parsed as `(!flags) & IFF_BROADCAST`,
 *    which is always 0, so the broadcast filter never skipped anything;
 *  - the result is now always NUL-terminated (strncpy did not guarantee it);
 *  - a getnameinfo() failure on one interface no longer aborts the scan.
 *
 * @param localIp destination buffer; left untouched when no address is found.
 * @param len     size of localIp in bytes.
 *
 * Terminates the process when getifaddrs() fails.
 */
void ZkCli::getLocalIp(char* localIp, int len)
{
    if (localIp == NULL || len <= 0)
    {
        return;
    }

    struct ifaddrs *ifaddr = NULL;
    if (getifaddrs(&ifaddr) == -1)
    {
        printf("getifaddr failed, errno=%d, errmsg=%s\n", errno, strerror(errno));
        exit(EXIT_FAILURE);
    }

    char host[NI_MAXHOST];
    for (struct ifaddrs *ifa = ifaddr; ifa != NULL; ifa = ifa->ifa_next)
    {
        if (ifa->ifa_addr == NULL)
        {
            continue;
        }
        if (!(ifa->ifa_flags & IFF_BROADCAST))
        {
            continue;
        }

        const int family = ifa->ifa_addr->sa_family;
        if (family != AF_INET && family != AF_INET6)
        {
            continue;
        }

        const socklen_t addrLen = (family == AF_INET) ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6);
        const int s = getnameinfo(ifa->ifa_addr, addrLen, host, NI_MAXHOST, NULL, 0, NI_NUMERICHOST);
        if (s != 0)
        {
            printf("getnameinfo() failed: %s\n", gai_strerror(s));
            continue;   // try the remaining interfaces
        }

        if (strcmp(host, "127.0.0.1") != 0)
        {
            snprintf(localIp, (size_t)len, "%s", host);   // truncates safely and terminates
            break;
        }
    }
    freeifaddrs(ifaddr);
}
bool ZkCli::zkInit(const char* zkAddr, const char* path)
{
zoo_set_debug_level(ZOO_LOG_LEVEL_WARN);
m_ZkHandle = zookeeper_init(zkAddr, init_watcher, 3000, NULL, this, 0);
if (!m_ZkHandle)
{
INFO("zookeeper_init failed, zkAddr:%s, errno=%d, errmsg=%s\\n", zkAddr, errno, strerror(errno));
return false;
}
strncpy(m_zkAddr, zkAddr, sizeof(m_zkAddr));
struct timeval now;
gettimeofday(&now, NULL);
pthread_mutex_lock(&m_mutex);
struct timespec waitTime;
waitTime.tv_sec = now.tv_sec + 5;
waitTime.tv_nsec = now.tv_usec * 1000;
pthread_cond_timedwait(&m_cond, &m_mutex, &waitTime);
pthread_mutex_unlock(&m_mutex);
strncpy(m_path, path, sizeof(m_path));
int ret = Create(path);
if (ret != ZOK && ret != ZNODEEXISTS)
{
printf("create root path failed, path=%s\\n", path);
return false;
}
ServiceWatch(path);
return true;
}
/**
 * Creates a znode at `path` with no data, flags 0 and the open ACL.
 *
 * @param path znode path to create.
 * @return the zoo_create() result code (ZOK on success).
 */
int ZkCli::Create(const char* path)
{
    char path_buffer[256] = { 0 };
    int ret = zoo_create(m_ZkHandle, path, "", -1, &ZOO_OPEN_ACL_UNSAFE, 0, path_buffer, sizeof(path_buffer)-1);

    // Never hand a NULL pointer to %s.
    const char* shownPath = path ? path : "(null)";
    if (ret == ZOK)
    {
        printf("create node %s succeed!\n", shownPath);
    }
    else
    {
        printf("create node %s, ret=%d\n", shownPath, ret);
    }
    return ret;
}
/**
 * Creates an ephemeral znode at `path` holding `size` bytes of `data`.
 *
 * Despite the name this only creates; the zoo_create() error code is
 * returned unchanged when the call fails.
 *
 * @param path znode path to create.
 * @param data payload bytes; only `size` bytes are used, so it need not be
 *             NUL-terminated.
 * @param size payload length in bytes.
 * @return the zoo_create() result code (ZOK on success).
 */
int ZkCli::Set(const char* path, const char* data, int size)
{
    char path_buffer[256] = { 0 };
    int ret = zoo_create(m_ZkHandle, path, data, size, &ZOO_OPEN_ACL_UNSAFE, ZOO_EPHEMERAL, path_buffer, sizeof(path_buffer)-1);

    // Print exactly `size` bytes: the payload is length-delimited, and %s
    // would read past it when it is not NUL-terminated.
    const char* shownData = data ? data : "";
    const int shownLen = (data != NULL && size > 0) ? size : 0;
    if (ret == ZOK)
    {
        printf("set [%s]=[%.*s] succeed!\n", path_buffer, shownLen, shownData);
    }
    else
    {
        printf("set [%s]=[%.*s] failed, ret=%d\n", path ? path : "(null)", shownLen, shownData, ret);
    }
    return ret;
}
void ZkCli::ServiceWatch(const char* path)
{
/*ZOOAPI int zoo_wget_children(zhandle_t *zh, const char *path,watcher_fn watcher,void* watcherCtx,struct String_vector *strings);*/
String_vector strvect;
int rc = zoo_wget_children(m_ZkHandle, path, require_wachter, this, &strvect);
if (rc != ZOK)
{
printf("zoo_wget_children faield, path:%s, rc=%d\\n", path, rc);
return;
}
std::string master_server;
uint64_t min_starttimeus = 0;
for (int i = 0; i < strvect.count; i++)
{
printf("str[%d]:%s\\n", i, strvect.data[i]);
char nodepath[64] = { 0 };
snprintf(nodepath, sizeof(nodepath)-1, "%s/%s", path, strvect.data[i]);
char buffer[256] = { 0 };
int buffer_len = sizeof(buffer);
if (ZOK == zoo_get(m_ZkHandle, nodepath, 0, buffer, &buffer_len, NULL))
{
uint64_t startTimeus = atoll(buffer);
if (min_starttimeus == 0)
{
min_starttimeus = startTimeus;
master_server = strvect.data[i];
}
else if (min_starttimeus > startTimeus)
{
min_starttimeus = startTimeus;
master_server = strvect.data[i];
}
printf("%s=%ld\\n", nodepath, startTimeus);
}
}
printf("master server=%s, timeus=%ld\\n", master_server.c_str(), min_starttimeus);
if (strcmp(m_servAddr, master_server.c_str()) == 0)
{
printf("should switch to master!\\n");
if (m_func)
{
m_func("","");
}
}
else
{
printf("should switch to slave!\\n");
}
}
/**
 * Registers this instance under the parent path stored by zkInit().
 *
 * Stores "<listenIp>:<listenPort>" in m_servAddr, remembers `func`, and
 * creates the ephemeral child "<m_path>/<m_servAddr>" whose payload is the
 * current time in microseconds as decimal text.
 *
 * @param listenIp   address this instance listens on; must not be NULL.
 * @param listenPort port this instance listens on.
 * @param func       callback stored in m_func; may be NULL.
 */
void ZkCli::Register(const char* listenIp, const unsigned short listenPort, FUN func)
{
    if (listenIp == NULL)
    {
        printf("Register failed, listenIp must not be NULL\n");
        return;
    }

    snprintf(m_servAddr, sizeof(m_servAddr), "%s:%d", listenIp, listenPort);
    m_func = func;

    const int64_t TimeStampUs = getTimeStampUs();
    char TimeStampStr[32] = { 0 };
    // int64_t is not `long` on every platform; print it through long long.
    snprintf(TimeStampStr, sizeof(TimeStampStr), "%lld", (long long)TimeStampUs);

    char ServicePath[128] = { 0 };
    snprintf(ServicePath, sizeof(ServicePath), "%s/%s", m_path, m_servAddr);
    Set(ServicePath, TimeStampStr, (int)strlen(TimeStampStr));
}
Makefile
# Builds zk_client from every .cpp file in this directory.
CC=g++ -g
SRC=$(wildcard *.cpp)
OBJ=${SRC:%.cpp=%.o}
# THREADED is defined for the multi-threaded client library linked below.
FLAG=-DTHREADED
# The library that needs pthread is listed before it, as static linking requires.
LIB=-lzookeeper_mt -lpthread
TARGET=zk_client

.PHONY: all clean

all:${TARGET}

${TARGET}:${OBJ}
	${CC} -o ${TARGET} ${OBJ} ${LIB}

# $< is the one source file of the rule; $? would expand to every
# prerequisite newer than the target.
%.o:%.cpp
	${CC} ${FLAG} -c $< -o $@

clean:
	rm -f ${TARGET} ${OBJ}
运行结果:
62:
[root@localhost zk_client]# ./zk_client
create node /services, ret=-110
master server=, timeus=0
should switch to slave!
zookeeper connected!
require_wachter() : type=4, stat=3, path=/services
set [/services/192.168.1.62:9044]=[1632473624547276] succeed!
str[0]:192.168.1.62:9044
/services/192.168.1.62:9044=1632473624547276
master server=192.168.1.62:9044, timeus=1632473624547276
should switch to master!
enter switch func
require_wachter() : type=4, stat=3, path=/services
str[0]:192.168.1.86:9044
/services/192.168.1.86:9044=1632473592590648
str[1]:192.168.1.62:9044
/services/192.168.1.62:9044=1632473624547276
master server=192.168…(运行结果在此处被截断)
以上是关于zookeeper c api主备切换例子的主要内容,如果未能解决你的问题,请参考以下文章