Dubbo之ZookeeperRegistry源码分析
Posted Java后端笔记
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Dubbo之ZookeeperRegistry源码分析相关的知识,希望对你有一定的参考价值。
ZookeeperRegistry的作用
ZookeeperRegistry是dubbo中常用的注册中心实现,它主要作用通过Zookeeper的目录监听机制,让消费者能够实时得到在线的提供者列表。并且一些服务治理的功能也是通过zookeeper这个监听特性巧妙的完成。
在具体讲解ZookeeperRegistry的相关源码之前,先来分析下dubbo在zookeeper的目录结构以及dubbo如何利用这个特性
Zookeeper目录结构
dubbo在zookeeper建立的目录是基于接口的,大致如下针对每个接口节点会存在以下4个子节点
节点名 | 作用 | 子节点是否持久节点 |
---|---|---|
consumers | 存储消费者节点url | 是 |
configuators | 存储override或者absent url,用于服务治理 | 否 |
routers | 用于设置路由url,用于服务治理 | 否 |
providers | 存储在线提供者url | 否 |
consumer节点存在的意义并不大,主要还是为了做监控 其他三个节点,都会设置被相应的监听器,发生改变时,会触发特定事件
Dubbo对Zookeeper监听机制的利用
Dubbo中通过ZookeeperClient的实现类来对zookeeper进行操作ZookeeperClient提供设置两种监听器的方法,对应子节点监听器和状态监听器,这里我们关注子节点监听器ChildListener
public interface ChildListener {
/**
*
* @param path 监听的节点
* @param children 监听的节点的所有子节点
*/
void childChanged(String path, List<String> children);
}
ZookeeperClient有两种实现,第一种通过官方提供的jar包,第二个通过Apache的Curator框架,默认使用第二种,我们讲解的也是Curator的对应实现 添加子节点监听器的方法为addChildListener
public List<String> addChildListener(String path, final ChildListener listener) {
//对listener做缓存,因为ChildListener是dubbo提供的监听器接口,需要转换为cruator的监听器接口
ConcurrentMap<ChildListener, TargetChildListener> listeners = childListeners.get(path);
if (listeners == null) {
childListeners.putIfAbsent(path, new ConcurrentHashMap<ChildListener, TargetChildListener>());
listeners = childListeners.get(path);
}
TargetChildListener targetListener = listeners.get(listener);
if (targetListener == null) {
//createTargetChildListener会对监听器进行转换
listeners.putIfAbsent(listener, createTargetChildListener(path, listener));
targetListener = listeners.get(listener);
}
return addTargetChildListener(path, targetListener);
}
Dubbo底层封装了2套Zookeeper API,所以通过ChildListener抽象了监听器,但是在实际调用时会通过createTargetChildListener转为对应框架的监听器实现 addTargetChildListener方法在添加监听器之后会返回监听path当前的所有的子节点
public List<String> addTargetChildListener(String path, CuratorWatcher listener) {
try {
//添加监听,并且返回这个目录当前所有子节点
//这种监听方式是一次性的,在listener实现中会再次执行监听逻辑
return client.getChildren().usingWatcher(listener).forPath(path);
} catch (NoNodeException e) {
return null;
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
}
上述代码需要注意监听是一次性的,其实curator提供了TreeCache用作永久性的监听,这边不用到这个特性,应该是为了和官方API保持一致吧。 接下去看下Cruator监听器的封装
public CuratorWatcher createTargetChildListener(String path, ChildListener listener) {
return new CuratorWatcherImpl(listener);
}
private class CuratorWatcherImpl implements CuratorWatcher {
private volatile ChildListener listener;
public CuratorWatcherImpl(ChildListener listener) {
this.listener = listener;
}
public void unwatch() {
this.listener = null;
}
@Override
public void process(WatchedEvent event) throws Exception {
if (listener != null) {
String path = event.getPath() == null ? "" : event.getPath();
listener.childChanged(path,
// if path is null, curator using watcher will throw NullPointerException.
// if client connect or disconnect to server, zookeeper will queue
// watched event(Watcher.Event.EventType.None, .., path = null).
StringUtils.isNotEmpty(path)
//再次设置监听,并且把监听path的所有子节点传入childChanged方法
? client.getChildren().usingWatcher(this).forPath(path)
: Collections.<String>emptyList());
}
}
}
可以看到listener的触发逻辑以及入参来源
源码分析
通过ZookeeperRegistry的类继承图,逐上而下的分析源码
Registry接口
public interface Registry extends Node, RegistryService {
}
Registry继承Node和RegistryService两个接口,本身不提供接口方法
public interface Node {
/**
* get url.
*
* @return url.
*/
URL getUrl();
/**
* is available.
*
* @return available.
*/
boolean isAvailable();
/**
* destroy.
*/
void destroy();
}
Node约束了三个生命周期相关的方法 getUrl用于获取当前组件的url配置 isAvailable检测组件是否可用 destroy用于销毁组件
public interface RegistryService {
void register(URL url);
void unregister(URL url);
void subscribe(URL url, NotifyListener listener);
void unsubscribe(URL url, NotifyListener listener);
List<URL> lookup(URL url);
}
RegistryService规定了和注册中心相关的方法 register和unregister用于提供者向注册中心注册提供者url subscribe和unsubscribe用于消费者向对应接口目录注册监听 lookup用于查找查找url,通过消费者url查找提供者url以及服务治理有关的url
AbstractRegistry
主要提供接口提供者本地缓存功能 以及基础register,unregister,subscribe,unsubscribe,notify,lookup,recover逻辑
register,unregister会(接触)注册提供者url,主要操作
private final Set<URL> registered = new ConcurrentHashSet<URL>();
subscribe,unsubscribe则会针对特定url提供监听,主要操作
private final ConcurrentMap<URL, Set<NotifyListener>> subscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();
notify方法会缓存最近通知的url到notified以及触发listener回调
/**
* 这个方法不会直接触发,被FailbackRegistry重载
* FailbackRegistry增加failback逻辑后,还是会调用这个方法
* @param url
* @param listener
* @param urls
*/
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
if (url == null) {
throw new IllegalArgumentException("notify url == null");
}
if (listener == null) {
throw new IllegalArgumentException("notify listener == null");
}
if ((urls == null || urls.isEmpty())
&& !Constants.ANY_VALUE.equals(url.getServiceInterface())) {
logger.warn("Ignore empty notify urls for subscribe url " + url);
return;
}
if (logger.isInfoEnabled()) {
logger.info("Notify urls for subscribe url " + url + ", urls: " + urls);
}
Map<String, List<URL>> result = new HashMap<String, List<URL>>();
//根据url的category进行分类
for (URL u : urls) {
if (UrlUtils.isMatch(url, u)) {
String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
List<URL> categoryList = result.get(category);
if (categoryList == null) {
categoryList = new ArrayList<URL>();
result.put(category, categoryList);
}
categoryList.add(u);
}
}
if (result.size() == 0) {
return;
}
//下面操作notified缓存
Map<String, List<URL>> categoryNotified = notified.get(url);
if (categoryNotified == null) {
notified.putIfAbsent(url, new ConcurrentHashMap<String, List<URL>>());
categoryNotified = notified.get(url);
}
for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
String category = entry.getKey();
List<URL> categoryList = entry.getValue();
//对notified内容进行覆盖,相当于会保存上一次的通知
categoryNotified.put(category, categoryList);
//每次通知后会刷新本地缓存
saveProperties(url);
//进行listener回调,每种category的url分别回调一次
listener.notify(categoryList);
}
}
这个类的recover方法不分析,因为FailbackRegistry完全重写了这个方法
FailbackRegistry
FailbackRegistry重载了AbstractRegistry中的subscribe,unsubscribe,register,unregister,notify方法,在AbstractRegistry的基础上提供了失败重试机制,并且暴露模板方法doRegister,doUnregister,doSubscribe,doUnsubscribe让不同类型的注册中心实现。doNotify还是默认父类的逻辑。 同时也重载了recover方法,通过FailbackRegistry的重试机制实现recover
以registry方法作为样例看下添加的重试机制
/**
* register行为,提供者使用
* 在AbstractRegistry的基础上,增加失败重试机制
* @param url
*/
@Override
public void register(URL url) {
super.register(url);
//这里成功,会删除failedRegistered,failedUnregistered中的url
failedRegistered.remove(url);
failedUnregistered.remove(url);
try {
// Sending a registration request to the server side
//具体register逻辑交给子类实现
doRegister(url);
} catch (Exception e) {
Throwable t = e;
// If the startup detection is opened, the Exception is thrown directly.
//如果注册中心或者提供者url的check为false的话,跳过抛出异常
boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
&& url.getParameter(Constants.CHECK_KEY, true)
&& !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
//如果是注册的时候,抛出这个异常,那么也会忽略,只打日志
boolean skipFailback = t instanceof SkipFailbackWrapperException;
if (check || skipFailback) {
if (skipFailback) {
t = t.getCause();
}
throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
} else {
logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
}
// Record a failed registration request to a failed list, retry regularly
//加入到失败重试集合
failedRegistered.add(url);
}
}
注册失败后会把需要注册重试的url放入failedRegistered集合 然后在FailbackRegistry构造函数中起的定时任务会进行重试
public FailbackRegistry(URL url) {
super(url);
this.retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);
//重试的定时线程,使用future用于取消
this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
// Check and connect to the registry
try {
retry();
} catch (Throwable t) { // Defensive fault tolerance
logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t);
}
}
}, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS);
}
retry方法的具体逻辑,就是循环遍历这些失败集合,然后调用doXXX方法进行重试
recover方法会在和Zookeeper重连时触发,在断连状态下,dubbo进程内的注册,订阅行为是会被缓存下来的,然后对所有缓存的url进行重新注册,订阅。 这边有个细节点,可以看到failedRegistered这些集合使用的都是线程安全的集合
private final Set<URL> failedRegistered = new ConcurrentHashSet<URL>();
因为recover,retry这两个操作还是存在资源竞争的,但不仅限于这两个操作
ZookeeperRegistry
ZookeeperRegistry的工作就是通过Zookeeper API实现doRegister,doUnregister,doSubscribe,doUnsubscribe具体逻辑
首先来看下ZookeeperRegistry的构造函数,做的主要工作是初始化zk客户端
public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
super(url);
if (url.isAnyHost()) {
throw new IllegalStateException("registry address == null");
}
//如果不进行配置,默认dubbo根目录就是/dubbo
String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
if (!group.startsWith(Constants.PATH_SEPARATOR)) {
group = Constants.PATH_SEPARATOR + group;
}
this.root = group;
zkClient = zookeeperTransporter.connect(url);
//zookeeper添加重连回调,会触发recover方法,进行失败任务重试
//为什么FailbackRegistry都是用线程安全的集合,因为在这里存在线程竞争资源
zkClient.addStateListener(new StateListener() {
@Override
public void stateChanged(int state) {
if (state == RECONNECTED) {
try {
recover();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
}
});
}
使用zookeeperTransporter扩展点加载zk客户端实现,默认为Curator框架
@SPI("curator")
public interface ZookeeperTransporter {
@Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
ZookeeperClient connect(URL url);
}
注册/取消注册实现
然后再来看doRegister和doUnregister方法,对于zk来说,就是创建目录呗
/**
* 注册的逻辑,就是在zookeeper创建节点,节点路径为toUrlPath(url)
* 具体格式为 /{group}/{interfaceName}/{category}/{url.toFullString}
* DYNAMIC_KEY表示是否创建永久节点,true表示不是,断开连接后会消失,所以需要进行recover
* @param url
*/
@Override
protected void doRegister(URL url) {
try {
zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
} catch (Throwable e) {
throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
/**
* 取消注册,就是删除那个节点
* @param url
*/
@Override
protected void doUnregister(URL url) {
try {
zkClient.delete(toUrlPath(url));
} catch (Throwable e) {
throw new RpcException("Failed to unregister " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
需要注意下节点的路径生成格式,也就是toUrlPath(url)方法,格式为 /{group}/{interfaceName}/{category}/{url.toFullString}, group一般不配置的话为dubbo, interfaceName对应具体接口, category开始就讲过,分为consumers,configuators,routers,providers url.toFullString就是我们的url配置
对于registry来讲category=providers
取消注册就是对应删除那个节点
订阅/取消订阅实现
订阅的行为对于消费者来讲,用于获取providers和routers,用于得到路由后的提供者 对于提供者来讲,订阅configuators,通过新的配置重新暴露 在ZookeeperRegistry,我们只关注如何进行订阅,具体监听器的作用,在用到的模块再讲 doSubscribe方法支持订阅全局和订阅特定接口 如果interface=*,即订阅全局,对于新增和已存在的所有接口的改动都会触发回调 如果interface=特定接口,那么只有这个接口的子节点改变时,才触发回调
@Override
protected void doSubscribe(final URL url, final NotifyListener listener) {
try {
if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
String root = toRootPath();
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners == null) {
zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
listeners = zkListeners.get(url);
}
ChildListener zkListener = listeners.get(listener);
if (zkListener == null) {
listeners.putIfAbsent(listener, new ChildListener() {
@Override
public void childChanged(String parentPath, List<String> currentChilds) {
for (String child : currentChilds) {
child = URL.decode(child);
if (!anyServices.contains(child)) {
anyServices.add(child);
//如果consumer的interface为*,会订阅每一个url,会触发另一个分支的逻辑
//这里是用来对/dubbo下面提供者新增时的回调,相当于增量
subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child,
Constants.CHECK_KEY, String.valueOf(false)), listener);
}
}
}
});
zkListener = listeners.get(listener);
}
zkClient.create(root, false);
//添加监听器会返回子节点集合
List<String> services = zkClient.addChildListener(root, zkListener);
if (services != null && !services.isEmpty()) {
for (String service : services) {
service = URL.decode(service);
anyServices.add(service);
//如果consumer的interface为*,会订阅每一个url,会触发另一个分支的逻辑
//这里的逻辑只执行一次,一次全量
subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service,
Constants.CHECK_KEY, String.valueOf(false)), listener);
}
}
} else {
//这边是针对明确interface的订阅逻辑
List<URL> urls = new ArrayList<URL>();
//针对每种category路径进行监听
for (String path : toCategoriesPath(url)) {
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners == null) {
zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
listeners = zkListeners.get(url);
}
ChildListener zkListener = listeners.get(listener);
if (zkListener == null) {
//封装回调逻辑
listeners.putIfAbsent(listener, new ChildListener() {
@Override
public void childChanged(String parentPath, List<String> currentChilds) {
ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
}
});
zkListener = listeners.get(listener);
}
//创建节点
zkClient.create(path, false);
//增加回调
List<String> children = zkClient.addChildListener(path, zkListener);
if (children != null) {
urls.addAll(toUrlsWithEmpty(url, path, children));
}
}
//如果有子节点,直接进行触发一次,对应AbstractRegsitry的lookup方法
//意思就是第一次订阅,如果订阅目录存在子节点,直接会触发一次
notify(url, listener, urls);
}
} catch (Throwable e) {
throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
这边需要注意一点的是,每次进行订阅,最重要的第一次,会使用当前订阅节点的子节点数据触发一次notify,执行对应监听器逻辑,这个在后面RegistryDirectory中会用到这个特性
取消订阅没什么好讲的,删除订阅数据即可
讲了这么多,对于lookup方法,使用消费者查找提供者的逻辑其实也很简单。使用消费者url构造出zk中provider的目录,然后返回所有子节点即可
/**
* 查找消费者url 对应 提供者url实现
* 这边的url为消费者url
* @param url
* @return
*/
@Override
public List<URL> lookup(URL url) {
if (url == null) {
throw new IllegalArgumentException("lookup url == null");
}
try {
List<String> providers = new ArrayList<String>();
//返回inteface下面所有category的url
for (String path : toCategoriesPath(url)) {
List<String> children = zkClient.getChildren(path);
if (children != null) {
providers.addAll(children);
}
}
//返回匹配的url
return toUrlsWithoutEmpty(url, providers);
} catch (Throwable e) {
throw new RpcException("Failed to lookup " + url + " from zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
总结
1.Zookeeper监听器的妙用,在Elasticjob也是使用到了这个特性,进行任务触发 2.通过zookeeperTransporter以及ZookeeperClient对Zookeeper操作进行抽象,进而支持两种zookeeper客户端框架。包括在remoting模块也是采用这种设计模式,和底层框架解耦。 3.Zookeeper默认的监听是一次性的,Curator框架实现了永久监听,但是dubbo没用到Curator这个特性。 4.写完这部分,Dirctory模块就比较容易写下去了,东西太多,有些地方的理解肯定存在偏差,希望读者能多多交流
以上是关于Dubbo之ZookeeperRegistry源码分析的主要内容,如果未能解决你的问题,请参考以下文章