zookeeper c api主备切换例子

Posted fulianzhou

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了zookeeper c api主备切换例子相关的知识,希望对你有一定的参考价值。

步骤:
1. 准备 3 台主机,并同步好系统时间。
2. 在主机 1 上启动 ZooKeeper 服务,并创建节点 /services。
3. 在主机 2 上运行应用程序,连接主机 1 的 ZooKeeper。
4. 主机 2 在 /services 节点下注册临时节点:节点名 = IP:端口,节点数据 = 注册时间(微秒)。
5. 主机 2 监听 /services 节点的子节点变化。
6. 分别在主机 1、主机 3 上启动应用程序,各自向 ZooKeeper 注册临时节点,并监听 /services 节点。
7. 以最先启动的应用程序为主机,即节点数据(注册时间)最小的为主机。
8. 三台主机都运行起来后,只有一台为主机,其余为备机。

zkCli.h

#ifndef _ZKCLIENT_H_
#define _ZKCLIENT_H_

#ifdef _WIN32
    #include "zk\\include\\zookeeper.h"
    #include "zk\\include\\zookeeper.jute.h"
    #include "zk\\include\\zookeeper_log.h"
    #include "zk\\include\\zookeeper_version.h"
#else
    #include <zookeeper/zookeeper.h>
    #include <sys/types.h>
    #include <ifaddrs.h>
    #include <arpa/inet.h>
    #include <sys/socket.h>
    #include <netdb.h>
    #include <stdlib.h>
    #include <unistd.h>
    #include <net/if.h>
    #include <errno.h>
    #include <string.h>
    #include <pthread.h>
#endif

/*
 * INFO(fmt, ...): printf-style log line prefixed with "[file:line function][INFO]".
 *
 * The three prefix conversions are fed __FILE__, __LINE__ and __func__ (the
 * original passed no arguments for them, which is undefined behaviour).
 * The body is wrapped in do { } while (0) so the macro expands to exactly one
 * statement and stays safe inside an un-braced if/else.
 */
#define INFO(fmt, ...) \
    do { \
        printf("[%s:%d %s][INFO]", __FILE__, __LINE__, __func__); \
        printf(fmt, ##__VA_ARGS__); \
    } while (0)

// Callback signature accepted by ZkCli::Register(): a plain function taking
// two C strings and returning nothing.
using FUN = void (*)(const char*, const char*);

/**
 * Small wrapper around a ZooKeeper C client handle used for master/standby
 * selection: each process registers an ephemeral child node whose data is its
 * registration time, watches the parent node, and compares its own address
 * with the child holding the smallest timestamp.
 *
 * The object owns the ZooKeeper handle plus a pthread mutex/condition pair and
 * releases them in its destructor, so it must not be copied.
 */
class ZkCli
{
private:
    zhandle_t *m_ZkHandle;      // handle returned by zookeeper_init(); closed in the destructor
    pthread_mutex_t m_mutex;    // protects the wait on m_cond in zkInit()
    pthread_cond_t m_cond;      // signalled by setConnected(true)
    bool m_Connected;           // last connection state reported to init_watcher()
    char m_servAddr[64];        // "ip:port" of this process, written by Register()
    char m_zkAddr[64];          // copy of the ZooKeeper address passed to zkInit()
    char m_path[64];            // parent node path passed to zkInit()
    FUN m_func;                 // callback invoked by ServiceWatch() when this process is the master
public:
    ZkCli();
    ~ZkCli();

    // Non-copyable: a copy would close the same handle and destroy the same
    // mutex/condition variable a second time.
    ZkCli(const ZkCli&) = delete;
    ZkCli& operator=(const ZkCli&) = delete;

    // Connects to zkaddr, makes sure `path` exists and installs the child watch on it.
    bool zkInit(const char* zkaddr, const char* path);
    // Creates `path` with no creation flags and no data; returns the zoo_create() result code.
    int Create(const char* path);
    // Creates `path` as an ephemeral node holding `size` bytes of `data`; returns the zoo_create() result code.
    int Set(const char* path, const char* data, int size);
    // Records "listenIp:listenPort" and `func`, then registers this process under the watched path.
    void Register(const char* listenIp, const unsigned short listenPort, FUN func);
    // Writes a local, non-loopback address in numeric form into localIp (at most len bytes).
    void getLocalIp(char *localIp, int len);

private:
    // Current wall-clock time in microseconds.
    int64_t getTimeStampUs();
    // Stores the connection state and wakes the waiter in zkInit() on connect.
    void setConnected(bool stat);
    // Re-arms the child watch on `path` and re-evaluates which child is the master.
    void ServiceWatch(const char* path);

private:
    /* Both callbacks follow the ZooKeeper watcher signature:
       typedef void (*watcher_fn)(zhandle_t *zh, int type, int state, const char *path, void *watcherCtx); */
    // Session watcher passed to zookeeper_init(); watcherCtx is the owning ZkCli.
    static void init_watcher(zhandle_t *zh, int type, int stat, const char* path, void* watcherCtx);
    // Child watcher passed to zoo_wget_children(); watcherCtx is the owning ZkCli.
    static void require_wachter(zhandle_t *zh, int type, int stat, const char* path, void* watcherCtx);
};

#endif

zkCli.cpp

#include "zkCli.h"
#include <sys/time.h>
#include <unordered_map>

/**
 * Builds a client that is not yet connected.
 *
 * Every member is given a defined value: ServiceWatch() reads m_servAddr and
 * m_func, and zkInit() calls it before Register() has had a chance to set
 * them, so leaving them uninitialised meant comparing against garbage and
 * possibly calling through a wild function pointer.
 */
ZkCli::ZkCli()
    : m_ZkHandle(NULL),
      m_Connected(false),
      m_servAddr(),   // value-initialisation zero-fills the buffers
      m_zkAddr(),
      m_path(),
      m_func(NULL)
{
    pthread_mutex_init(&m_mutex, NULL);
    pthread_cond_init(&m_cond, NULL);
}

/**
 * Releases the ZooKeeper handle and the pthread primitives.
 *
 * The handle is closed first: init_watcher() runs on the client library's
 * thread and signals m_cond through setConnected(), so the condition variable
 * and mutex must outlive the handle that can still trigger that callback.
 */
ZkCli::~ZkCli()
{
    if (m_ZkHandle != NULL)
    {
        zookeeper_close(m_ZkHandle);
        m_ZkHandle = NULL;
    }

    pthread_cond_destroy(&m_cond);
    pthread_mutex_destroy(&m_mutex);
}

/**
 * Returns the current wall-clock time, as reported by gettimeofday(), in
 * microseconds.
 *
 * The seconds field is widened to int64_t before the multiplication; the
 * original multiplied in the native width of tv_sec and cast afterwards,
 * which overflows where that type is 32 bits wide.
 */
int64_t ZkCli::getTimeStampUs()
{
    struct timeval stamp;

    gettimeofday(&stamp, NULL);

    return static_cast<int64_t>(stamp.tv_sec) * 1000000 + static_cast<int64_t>(stamp.tv_usec);
}

void ZkCli::setConnected(bool stat)
{
    if (stat)
    {
        pthread_cond_signal(&m_cond);
    }
    m_Connected = stat;
}

/**
 * Session watcher handed to zookeeper_init().
 *
 * Translates session-state notifications into setConnected() calls on the
 * ZkCli passed as watcherCtx; every other event type, and every session state
 * not listed below, is ignored.
 */
void ZkCli::init_watcher(zhandle_t *zh, int type, int stat, const char* path, void* watcherCtx)
{
    // Only session events carry a connection-state change.
    if (type != ZOO_SESSION_EVENT)
    {
        return;
    }

    ZkCli* const client = (ZkCli*)watcherCtx;

    if (stat == ZOO_CONNECTED_STATE)
    {
        client->setConnected(true);
    }
    else if (stat == ZOO_CONNECTING_STATE || stat == ZOO_NOTCONNECTED_STATE)
    {
        client->setConnected(false);
    }
}

/**
 * Child watcher handed to zoo_wget_children().
 *
 * Logs the notification and asks the owning ZkCli (watherCtx) to re-arm the
 * watch and re-evaluate the master via ServiceWatch().
 *
 * @param type      event type reported by the client library
 * @param stat      session state reported with the event
 * @param path      node the event refers to
 * @param watherCtx the ZkCli that registered the watch
 */
void ZkCli::require_wachter(zhandle_t* zh, int type, int stat, const char* path, void* watherCtx)
{
    // "\n" (not the two characters backslash + n) so each notification is logged on its own line.
    printf("require_wachter() : type=%d, stat=%d, path=%s\n", type, stat, path ? path : "");

    ZkCli* pZkCli = static_cast<ZkCli*>(watherCtx);

    // A notification without a context or without a path gives nothing to
    // re-watch; calling ServiceWatch() with an empty path could only fail.
    if (pZkCli == NULL || path == NULL || path[0] == '\0')
    {
        return;
    }

    pZkCli->ServiceWatch(path);
}

/**
 * Writes the numeric address of the first broadcast-capable, non-loopback
 * IPv4/IPv6 interface into localIp.
 *
 * @param localIp output buffer; always NUL-terminated on return, and left as
 *                an empty string when no suitable interface is found
 * @param len     size of localIp in bytes
 *
 * Terminates the process if getifaddrs() fails (unchanged from the original).
 */
void ZkCli::getLocalIp(char* localIp, int len)
{
    if (localIp == NULL || len <= 0)
    {
        return;
    }
    localIp[0] = '\0';

    struct ifaddrs *ifaddr = NULL;

    if (getifaddrs(&ifaddr) == -1)
    {
        printf("getifaddr failed, errno=%d, errmsg=%s\n", errno, strerror(errno));
        exit(EXIT_FAILURE);
    }

    char host[NI_MAXHOST];
    for (struct ifaddrs *ifa = ifaddr; ifa != NULL; ifa = ifa->ifa_next)
    {
        if (ifa->ifa_addr == NULL)
        {
            continue;
        }

        // Parenthesised on purpose: `!flags & IFF_BROADCAST` negates the flags
        // first and therefore never filtered anything.
        if (!(ifa->ifa_flags & IFF_BROADCAST))
        {
            continue;
        }

        const int family = ifa->ifa_addr->sa_family;
        if (family != AF_INET && family != AF_INET6)
        {
            continue;
        }

        const socklen_t addrLen = (family == AF_INET) ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6);
        const int s = getnameinfo(ifa->ifa_addr, addrLen, host, NI_MAXHOST, NULL, 0, NI_NUMERICHOST);
        if (s != 0)
        {
            printf("getnameinfo() failed: %s\n", gai_strerror(s));
            break;
        }

        if (strcmp(host, "127.0.0.1") != 0 && strcmp(host, "::1") != 0)
        {
            // snprintf always terminates; strncpy did not when host filled the buffer.
            snprintf(localIp, static_cast<size_t>(len), "%s", host);
            break;
        }
    }

    freeifaddrs(ifaddr);
}

bool ZkCli::zkInit(const char* zkAddr, const char* path)
{
    zoo_set_debug_level(ZOO_LOG_LEVEL_WARN);
    m_ZkHandle = zookeeper_init(zkAddr, init_watcher, 3000, NULL, this, 0);
    if (!m_ZkHandle)
    {
        INFO("zookeeper_init failed, zkAddr:%s, errno=%d, errmsg=%s\\n", zkAddr, errno, strerror(errno));
        return false;
    }
    strncpy(m_zkAddr, zkAddr, sizeof(m_zkAddr));

    struct timeval now;
    gettimeofday(&now, NULL);

    pthread_mutex_lock(&m_mutex);

    struct timespec waitTime;
    waitTime.tv_sec = now.tv_sec + 5;
    waitTime.tv_nsec = now.tv_usec * 1000;

    pthread_cond_timedwait(&m_cond, &m_mutex, &waitTime);

    pthread_mutex_unlock(&m_mutex);

    strncpy(m_path, path, sizeof(m_path));

    int ret = Create(path);
    if (ret != ZOK && ret != ZNODEEXISTS)
    {
        printf("create root path failed, path=%s\\n", path);
        return false;
    }

    ServiceWatch(path);

    return true;
}

/**
 * Creates `path` with no creation flags and no data, using the open ACL.
 *
 * @param path node to create
 * @return the zoo_create() result code (ZOK on success); the outcome is also
 *         logged
 */
int ZkCli::Create(const char* path)
{
    char createdPath[256] = { 0 };
    const int ret = zoo_create(m_ZkHandle, path, "", -1, &ZOO_OPEN_ACL_UNSAFE, 0,
                               createdPath, sizeof(createdPath) - 1);

    // "\n" (not backslash + n) so each log entry ends its line.
    if (ret == ZOK)
    {
        printf("create node %s succeed!\n", path);
    }
    else
    {
        printf("create node %s, ret=%d\n", path, ret);
    }

    return ret;
}

/**
 * Creates `path` as an ephemeral node holding `size` bytes of `data`, using
 * the open ACL.
 *
 * @param path node to create
 * @param data node contents; need not be NUL-terminated
 * @param size number of bytes of `data` to store
 * @return the zoo_create() result code (ZOK on success); the outcome is also
 *         logged
 */
int ZkCli::Set(const char* path, const char* data, int size)
{
    char path_buffer[256] = { 0 };
    const int ret = zoo_create(m_ZkHandle, path, data, size, &ZOO_OPEN_ACL_UNSAFE, ZOO_EPHEMERAL,
                               path_buffer, sizeof(path_buffer) - 1);

    // Log with an explicit precision: `data` is a length-delimited buffer, so
    // printing it with a bare %s could read past its end.
    const char* shown = (data != NULL) ? data : "";
    const int shownLen = (data != NULL && size > 0) ? size : 0;

    if (ret == ZOK)
    {
        printf("set [%s]=[%.*s] succeed!\n", path_buffer, shownLen, shown);
    }
    else
    {
        printf("set [%s]=[%.*s] failed, ret=%d\n", path, shownLen, shown, ret);
    }
    return ret;
}

void ZkCli::ServiceWatch(const char* path)
{
/*ZOOAPI int zoo_wget_children(zhandle_t *zh, const char *path,watcher_fn watcher,void* watcherCtx,struct String_vector *strings);*/
    String_vector strvect;
    int rc = zoo_wget_children(m_ZkHandle, path, require_wachter, this, &strvect);
    if (rc != ZOK)
    {
        printf("zoo_wget_children faield, path:%s, rc=%d\\n", path, rc);
        return;
    }
    
    std::string master_server;
    uint64_t min_starttimeus = 0;
    for (int i = 0; i < strvect.count; i++)
    {
        printf("str[%d]:%s\\n", i, strvect.data[i]);

        char nodepath[64] = { 0 };
        snprintf(nodepath, sizeof(nodepath)-1, "%s/%s", path, strvect.data[i]);

        char buffer[256] = { 0 };
        int buffer_len = sizeof(buffer);
        
        if (ZOK == zoo_get(m_ZkHandle, nodepath, 0, buffer, &buffer_len, NULL))
        {
            uint64_t startTimeus = atoll(buffer);
            if (min_starttimeus == 0)
            {
                min_starttimeus = startTimeus;
                master_server = strvect.data[i];
            }
            else if (min_starttimeus > startTimeus)
            {
                min_starttimeus = startTimeus;
                master_server = strvect.data[i];
            }

            printf("%s=%ld\\n", nodepath, startTimeus);
        }
    }

    printf("master server=%s, timeus=%ld\\n", master_server.c_str(), min_starttimeus);
    if (strcmp(m_servAddr, master_server.c_str()) == 0)
    {
        printf("should switch to master!\\n");
        if (m_func)
        {
            m_func("","");
        }
    }
    else
    {
        printf("should switch to slave!\\n");
    }
}

void ZkCli::Register(const char* listenIp, const unsigned short listenPort, FUN func)
{
    snprintf(m_servAddr, sizeof(m_servAddr)-1, "%s:%d", listenIp, listenPort);
    m_func = func;

    int64_t TimeStampUs = getTimeStampUs();
    char TimeStampStr[32] = { 0 };
    snprintf(TimeStampStr, sizeof(TimeStampStr)-1, "%ld", TimeStampUs);

    char ServicePath[128] = { 0 };
    snprintf(ServicePath, sizeof(ServicePath)-1, "%s/%s", m_path, m_servAddr);

    Set(ServicePath, TimeStampStr, strlen(TimeStampStr));
}

Makefile

# Builds zk_client from every .cpp file in this directory.
CC=g++ -g
SRC=$(wildcard *.cpp)
OBJ=${SRC:%.cpp=%.o}

# -DTHREADED goes together with the multi-threaded client library -lzookeeper_mt.
FLAG=-DTHREADED
LIB=-lpthread -lzookeeper_mt

TARGET=zk_client

.PHONY: all clean

all:${TARGET}

${TARGET}:${OBJ}
	${CC} -o ${TARGET} ${OBJ} ${LIB}

# Rebuild the objects when a header changes.
${OBJ}: $(wildcard *.h)

# $< is the first prerequisite (the .cpp file). The original used $?, which is
# the list of prerequisites newer than the target.
%.o:%.cpp
	${CC} ${FLAG} -c $< -o $@

clean:
	rm -f ${TARGET} ${OBJ}

运行结果:
主机 62(192.168.1.62)上的输出:

[root@localhost zk_client]# ./zk_client
create node /services, ret=-110
master server=, timeus=0
should switch to slave!
zookeeper connected!
require_wachter() : type=4, stat=3, path=/services
set [/services/192.168.1.62:9044]=[1632473624547276] succeed!
str[0]:192.168.1.62:9044
/services/192.168.1.62:9044=1632473624547276
master server=192.168.1.62:9044, timeus=1632473624547276
should switch to master!
enter switch func
require_wachter() : type=4, stat=3, path=/services
str[0]:192.168.1.86:9044
/services/192.168.1.86:9044=1632473592590648
str[1]:192.168.1.62:9044
/services/192.168.1.62:9044=1632473624547276
master server=192.168(输出在此处被截断)

以上是关于zookeeper c api主备切换例子的主要内容,如果未能解决你的问题,请参考以下文章:

zookeeper c api主备切换例子

zookeeper主备切换学习

Spark系列Master主备切换机制

spark的主备切换极致原理剖析

2--Master主备切换机制原理剖析与源码分析

在片段之间切换时如何处理相机?