基于ZooKeeper实现服务注册与发现
Posted .番茄炒蛋
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了基于ZooKeeper实现服务注册与发现相关的知识,希望对你有一定的参考价值。
简介
在分布式系统中,服务注册与发现是一项重要的技术,本文提供Java代码基于ZooKeeper来实现的服务注册与发现的功能.
服务注册
package com.fanqiechaodan.service;
import org.apache.zookeeper.*;
import java.io.IOException;
/**
* @author fanqiechaodan
* @Classname ServiceRegistry
* @Description 服务注册
*/
public class ServiceRegistry
private static final String SERVICE_REGISTRY_ROOT = "/services";
private static final int SESSION_TIMEOUT = 5000;
private ZooKeeper zooKeeper;
public ServiceRegistry(String zooKeeperAddress) throws IOException
this.zooKeeper = new ZooKeeper(zooKeeperAddress, SESSION_TIMEOUT, null);
/**
* 注册服务
*
* @param serviceName
* @param serviceAddress
* @throws KeeperException
* @throws InterruptedException
*/
public void registryService(String serviceName, String serviceAddress) throws KeeperException, InterruptedException
String servicePath = SERVICE_REGISTRY_ROOT + "/" + serviceName;
// 如果根节点不存在就创建
if (zooKeeper.exists(SERVICE_REGISTRY_ROOT, false) == null)
zooKeeper.create(SERVICE_REGISTRY_ROOT, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
String serviceNodePath = zooKeeper.create(servicePath, serviceAddress.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println("注册成功;serviceNodePath:" + serviceNodePath);
public void close() throws InterruptedException
zooKeeper.close();
服务消费
package com.fanqiechaodan.service;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import java.io.IOException;
import java.util.List;
/**
* @author fanqiechaodan
* @Classname ServiceConsumer
* @Description
*/
public class ServiceConsumer
private static final String SERVICE_REGISTRY_ROOT = "/services";
private static final int SESSION_TIMEOUT = 5000;
private ZooKeeper zooKeeper;
public ServiceConsumer(String zooKeeperAddress) throws IOException
this.zooKeeper = new ZooKeeper(zooKeeperAddress, SESSION_TIMEOUT, null);
/**
* 获取服务地址
*
* @param serviceName
* @return
* @throws KeeperException
* @throws InterruptedException
*/
public String consumeService(String serviceName) throws KeeperException, InterruptedException
String servicePath = SERVICE_REGISTRY_ROOT;
List<String> childrenList = zooKeeper.getChildren(servicePath, watchedEvent ->
if (watchedEvent.getType() == Watcher.Event.EventType.NodeChildrenChanged)
System.out.println("serviceNodePath:" + watchedEvent.getPath());
// synchronized为了保证线程安全,确保notifyAll()调用之前不会有人修改ServiceConsumer
// 使用notifyAll()唤醒左右等待中的线程,让他们同时重新获取服务列表,避免单个线程多次获取同一服务节点的情况
synchronized (this)
notifyAll();
);
String serviceNode = childrenList.get((int) (Math.random() * childrenList.size()));
String res = new String(zooKeeper.getData(servicePath + "/" + serviceNode, false, null));
System.out.println("serviceNode:" + serviceNode);
System.out.println("serviceAddress:" + res);
return res;
public void close() throws InterruptedException
zooKeeper.close();
测试
package com.fanqiechaodan;
import com.fanqiechaodan.service.ServiceConsumer;
import com.fanqiechaodan.service.ServiceRegistry;
/**
* @author fanqiechaodan
* @Classname Demo
* @Description
*/
public class Demo
private static final String ZOOKEEPER_ADDRESS = "192.168.56.129:2181";
private static final String SERVICE_NAME = "fanqiechaodan";
public static void main(String[] args) throws Exception
// 创建服务注册对象
ServiceRegistry serviceRegistry = new ServiceRegistry(ZOOKEEPER_ADDRESS);
// 启动线程1,进行服务的注册
Thread thread1 = new Thread(() ->
try
serviceRegistry.registryService(SERVICE_NAME, "127.0.0.1:8080");
catch (Exception e)
e.printStackTrace();
);
thread1.start();
// 让线程1先执行一会儿,以确保服务已经被注册
Thread.sleep(3000);
// 创建服务消费对象
ServiceConsumer serviceConsumer = new ServiceConsumer(ZOOKEEPER_ADDRESS);
// 启动线程2,进行服务的消费
Thread thread2 = new Thread(() ->
try
String serviceAddress = serviceConsumer.consumeService(SERVICE_NAME);
System.out.println("Service consumed: " + serviceAddress);
catch (Exception e)
e.printStackTrace();
);
thread2.start();
// 等待线程2执行完成
thread2.join();
// 睡眠30秒查看ZooKeeper节点是否存在
Thread.sleep(30000);
// 关闭服务注册和服务消费对象
serviceRegistry.close();
serviceConsumer.close();
总结
上面Java代码实现了服务注册和消费的基本流程,首先服务提供方通过ZooKeeper将服务节点注册到服务注册中心,然后再服务消费方通过ZooKeeper获取服务节点列表,并从中随机的选择一个服务节点进行调用,再实际应用中,还需要对服务节点进行心跳检测,负载均衡等处理,以保证服务的高可用性和稳定性.
zookeeper 实现一个简单的服务注册与发现(C++) 三:服务发现
git:git@github.com:ccx19930930/services_register_and_discovery.git
参考链接:https://www.cnblogs.com/haippy/archive/2013/02/21/2920280.html
down_service_mgr.h
1 #ifndef _DOWN_SERVICE_MGR_H_ 2 #define _DOWN_SERVICE_MGR_H_ 3 4 #include "base_class.h" 5 6 class CDownServiceMgr : CUnCopyable 7 { 8 public: 9 CDownServiceMgr(int module_id) : m_module_id(module_id) {} 10 ~CDownServiceMgr() {} 11 private: 12 CDownServiceMgr() {} 13 14 public: 15 int Register(const string& zk_path, CNodeInfo* node_info); 16 int UnRegister(const string& zk_path); 17 18 private: 19 int m_module_id; 20 map<string, CNodeInfo *> m_node_list; 21 pthread_mutex_t m_mutex; 22 }; 23 24 #endif
down_service_mgr.cpp
1 #include "down_service_mgr.h" 2 #include "auto_lock.h" 3 4 int CDownServiceMgr::Register(const string& zk_path, CNodeInfo* node_info) 5 { 6 CAutoMutexLock auto_lock(m_mutex); 7 if (m_node_list.count(zk_path)) 8 { 9 return -1; 10 } 11 m_node_list[zk_path] = node_info; 12 //TODO 长连接等 13 14 return 0; 15 } 16 17 int CDownServiceMgr::UnRegister(const string& zk_path) 18 { 19 CAutoMutexLock auto_lock(m_mutex); 20 if (m_node_list.count(zk_path) == 0) 21 { 22 return -1; 23 } 24 //TODO 长连接等 25 26 m_node_list.erase(zk_path); 27 return 0; 28 }
discovery.h
1 #ifndef _DISCOVERY_H_ 2 #define _DISCOVERY_H_ 3 4 #include "base_class.h" 5 #include "zk_handle.h" 6 #include "down_service_mgr.h" 7 8 #include <zookeeper.jute.h> 9 10 class CDownNode 11 { 12 public: 13 CDownNode() { Reset(); } 14 ~CDownNode() {} 15 16 void Reset() 17 { 18 m_full_node = false; 19 m_node_info.Reset(); 20 m_node_list.clear(); 21 m_invalid_node_path_list.clear(); 22 } 23 24 public: 25 bool m_full_node; 26 CNodeInfo m_node_info; 27 28 map<string, CNodeInfo> m_node_list; 29 set<string> m_invalid_node_path_list; 30 31 }; 32 33 class CDiscovery : public CUnCopyable 34 { 35 private: 36 static pthread_mutex_t m_mutex; 37 static CDiscovery* m_pins; 38 CDiscovery(); 39 public: 40 static CDiscovery* GetInstance(); 41 int Init(const set<string> & down_path_list, const set<int> & down_service_list); 42 43 44 public: 45 int StartCheck(); 46 int Stop(); 47 static void OnZkHandleResetFunc(); 48 49 private: 50 static void* DiscoveryCheckThread(void * param); 51 int DiscoveryCheck(); 52 int DownPathCheck(); 53 int InvalidNodeCheck(); 54 int DebugPrintAllNode(); 55 bool IsRunning(); 56 57 private: 58 static void ZkPathWatcher(zhandle_t* zh, int type, int state, const char* path, void* watcherCtx); 59 static void ZkNodeWatcher(zhandle_t* zh, int type, int state, const char* path, void* watcherCtx); 60 61 private: 62 int OnPathChange(string path); 63 int OnNodeChange(string node); 64 void OnZkHandleReset(); 65 66 private: 67 pthread_t m_down_check_thread_id; 68 bool m_is_running; 69 map<string, CDownNode*> m_down_path_list; // <zk_path, down_node_info> 70 map<string, string> m_down_path_2_dir; // <zk_node, zk_node> 71 map<int, CDownServiceMgr* > m_down_service_list; 72 }; 73 74 #endif
discovery.cpp
1 #include "discovery.h" 2 #include "auto_lock.h" 3 4 #include <stdlib.h> 5 #include <sys/prctl.h> 6 #include <unistd.h> 7 #include <pthread.h> 8 9 CDiscovery* CDiscovery::m_pins = nullptr; 10 pthread_mutex_t CDiscovery::m_mutex; 11 12 CDiscovery::CDiscovery() 13 : m_is_running(false) 14 , m_down_check_thread_id(0) 15 { 16 } 17 18 CDiscovery* CDiscovery::GetInstance() 19 { 20 if (m_pins == nullptr) 21 { 22 CAutoMutexLock auto_lock(m_mutex); 23 if (m_pins == nullptr) 24 { 25 m_pins = new CDiscovery; 26 } 27 } 28 return m_pins; 29 } 30 31 int CDiscovery::Init(const set<string>& down_path_list, const set<int>& down_service_list) 32 { 33 CAutoMutexLock auto_lock(m_mutex); 34 for (const auto & zk_path : down_path_list) 35 { 36 m_down_path_list[zk_path] = new CDownNode; 37 } 38 39 for (const auto& module_id : down_service_list) 40 { 41 m_down_service_list[module_id] = new CDownServiceMgr(module_id); 42 } 43 44 return 0; 45 } 46 47 int CDiscovery::StartCheck() 48 { 49 if (0 == m_down_check_thread_id) 50 { 51 m_is_running = true; 52 if (0 != pthread_create(&m_down_check_thread_id, nullptr, CDiscovery::DiscoveryCheckThread, nullptr)) 53 { 54 printf("CDiscovery::StartCheck create discovery check thread fail."); 55 return -1; 56 } 57 printf("CDiscovery::StartCheck create discovery check thread succ."); 58 } 59 return 0; 60 } 61 62 int CDiscovery::Stop() 63 { 64 m_is_running = false; 65 } 66 67 void CDiscovery::OnZkHandleResetFunc() 68 { 69 CDiscovery::GetInstance()->OnZkHandleReset(); 70 } 71 72 void* CDiscovery::DiscoveryCheckThread(void* param) 73 { 74 prctl(PR_SET_NAME, "zk_discovery_check"); 75 76 CZkHandle::GetInstance()->AddResetHandleFn("discovery", CDiscovery::OnZkHandleResetFunc); 77 78 while (true == CDiscovery::GetInstance()->IsRunning()) 79 { 80 CDiscovery::GetInstance()->DiscoveryCheck(); 81 usleep(kZkDiscoveryIntervalTime); 82 } 83 return nullptr; 84 } 85 86 int CDiscovery::DiscoveryCheck() 87 { 88 DownPathCheck(); 89 InvalidNodeCheck(); 90 91 #ifdef _DEBUG_ 92 DebugPrintAllNode(); 93 #endif 94 } 95 96 int CDiscovery::DownPathCheck() 97 { 98 printf("%s =======================================================\\n", __func__); 99 CAutoMutexLock auto_lock(m_mutex); 100 for (const auto& down_path : m_down_path_list) 101 { 102 CDownNode* down_node = down_path.second; 103 if (down_node->m_full_node) 104 { 105 continue; 106 } 107 set<string> node_list; 108 if (ZOK == CZkHandle::GetInstance()->ZkWgetChildren(down_path.first, CDiscovery::ZkPathWatcher, node_list)) 109 { 110 for (auto node_path : node_list) 111 { 112 down_node->m_invalid_node_path_list.insert(down_path.first + \'/\' + node_path); 113 } 114 for (const auto& node : down_node->m_node_list) 115 { 116 down_node->m_invalid_node_path_list.insert(node.first); 117 } 118 119 down_node->m_full_node = true; 120 } 121 } 122 return 0; 123 } 124 125 int CDiscovery::InvalidNodeCheck() 126 { 127 printf("%s =======================================================\\n", __func__); 128 CAutoMutexLock auto_lock(m_mutex); 129 for (const auto& down_path : m_down_path_list) 130 { 131 CDownNode* down_node = down_path.second; 132 for (auto it_node_path = down_node->m_invalid_node_path_list.begin(); it_node_path != down_node->m_invalid_node_path_list.end();) 133 { 134 struct Stat stat; 135 string zk_node_info; 136 int ret_code = CZkHandle::GetInstance()->ZkWGetNodeInfo(*it_node_path, CDiscovery::ZkNodeWatcher, zk_node_info, stat); 137 if (ZOK == ret_code) 138 { 139 CNodeInfo node_info; 140 node_info.FromString(zk_node_info); 141 down_node->m_node_list[*it_node_path] = node_info; 142 it_node_path = down_node->m_invalid_node_path_list.erase(it_node_path); 143 m_down_service_list[atoi(node_info.m_module_id.c_str())]->Register(*it_node_path, &node_info); 144 m_down_path_2_dir[*it_node_path] = down_path.first; 145 } 146 else if(ZNONODE == ret_code) 147 { 148 if (down_node->m_node_list.count(*it_node_path)) 149 { 150 CNodeInfo& node_info = down_node->m_node_list[*it_node_path]; 151 m_down_service_list[atoi(node_info.m_module_id.c_str())]->UnRegister(*it_node_path); 152 down_node->m_node_list.erase(*it_node_path); 153 } 154 if (m_down_path_2_dir.count(*it_node_path)) 155 { 156 m_down_path_2_dir.erase(*it_node_path); 157 } 158 it_node_path = down_node->m_invalid_node_path_list.erase(it_node_path); 159 } 160 else 161 { 162 ++it_node_path; 163 } 164 } 165 } 166 return 0; 167 } 168 169 int CDiscovery::DebugPrintAllNode() 170 { 171 printf("%s =======================================================\\n", __func__); 172 CAutoMutexLock auto_lock(m_mutex); 173 for (const auto& down_path : m_down_path_list) 174 { 175 printf("%s down_path=%s is_full_node=%d --------------------------------------------\\n", __func__, down_path.first.c_str(), down_path.second->m_full_node); 176 printf("%s node_list: \\n", __func__); 177 for (auto& down_node : down_path.second->m_node_list) 178 { 179 printf("%s node=%s \\n", __func__, down_node.first.c_str()); 180 printf("%s info=%s \\n", __func__, down_node.second.ToString().c_str()); 181 } 182 183 printf("%s invalid_node_list: \\n", __func__); 184 for (const auto& invalid_node : down_path.second->m_invalid_node_path_list) 185 { 186 printf("%s invalid_node:%s \\n", __func__, invalid_node.c_str()); 187 } 188 } 189 return 0; 190 } 191 192 bool CDiscovery::IsRunning() 193 { 194 return m_is_running; 195 } 196 197 void CDiscovery::ZkPathWatcher(zhandle_t* zh, int type, int state, const char* path, void* watcherCtx) 198 { 199 if (ZOO_CHILD_EVENT == type) 200 { 201 CDiscovery::GetInstance()->OnPathChange(path); 202 } 203 } 204 205 void CDiscovery::ZkNodeWatcher(zhandle_t* zh, int type, int state, const char* path, void* watcherCtx) 206 { 207 if (ZOO_CHANGED_EVENT == type) 208 { 209 CDiscovery::GetInstance()->OnNodeChange(path); 210 } 211 else if (ZOO_DELETED_EVENT == type) 212 { 213 CDiscovery::GetInstance()->OnNodeChange(path); 214 } 215 } 216 217 int CDiscovery::OnPathChange(string path) 218 { 219 printf("%s path=%s =======================================================\\n", __func__, path.c_str()); 220 CAutoMutexLock auto_lock(m_mutex); 221 if (m_down_path_list.count(path)) 222 { 223 m_down_path_list[path]->m_full_node = false; 224 } 225 return 0; 226 } 227 228 int CDiscovery::OnNodeChange(string node) 229 { 230 printf("%s node=%s =======================================================\\n", __func__, node.c_str()); 231 CAutoMutexLock auto_lock(m_mutex); 232 string path; 233 if (m_down_path_2_dir.count(node)) 234 { 235 path = m_down_path_2_dir[node]; 236 } 237 if (m_down_path_list.count(path)) 238 { 239 m_down_path_list[path]->m_invalid_node_path_list.insert(node); 240 } 241 return 0; 242 } 243 244 void CDiscovery::OnZkHandleReset() 245 { 246 printf("%s =======================================================\\n", __func__); 247 CAutoMutexLock auto_lock(m_mutex); 248 for (const auto& down_path : m_down_path_list) 249 { 250 down_path.second->m_full_node = false; 251 } 252 }
discovery_test main.cpp
1 #include "../zk_util/zk_handle.h" 2 #include "../zk_util/discovery.h" 3 4 #include <unistd.h> 5 6 //伪分布式部署 host list最好以配置文件形式,此处为测试程序,暂时写死 7 const char* host_list = "xx.xx.xx.xx:port,xx.xx.xx.xx:port,xx.xx.xx.xx:port"; 8 const int time_out = 3000; 9 int main() 10 { 11 CZkHandle::GetInstance()->ZkInit(host_list, time_out); 12 13 set<string> down_path_list; 14 down_path_list.insert("/zk_test1"); 15 down_path_list.insert("/zk_test2"); 16 17 set<int> down_service_list; 18 down_service_list.insert(1); 19 down_service_list.insert(2); 20 21 CDiscovery::GetInstance()->Init(down_path_list, down_service_list); 22 CDiscovery::GetInstance()->StartCheck(); 23 24 sleep(60); 25 26 return 0; 27 }
Makefile
1 INC_DIR:= ./ ../zk_util/ /usr/local/include/zookeeper/ /usr/local/include/json/ 2 SRCS:= $(wildcard ./*cpp ../zk_util/*cpp) 3 OBJS:= $(patsubst %.cpp, %.o, $(SRCS)) 4 LIBS:= -lpthread -lzookeeper_mt -ljsoncpp 5 6 CXX:= g++ 7 8 CXXFLAGS:= -w -g -std=c++11 $(addprefix -I, $(INC_DIR)) $(LIBS) -Wl,-rpath="/usr/local/lib" -D _DEBUG_ 9 10 EXE:= ../../bin/discovery_test 11 12 $(EXE):$(OBJS) 13 $(CXX) -o $(EXE) $(OBJS) $(CXXFLAGS) 14 15 clean: 16 rm -rf $(EXE) RPC ---- 基于ZooKeeper为注册中心实现的RPCRPC ---- 基于ZooKeeper为注册中心实现的RPC