基于ZooKeeper实现服务注册与发现

Posted .番茄炒蛋

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了基于ZooKeeper实现服务注册与发现相关的知识,希望对你有一定的参考价值。

简介

ZooKeeper官网

在分布式系统中,服务注册与发现是一项重要的技术,本文提供Java代码基于ZooKeeper来实现的服务注册与发现的功能.

服务注册

package com.fanqiechaodan.service;

import org.apache.zookeeper.*;

import java.io.IOException;

/**
 * @author fanqiechaodan
 * @Classname ServiceRegistry
 * @Description 服务注册
 */
public class ServiceRegistry 

    private static final String SERVICE_REGISTRY_ROOT = "/services";

    private static final int SESSION_TIMEOUT = 5000;

    private ZooKeeper zooKeeper;

    public ServiceRegistry(String zooKeeperAddress) throws IOException 
        this.zooKeeper = new ZooKeeper(zooKeeperAddress, SESSION_TIMEOUT, null);
    

    /**
     * 注册服务
     *
     * @param serviceName
     * @param serviceAddress
     * @throws KeeperException
     * @throws InterruptedException
     */
    public void registryService(String serviceName, String serviceAddress) throws KeeperException, InterruptedException 
        String servicePath = SERVICE_REGISTRY_ROOT + "/" + serviceName;

        // 如果根节点不存在就创建
        if (zooKeeper.exists(SERVICE_REGISTRY_ROOT, false) == null) 
            zooKeeper.create(SERVICE_REGISTRY_ROOT, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        

        String serviceNodePath = zooKeeper.create(servicePath, serviceAddress.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        System.out.println("注册成功;serviceNodePath:" + serviceNodePath);
    

    public void close() throws InterruptedException 
        zooKeeper.close();
    

服务消费

package com.fanqiechaodan.service;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

import java.io.IOException;
import java.util.List;

/**
 * @author fanqiechaodan
 * @Classname ServiceConsumer
 * @Description
 */
public class ServiceConsumer 

    private static final String SERVICE_REGISTRY_ROOT = "/services";

    private static final int SESSION_TIMEOUT = 5000;

    private ZooKeeper zooKeeper;

    public ServiceConsumer(String zooKeeperAddress) throws IOException 
        this.zooKeeper = new ZooKeeper(zooKeeperAddress, SESSION_TIMEOUT, null);
    

    /**
     * 获取服务地址
     *
     * @param serviceName
     * @return
     * @throws KeeperException
     * @throws InterruptedException
     */
    public String consumeService(String serviceName) throws KeeperException, InterruptedException 
        String servicePath = SERVICE_REGISTRY_ROOT;
        List<String> childrenList = zooKeeper.getChildren(servicePath, watchedEvent -> 
            if (watchedEvent.getType() == Watcher.Event.EventType.NodeChildrenChanged) 
                System.out.println("serviceNodePath:" + watchedEvent.getPath());
                // synchronized为了保证线程安全,确保notifyAll()调用之前不会有人修改ServiceConsumer
                // 使用notifyAll()唤醒左右等待中的线程,让他们同时重新获取服务列表,避免单个线程多次获取同一服务节点的情况
                synchronized (this) 
                    notifyAll();
                
            
        );
        String serviceNode = childrenList.get((int) (Math.random() * childrenList.size()));
        String res = new String(zooKeeper.getData(servicePath + "/" + serviceNode, false, null));
        System.out.println("serviceNode:" + serviceNode);
        System.out.println("serviceAddress:" + res);
        return res;
    

    public void close() throws InterruptedException 
        zooKeeper.close();
    

测试

package com.fanqiechaodan;

import com.fanqiechaodan.service.ServiceConsumer;
import com.fanqiechaodan.service.ServiceRegistry;

/**
 * @author fanqiechaodan
 * @Classname Demo
 * @Description
 */
public class Demo 

    private static final String ZOOKEEPER_ADDRESS = "192.168.56.129:2181";
    private static final String SERVICE_NAME = "fanqiechaodan";

    public static void main(String[] args) throws Exception 
        // 创建服务注册对象
        ServiceRegistry serviceRegistry = new ServiceRegistry(ZOOKEEPER_ADDRESS);

        // 启动线程1,进行服务的注册
        Thread thread1 = new Thread(() -> 
            try 
                serviceRegistry.registryService(SERVICE_NAME, "127.0.0.1:8080");
             catch (Exception e) 
                e.printStackTrace();
            
        );
        thread1.start();

        // 让线程1先执行一会儿,以确保服务已经被注册
        Thread.sleep(3000);

        // 创建服务消费对象
        ServiceConsumer serviceConsumer = new ServiceConsumer(ZOOKEEPER_ADDRESS);

        // 启动线程2,进行服务的消费
        Thread thread2 = new Thread(() -> 
            try 
                String serviceAddress = serviceConsumer.consumeService(SERVICE_NAME);
                System.out.println("Service consumed: " + serviceAddress);
             catch (Exception e) 
                e.printStackTrace();
            
        );
        thread2.start();

        // 等待线程2执行完成
        thread2.join();
        
        // 睡眠30秒查看ZooKeeper节点是否存在
        Thread.sleep(30000);
        // 关闭服务注册和服务消费对象
        serviceRegistry.close();
        serviceConsumer.close();
    



总结

上面Java代码实现了服务注册和消费的基本流程,首先服务提供方通过ZooKeeper将服务节点注册到服务注册中心,然后再服务消费方通过ZooKeeper获取服务节点列表,并从中随机的选择一个服务节点进行调用,再实际应用中,还需要对服务节点进行心跳检测,负载均衡等处理,以保证服务的高可用性和稳定性.

zookeeper 实现一个简单的服务注册与发现(C++) 三:服务发现

git:git@github.com:ccx19930930/services_register_and_discovery.git

参考链接:https://www.cnblogs.com/haippy/archive/2013/02/21/2920280.html

 

down_service_mgr.h

 1 #ifndef _DOWN_SERVICE_MGR_H_
 2 #define _DOWN_SERVICE_MGR_H_
 3 
 4 #include "base_class.h"
 5 
 6 class CDownServiceMgr : CUnCopyable
 7 {
 8 public:
 9     CDownServiceMgr(int module_id) : m_module_id(module_id) {}
10     ~CDownServiceMgr() {}
11 private:
12     CDownServiceMgr() {}
13 
14 public:
15     int Register(const string& zk_path, CNodeInfo* node_info);
16     int UnRegister(const string& zk_path);
17 
18 private:
19     int m_module_id;
20     map<string, CNodeInfo *> m_node_list;
21      pthread_mutex_t m_mutex;
22 };
23 
24 #endif

 

down_service_mgr.cpp

 1 #include "down_service_mgr.h"
 2 #include "auto_lock.h"
 3 
 4 int CDownServiceMgr::Register(const string& zk_path, CNodeInfo* node_info)
 5 {
 6     CAutoMutexLock auto_lock(m_mutex);
 7     if (m_node_list.count(zk_path))
 8     {
 9         return -1;
10     }
11     m_node_list[zk_path] = node_info;
12     //TODO 长连接等
13 
14     return 0;
15 }
16 
17 int CDownServiceMgr::UnRegister(const string& zk_path)
18 {
19     CAutoMutexLock auto_lock(m_mutex);
20     if (m_node_list.count(zk_path) == 0)
21     {
22         return -1;
23     }
24     //TODO 长连接等
25 
26     m_node_list.erase(zk_path);
27     return 0;
28 }

 

discovery.h

 1 #ifndef _DISCOVERY_H_
 2 #define _DISCOVERY_H_
 3 
 4 #include "base_class.h"
 5 #include "zk_handle.h"
 6 #include "down_service_mgr.h"
 7 
 8 #include <zookeeper.jute.h>
 9 
10 class CDownNode
11 {
12 public:
13     CDownNode() { Reset(); }
14     ~CDownNode() {}
15 
16     void Reset()
17     {
18         m_full_node = false;
19         m_node_info.Reset();
20         m_node_list.clear();
21         m_invalid_node_path_list.clear();
22     }
23 
24 public:
25     bool m_full_node;
26     CNodeInfo m_node_info;
27 
28     map<string, CNodeInfo> m_node_list;
29     set<string> m_invalid_node_path_list;
30 
31 };
32 
33 class CDiscovery : public CUnCopyable
34 {
35 private:
36     static pthread_mutex_t m_mutex;
37     static CDiscovery* m_pins;
38     CDiscovery();
39 public:
40     static CDiscovery* GetInstance();
41     int Init(const set<string> & down_path_list, const set<int> & down_service_list);
42 
43 
44 public:
45     int StartCheck();
46     int Stop();
47     static void OnZkHandleResetFunc();
48 
49 private:
50     static void* DiscoveryCheckThread(void * param);
51     int DiscoveryCheck();
52     int DownPathCheck();
53     int InvalidNodeCheck();
54     int DebugPrintAllNode();
55     bool IsRunning();
56 
57 private:
58     static void ZkPathWatcher(zhandle_t* zh, int type, int state, const char* path, void* watcherCtx);
59     static void ZkNodeWatcher(zhandle_t* zh, int type, int state, const char* path, void* watcherCtx);
60 
61 private:
62     int OnPathChange(string path);
63     int OnNodeChange(string node);
64     void OnZkHandleReset();
65     
66 private:
67     pthread_t m_down_check_thread_id;
68     bool m_is_running;
69     map<string, CDownNode*> m_down_path_list; // <zk_path, down_node_info>
70     map<string, string> m_down_path_2_dir;    // <zk_node, zk_node>
71     map<int, CDownServiceMgr* > m_down_service_list;
72 };
73 
74 #endif

 

discovery.cpp

  1 #include "discovery.h"
  2 #include "auto_lock.h"
  3 
  4 #include <stdlib.h>
  5 #include <sys/prctl.h>
  6 #include <unistd.h>
  7 #include <pthread.h>
  8 
  9 CDiscovery* CDiscovery::m_pins = nullptr;
 10 pthread_mutex_t CDiscovery::m_mutex;
 11 
 12 CDiscovery::CDiscovery()
 13     : m_is_running(false)
 14     , m_down_check_thread_id(0)
 15 {
 16 }
 17 
 18 CDiscovery* CDiscovery::GetInstance()
 19 {
 20     if (m_pins == nullptr)
 21     {
 22         CAutoMutexLock auto_lock(m_mutex);
 23         if (m_pins == nullptr)
 24         {
 25             m_pins = new CDiscovery;
 26         }
 27     }
 28     return m_pins;
 29 }
 30 
 31 int CDiscovery::Init(const set<string>& down_path_list, const set<int>& down_service_list)
 32 {
 33     CAutoMutexLock auto_lock(m_mutex);
 34     for (const auto & zk_path : down_path_list)
 35     {
 36         m_down_path_list[zk_path] = new CDownNode;
 37     }
 38 
 39     for (const auto& module_id : down_service_list)
 40     {
 41         m_down_service_list[module_id] = new CDownServiceMgr(module_id);
 42     }
 43 
 44     return 0;
 45 }
 46 
 47 int CDiscovery::StartCheck()
 48 {
 49     if (0 == m_down_check_thread_id)
 50     {
 51         m_is_running = true;
 52         if (0 != pthread_create(&m_down_check_thread_id, nullptr, CDiscovery::DiscoveryCheckThread, nullptr))
 53         {
 54             printf("CDiscovery::StartCheck create discovery check thread fail.");
 55             return -1;
 56         }
 57         printf("CDiscovery::StartCheck create discovery check thread succ.");
 58     }
 59     return 0;
 60 }
 61 
 62 int CDiscovery::Stop()
 63 {
 64     m_is_running = false;
 65 }
 66 
 67 void CDiscovery::OnZkHandleResetFunc()
 68 {
 69     CDiscovery::GetInstance()->OnZkHandleReset();
 70 }
 71 
 72 void* CDiscovery::DiscoveryCheckThread(void* param)
 73 {
 74     prctl(PR_SET_NAME, "zk_discovery_check");
 75 
 76     CZkHandle::GetInstance()->AddResetHandleFn("discovery", CDiscovery::OnZkHandleResetFunc);
 77 
 78     while (true == CDiscovery::GetInstance()->IsRunning())
 79     {
 80         CDiscovery::GetInstance()->DiscoveryCheck();
 81         usleep(kZkDiscoveryIntervalTime);
 82     }
 83     return nullptr;
 84 }
 85 
 86 int CDiscovery::DiscoveryCheck()
 87 {
 88     DownPathCheck();
 89     InvalidNodeCheck();
 90 
 91 #ifdef _DEBUG_
 92     DebugPrintAllNode();
 93 #endif
 94 }
 95 
 96 int CDiscovery::DownPathCheck()
 97 {
 98     printf("%s =======================================================\\n", __func__);
 99     CAutoMutexLock auto_lock(m_mutex);
100     for (const auto& down_path : m_down_path_list)
101     {
102         CDownNode* down_node = down_path.second;
103         if (down_node->m_full_node)
104         {
105             continue;
106         }
107         set<string> node_list;
108         if (ZOK == CZkHandle::GetInstance()->ZkWgetChildren(down_path.first, CDiscovery::ZkPathWatcher, node_list))
109         {
110             for (auto node_path : node_list)
111             {
112                 down_node->m_invalid_node_path_list.insert(down_path.first + \'/\' + node_path);
113             }
114             for (const auto& node : down_node->m_node_list)
115             {
116                 down_node->m_invalid_node_path_list.insert(node.first);
117             }
118 
119             down_node->m_full_node = true;
120         }
121     }
122     return 0;
123 }
124 
125 int CDiscovery::InvalidNodeCheck()
126 {
127     printf("%s =======================================================\\n", __func__);
128     CAutoMutexLock auto_lock(m_mutex);
129     for (const auto& down_path : m_down_path_list)
130     {
131         CDownNode* down_node = down_path.second;
132         for (auto it_node_path = down_node->m_invalid_node_path_list.begin(); it_node_path != down_node->m_invalid_node_path_list.end();)
133         {
134             struct Stat stat;
135             string zk_node_info;
136             int ret_code = CZkHandle::GetInstance()->ZkWGetNodeInfo(*it_node_path, CDiscovery::ZkNodeWatcher, zk_node_info, stat);
137             if (ZOK == ret_code)
138             {
139                 CNodeInfo node_info;
140                 node_info.FromString(zk_node_info);
141                 down_node->m_node_list[*it_node_path] = node_info;
142                 it_node_path = down_node->m_invalid_node_path_list.erase(it_node_path);
143                 m_down_service_list[atoi(node_info.m_module_id.c_str())]->Register(*it_node_path, &node_info);
144                 m_down_path_2_dir[*it_node_path] = down_path.first;
145             }
146             else if(ZNONODE == ret_code)
147             {
148                 if (down_node->m_node_list.count(*it_node_path))
149                 {
150                     CNodeInfo& node_info = down_node->m_node_list[*it_node_path];
151                     m_down_service_list[atoi(node_info.m_module_id.c_str())]->UnRegister(*it_node_path);
152                     down_node->m_node_list.erase(*it_node_path);
153                 }
154                 if (m_down_path_2_dir.count(*it_node_path))
155                 {
156                     m_down_path_2_dir.erase(*it_node_path);
157                 }
158                 it_node_path = down_node->m_invalid_node_path_list.erase(it_node_path);
159             }
160             else
161             {
162                 ++it_node_path;
163             }
164         }
165     }
166     return 0;
167 }
168 
169 int CDiscovery::DebugPrintAllNode()
170 {
171     printf("%s =======================================================\\n", __func__);
172     CAutoMutexLock auto_lock(m_mutex);
173     for (const auto& down_path : m_down_path_list)
174     {
175         printf("%s down_path=%s is_full_node=%d --------------------------------------------\\n", __func__, down_path.first.c_str(), down_path.second->m_full_node);
176         printf("%s node_list: \\n", __func__);
177         for (auto& down_node : down_path.second->m_node_list)
178         {
179             printf("%s node=%s \\n", __func__, down_node.first.c_str());
180             printf("%s info=%s \\n", __func__, down_node.second.ToString().c_str());
181         }
182 
183         printf("%s invalid_node_list: \\n", __func__);
184         for (const auto& invalid_node : down_path.second->m_invalid_node_path_list)
185         {
186             printf("%s invalid_node:%s \\n", __func__, invalid_node.c_str());
187         }
188     }
189     return 0;
190 }
191 
192 bool CDiscovery::IsRunning()
193 {
194     return m_is_running;
195 }
196 
197 void CDiscovery::ZkPathWatcher(zhandle_t* zh, int type, int state, const char* path, void* watcherCtx)
198 {
199     if (ZOO_CHILD_EVENT == type)
200     {
201         CDiscovery::GetInstance()->OnPathChange(path);
202     }
203 }
204 
205 void CDiscovery::ZkNodeWatcher(zhandle_t* zh, int type, int state, const char* path, void* watcherCtx)
206 {
207     if (ZOO_CHANGED_EVENT == type)
208     {
209         CDiscovery::GetInstance()->OnNodeChange(path);
210     }
211     else if (ZOO_DELETED_EVENT == type)
212     {
213         CDiscovery::GetInstance()->OnNodeChange(path);
214     }
215 }
216 
217 int CDiscovery::OnPathChange(string path)
218 {
219     printf("%s path=%s  =======================================================\\n", __func__, path.c_str());
220     CAutoMutexLock auto_lock(m_mutex);
221     if (m_down_path_list.count(path))
222     {
223         m_down_path_list[path]->m_full_node = false;
224     }
225     return 0;
226 }
227 
228 int CDiscovery::OnNodeChange(string node)
229 {
230     printf("%s node=%s  =======================================================\\n", __func__, node.c_str());
231     CAutoMutexLock auto_lock(m_mutex);
232     string path;
233     if (m_down_path_2_dir.count(node))
234     {
235         path = m_down_path_2_dir[node];
236     }
237     if (m_down_path_list.count(path))
238     {
239         m_down_path_list[path]->m_invalid_node_path_list.insert(node);
240     }
241     return 0;
242 }
243 
244 void CDiscovery::OnZkHandleReset()
245 {
246     printf("%s =======================================================\\n", __func__);
247     CAutoMutexLock auto_lock(m_mutex);
248     for (const auto& down_path : m_down_path_list)
249     {
250         down_path.second->m_full_node = false;
251     }
252 }
View Code

 

discovery_test main.cpp

 1 #include "../zk_util/zk_handle.h"
 2 #include "../zk_util/discovery.h"
 3 
 4 #include <unistd.h>
 5 
 6 //伪分布式部署 host list最好以配置文件形式,此处为测试程序,暂时写死
 7 const char* host_list = "xx.xx.xx.xx:port,xx.xx.xx.xx:port,xx.xx.xx.xx:port";
 8 const int time_out = 3000;
 9 int main()
10 {
11     CZkHandle::GetInstance()->ZkInit(host_list, time_out);
12 
13     set<string> down_path_list;
14     down_path_list.insert("/zk_test1");
15     down_path_list.insert("/zk_test2");
16 
17     set<int> down_service_list;
18     down_service_list.insert(1);
19     down_service_list.insert(2);
20 
21     CDiscovery::GetInstance()->Init(down_path_list, down_service_list);
22     CDiscovery::GetInstance()->StartCheck();
23 
24     sleep(60);
25 
26     return 0;
27 }

 

Makefile

 1 INC_DIR:= ./ ../zk_util/ /usr/local/include/zookeeper/ /usr/local/include/json/
 2 SRCS:= $(wildcard ./*cpp ../zk_util/*cpp)
 3 OBJS:= $(patsubst %.cpp, %.o, $(SRCS))
 4 LIBS:= -lpthread -lzookeeper_mt -ljsoncpp
 5 
 6 CXX:= g++
 7 
 8 CXXFLAGS:= -w -g -std=c++11 $(addprefix -I, $(INC_DIR)) $(LIBS) -Wl,-rpath="/usr/local/lib" -D _DEBUG_
 9 
10 EXE:= ../../bin/discovery_test
11 
12 $(EXE):$(OBJS)
13     $(CXX) -o $(EXE) $(OBJS) $(CXXFLAGS)
14 
15 clean:
16     rm -rf $(EXE)
RPC ---- 基于ZooKeeper为注册中心实现的RPC

RPC ---- 基于ZooKeeper为注册中心实现的RPC

RPC ---- 基于ZooKeeper为注册中心实现的RPC

GO使用Zookeeper实现服务发现

服务注册与发现(中)

Eureka 服务注册与发现