Dubbo之ZookeeperRegistry源码分析

Posted Java后端笔记

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Dubbo之ZookeeperRegistry源码分析相关的知识,希望对你有一定的参考价值。

ZookeeperRegistry的作用

ZookeeperRegistry是dubbo中常用的注册中心实现,它主要作用通过Zookeeper的目录监听机制,让消费者能够实时得到在线的提供者列表。并且一些服务治理的功能也是通过zookeeper这个监听特性巧妙的完成。

在具体讲解ZookeeperRegistry的相关源码之前,先来分析下dubbo在zookeeper的目录结构以及dubbo如何利用这个特性

Zookeeper目录结构

dubbo在zookeeper建立的目录是基于接口的,大致如下针对每个接口节点会存在以下4个子节点

节点名 作用 子节点是否持久节点
consumers 存储消费者节点url
configuators 存储override或者absent url,用于服务治理
routers 用于设置路由url,用于服务治理
providers 存储在线提供者url

consumer节点存在的意义并不大,主要还是为了做监控 其他三个节点,都会设置被相应的监听器,发生改变时,会触发特定事件

Dubbo对Zookeeper监听机制的利用

Dubbo中通过ZookeeperClient的实现类来对zookeeper进行操作ZookeeperClient提供设置两种监听器的方法,对应子节点监听器和状态监听器,这里我们关注子节点监听器ChildListener

 
   
   
 
  1. public interface ChildListener {

  2.    /**

  3.     *

  4.     * @param path 监听的节点

  5.     * @param children 监听的节点的所有子节点

  6.     */

  7.    void childChanged(String path, List<String> children);

  8. }

ZookeeperClient有两种实现,第一种通过官方提供的jar包,第二个通过Apache的Curator框架,默认使用第二种,我们讲解的也是Curator的对应实现 添加子节点监听器的方法为addChildListener

 
   
   
 
  1. public List<String> addChildListener(String path, final ChildListener listener) {

  2.        //对listener做缓存,因为ChildListener是dubbo提供的监听器接口,需要转换为cruator的监听器接口

  3.        ConcurrentMap<ChildListener, TargetChildListener> listeners = childListeners.get(path);

  4.        if (listeners == null) {

  5.            childListeners.putIfAbsent(path, new ConcurrentHashMap<ChildListener, TargetChildListener>());

  6.            listeners = childListeners.get(path);

  7.        }

  8.        TargetChildListener targetListener = listeners.get(listener);

  9.        if (targetListener == null) {

  10.            //createTargetChildListener会对监听器进行转换

  11.            listeners.putIfAbsent(listener, createTargetChildListener(path, listener));

  12.            targetListener = listeners.get(listener);

  13.        }

  14.        return addTargetChildListener(path, targetListener);

  15.    }

Dubbo底层封装了2套Zookeeper API,所以通过ChildListener抽象了监听器,但是在实际调用时会通过createTargetChildListener转为对应框架的监听器实现 addTargetChildListener方法在添加监听器之后会返回监听path当前的所有的子节点

 
   
   
 
  1. public List<String> addTargetChildListener(String path, CuratorWatcher listener) {

  2.        try {

  3.            //添加监听,并且返回这个目录当前所有子节点

  4.            //这种监听方式是一次性的,在listener实现中会再次执行监听逻辑

  5.            return client.getChildren().usingWatcher(listener).forPath(path);

  6.        } catch (NoNodeException e) {

  7.            return null;

  8.        } catch (Exception e) {

  9.            throw new IllegalStateException(e.getMessage(), e);

  10.        }

  11.    }

上述代码需要注意监听是一次性的,其实curator提供了TreeCache用作永久性的监听,这边不用到这个特性,应该是为了和官方API保持一致吧。 接下去看下Cruator监听器的封装

 
   
   
 
  1. public CuratorWatcher createTargetChildListener(String path, ChildListener listener) {

  2.        return new CuratorWatcherImpl(listener);

  3.    }

 
   
   
 
  1. private class CuratorWatcherImpl implements CuratorWatcher {

  2.        private volatile ChildListener listener;

  3.        public CuratorWatcherImpl(ChildListener listener) {

  4.            this.listener = listener;

  5.        }

  6.        public void unwatch() {

  7.            this.listener = null;

  8.        }

  9.        @Override

  10.        public void process(WatchedEvent event) throws Exception {

  11.            if (listener != null) {

  12.                String path = event.getPath() == null ? "" : event.getPath();

  13.                listener.childChanged(path,

  14.                        // if path is null, curator using watcher will throw NullPointerException.

  15.                        // if client connect or disconnect to server, zookeeper will queue

  16.                        // watched event(Watcher.Event.EventType.None, .., path = null).

  17.                        StringUtils.isNotEmpty(path)

  18.                                //再次设置监听,并且把监听path的所有子节点传入childChanged方法

  19.                                ? client.getChildren().usingWatcher(this).forPath(path)

  20.                                : Collections.<String>emptyList());

  21.            }

  22.        }

  23.    }

可以看到listener的触发逻辑以及入参来源

源码分析

通过ZookeeperRegistry的类继承图,逐上而下的分析源码

Registry接口

 
   
   
 
  1. public interface Registry extends Node, RegistryService {

  2. }

Registry继承Node和RegistryService两个接口,本身不提供接口方法

 
   
   
 
  1. public interface Node {

  2.    /**

  3.     * get url.

  4.     *

  5.     * @return url.

  6.     */

  7.    URL getUrl();

  8.    /**

  9.     * is available.

  10.     *

  11.     * @return available.

  12.     */

  13.    boolean isAvailable();

  14.    /**

  15.     * destroy.

  16.     */

  17.    void destroy();

  18. }

Node约束了三个生命周期相关的方法 getUrl用于获取当前组件的url配置 isAvailable检测组件是否可用 destroy用于销毁组件

 
   
   
 
  1. public interface RegistryService {

  2.    void register(URL url);

  3.    void unregister(URL url);

  4.    void subscribe(URL url, NotifyListener listener);

  5.    void unsubscribe(URL url, NotifyListener listener);

  6.    List<URL> lookup(URL url);

  7. }

RegistryService规定了和注册中心相关的方法 register和unregister用于提供者向注册中心注册提供者url subscribe和unsubscribe用于消费者向对应接口目录注册监听 lookup用于查找查找url,通过消费者url查找提供者url以及服务治理有关的url

AbstractRegistry

主要提供接口提供者本地缓存功能 以及基础register,unregister,subscribe,unsubscribe,notify,lookup,recover逻辑

register,unregister会(接触)注册提供者url,主要操作

 
   
   
 
  1. private final Set<URL> registered = new ConcurrentHashSet<URL>();

subscribe,unsubscribe则会针对特定url提供监听,主要操作

 
   
   
 
  1. private final ConcurrentMap<URL, Set<NotifyListener>> subscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();

notify方法会缓存最近通知的url到notified以及触发listener回调

 
   
   
 
  1. /**

  2.     * 这个方法不会直接触发,被FailbackRegistry重载

  3.     * FailbackRegistry增加failback逻辑后,还是会调用这个方法

  4.     * @param url

  5.     * @param listener

  6.     * @param urls

  7.     */

  8.    protected void notify(URL url, NotifyListener listener, List<URL> urls) {

  9.        if (url == null) {

  10.            throw new IllegalArgumentException("notify url == null");

  11.        }

  12.        if (listener == null) {

  13.            throw new IllegalArgumentException("notify listener == null");

  14.        }

  15.        if ((urls == null || urls.isEmpty())

  16.                && !Constants.ANY_VALUE.equals(url.getServiceInterface())) {

  17.            logger.warn("Ignore empty notify urls for subscribe url " + url);

  18.            return;

  19.        }

  20.        if (logger.isInfoEnabled()) {

  21.            logger.info("Notify urls for subscribe url " + url + ", urls: " + urls);

  22.        }

  23.        Map<String, List<URL>> result = new HashMap<String, List<URL>>();

  24.        //根据url的category进行分类

  25.        for (URL u : urls) {

  26.            if (UrlUtils.isMatch(url, u)) {

  27.                String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);

  28.                List<URL> categoryList = result.get(category);

  29.                if (categoryList == null) {

  30.                    categoryList = new ArrayList<URL>();

  31.                    result.put(category, categoryList);

  32.                }

  33.                categoryList.add(u);

  34.            }

  35.        }

  36.        if (result.size() == 0) {

  37.            return;

  38.        }

  39.        //下面操作notified缓存

  40.        Map<String, List<URL>> categoryNotified = notified.get(url);

  41.        if (categoryNotified == null) {

  42.            notified.putIfAbsent(url, new ConcurrentHashMap<String, List<URL>>());

  43.            categoryNotified = notified.get(url);

  44.        }

  45.        for (Map.Entry<String, List<URL>> entry : result.entrySet()) {

  46.            String category = entry.getKey();

  47.            List<URL> categoryList = entry.getValue();

  48.            //对notified内容进行覆盖,相当于会保存上一次的通知

  49.            categoryNotified.put(category, categoryList);

  50.            //每次通知后会刷新本地缓存

  51.            saveProperties(url);

  52.            //进行listener回调,每种category的url分别回调一次

  53.            listener.notify(categoryList);

  54.        }

  55.    }

这个类的recover方法不分析,因为FailbackRegistry完全重写了这个方法

FailbackRegistry

FailbackRegistry重载了AbstractRegistry中的subscribe,unsubscribe,register,unregister,notify方法,在AbstractRegistry的基础上提供了失败重试机制,并且暴露模板方法doRegister,doUnregister,doSubscribe,doUnsubscribe让不同类型的注册中心实现。doNotify还是默认父类的逻辑。 同时也重载了recover方法,通过FailbackRegistry的重试机制实现recover

以registry方法作为样例看下添加的重试机制

 
   
   
 
  1. /**

  2.     * register行为,提供者使用

  3.     * 在AbstractRegistry的基础上,增加失败重试机制

  4.     * @param url

  5.     */

  6.    @Override

  7.    public void register(URL url) {

  8.        super.register(url);

  9.        //这里成功,会删除failedRegistered,failedUnregistered中的url

  10.        failedRegistered.remove(url);

  11.        failedUnregistered.remove(url);

  12.        try {

  13.            // Sending a registration request to the server side

  14.            //具体register逻辑交给子类实现

  15.            doRegister(url);

  16.        } catch (Exception e) {

  17.            Throwable t = e;

  18.            // If the startup detection is opened, the Exception is thrown directly.

  19.            //如果注册中心或者提供者url的check为false的话,跳过抛出异常

  20.            boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)

  21.                    && url.getParameter(Constants.CHECK_KEY, true)

  22.                    && !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());

  23.            //如果是注册的时候,抛出这个异常,那么也会忽略,只打日志

  24.            boolean skipFailback = t instanceof SkipFailbackWrapperException;

  25.            if (check || skipFailback) {

  26.                if (skipFailback) {

  27.                    t = t.getCause();

  28.                }

  29.                throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);

  30.            } else {

  31.                logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);

  32.            }

  33.            // Record a failed registration request to a failed list, retry regularly

  34.            //加入到失败重试集合

  35.            failedRegistered.add(url);

  36.        }

  37.    }

注册失败后会把需要注册重试的url放入failedRegistered集合 然后在FailbackRegistry构造函数中起的定时任务会进行重试

 
   
   
 
  1. public FailbackRegistry(URL url) {

  2.        super(url);

  3.        this.retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);

  4.        //重试的定时线程,使用future用于取消

  5.        this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() {

  6.            @Override

  7.            public void run() {

  8.                // Check and connect to the registry

  9.                try {

  10.                    retry();

  11.                } catch (Throwable t) { // Defensive fault tolerance

  12.                    logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t);

  13.                }

  14.            }

  15.        }, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS);

  16.    }

retry方法的具体逻辑,就是循环遍历这些失败集合,然后调用doXXX方法进行重试

recover方法会在和Zookeeper重连时触发,在断连状态下,dubbo进程内的注册,订阅行为是会被缓存下来的,然后对所有缓存的url进行重新注册,订阅。 这边有个细节点,可以看到failedRegistered这些集合使用的都是线程安全的集合

 
   
   
 
  1. private final Set<URL> failedRegistered = new ConcurrentHashSet<URL>();

因为recover,retry这两个操作还是存在资源竞争的,但不仅限于这两个操作

ZookeeperRegistry

ZookeeperRegistry的工作就是通过Zookeeper API实现doRegister,doUnregister,doSubscribe,doUnsubscribe具体逻辑

首先来看下ZookeeperRegistry的构造函数,做的主要工作是初始化zk客户端

 
   
   
 
  1. public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {

  2.        super(url);

  3.        if (url.isAnyHost()) {

  4.            throw new IllegalStateException("registry address == null");

  5.        }

  6.        //如果不进行配置,默认dubbo根目录就是/dubbo

  7.        String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);

  8.        if (!group.startsWith(Constants.PATH_SEPARATOR)) {

  9.            group = Constants.PATH_SEPARATOR + group;

  10.        }

  11.        this.root = group;

  12.        zkClient = zookeeperTransporter.connect(url);

  13.        //zookeeper添加重连回调,会触发recover方法,进行失败任务重试

  14.        //为什么FailbackRegistry都是用线程安全的集合,因为在这里存在线程竞争资源

  15.        zkClient.addStateListener(new StateListener() {

  16.            @Override

  17.            public void stateChanged(int state) {

  18.                if (state == RECONNECTED) {

  19.                    try {

  20.                        recover();

  21.                    } catch (Exception e) {

  22.                        logger.error(e.getMessage(), e);

  23.                    }

  24.                }

  25.            }

  26.        });

  27.    }

使用zookeeperTransporter扩展点加载zk客户端实现,默认为Curator框架

 
   
   
 
  1. @SPI("curator")

  2. public interface ZookeeperTransporter {

  3.    @Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})

  4.    ZookeeperClient connect(URL url);

  5. }

注册/取消注册实现

然后再来看doRegister和doUnregister方法,对于zk来说,就是创建目录呗

 
   
   
 
  1. /**

  2.     * 注册的逻辑,就是在zookeeper创建节点,节点路径为toUrlPath(url)

  3.     * 具体格式为 /{group}/{interfaceName}/{category}/{url.toFullString}

  4.     * DYNAMIC_KEY表示是否创建永久节点,true表示不是,断开连接后会消失,所以需要进行recover

  5.     * @param url

  6.     */

  7.    @Override

  8.    protected void doRegister(URL url) {

  9.        try {

  10.            zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));

  11.        } catch (Throwable e) {

  12.            throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);

  13.        }

  14.    }

 
   
   
 
  1. /**

  2.     * 取消注册,就是删除那个节点

  3.     * @param url

  4.     */

  5.    @Override

  6.    protected void doUnregister(URL url) {

  7.        try {

  8.            zkClient.delete(toUrlPath(url));

  9.        } catch (Throwable e) {

  10.            throw new RpcException("Failed to unregister " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);

  11.        }

  12.    }

需要注意下节点的路径生成格式,也就是toUrlPath(url)方法,格式为 /{group}/{interfaceName}/{category}/{url.toFullString}, group一般不配置的话为dubbo, interfaceName对应具体接口, category开始就讲过,分为consumers,configuators,routers,providers url.toFullString就是我们的url配置

对于registry来讲category=providers

取消注册就是对应删除那个节点

订阅/取消订阅实现

订阅的行为对于消费者来讲,用于获取providers和routers,用于得到路由后的提供者 对于提供者来讲,订阅configuators,通过新的配置重新暴露 在ZookeeperRegistry,我们只关注如何进行订阅,具体监听器的作用,在用到的模块再讲 doSubscribe方法支持订阅全局和订阅特定接口 如果interface=*,即订阅全局,对于新增和已存在的所有接口的改动都会触发回调 如果interface=特定接口,那么只有这个接口的子节点改变时,才触发回调

 
   
   
 
  1. @Override

  2.    protected void doSubscribe(final URL url, final NotifyListener listener) {

  3.        try {

  4.            if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {

  5.                String root = toRootPath();

  6.                ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);

  7.                if (listeners == null) {

  8.                    zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());

  9.                    listeners = zkListeners.get(url);

  10.                }

  11.                ChildListener zkListener = listeners.get(listener);

  12.                if (zkListener == null) {

  13.                    listeners.putIfAbsent(listener, new ChildListener() {

  14.                        @Override

  15.                        public void childChanged(String parentPath, List<String> currentChilds) {

  16.                            for (String child : currentChilds) {

  17.                                child = URL.decode(child);

  18.                                if (!anyServices.contains(child)) {

  19.                                    anyServices.add(child);

  20.                                    //如果consumer的interface为*,会订阅每一个url,会触发另一个分支的逻辑

  21.                                    //这里是用来对/dubbo下面提供者新增时的回调,相当于增量

  22.                                    subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child,

  23.                                            Constants.CHECK_KEY, String.valueOf(false)), listener);

  24.                                }

  25.                            }

  26.                        }

  27.                    });

  28.                    zkListener = listeners.get(listener);

  29.                }

  30.                zkClient.create(root, false);

  31.                //添加监听器会返回子节点集合

  32.                List<String> services = zkClient.addChildListener(root, zkListener);

  33.                if (services != null && !services.isEmpty()) {

  34.                    for (String service : services) {

  35.                        service = URL.decode(service);

  36.                        anyServices.add(service);

  37.                        //如果consumer的interface为*,会订阅每一个url,会触发另一个分支的逻辑

  38.                        //这里的逻辑只执行一次,一次全量

  39.                        subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service,

  40.                                Constants.CHECK_KEY, String.valueOf(false)), listener);

  41.                    }

  42.                }

  43.            } else {

  44.                //这边是针对明确interface的订阅逻辑

  45.                List<URL> urls = new ArrayList<URL>();

  46.                //针对每种category路径进行监听

  47.                for (String path : toCategoriesPath(url)) {

  48.                    ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);

  49.                    if (listeners == null) {

  50.                        zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());

  51.                        listeners = zkListeners.get(url);

  52.                    }

  53.                    ChildListener zkListener = listeners.get(listener);

  54.                    if (zkListener == null) {

  55.                        //封装回调逻辑

  56.                        listeners.putIfAbsent(listener, new ChildListener() {

  57.                            @Override

  58.                            public void childChanged(String parentPath, List<String> currentChilds) {

  59.                                ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));

  60.                            }

  61.                        });

  62.                        zkListener = listeners.get(listener);

  63.                    }

  64.                    //创建节点

  65.                    zkClient.create(path, false);

  66.                    //增加回调

  67.                    List<String> children = zkClient.addChildListener(path, zkListener);

  68.                    if (children != null) {

  69.                        urls.addAll(toUrlsWithEmpty(url, path, children));

  70.                    }

  71.                }

  72.                //如果有子节点,直接进行触发一次,对应AbstractRegsitry的lookup方法

  73.                //意思就是第一次订阅,如果订阅目录存在子节点,直接会触发一次

  74.                notify(url, listener, urls);

  75.            }

  76.        } catch (Throwable e) {

  77.            throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);

  78.        }

  79.    }

这边需要注意一点的是,每次进行订阅,最重要的第一次,会使用当前订阅节点的子节点数据触发一次notify,执行对应监听器逻辑,这个在后面RegistryDirectory中会用到这个特性

取消订阅没什么好讲的,删除订阅数据即可

讲了这么多,对于lookup方法,使用消费者查找提供者的逻辑其实也很简单。使用消费者url构造出zk中provider的目录,然后返回所有子节点即可

 
   
   
 
  1. /**

  2.     * 查找消费者url 对应 提供者url实现

  3.     * 这边的url为消费者url

  4.     * @param url

  5.     * @return

  6.     */

  7.    @Override

  8.    public List<URL> lookup(URL url) {

  9.        if (url == null) {

  10.            throw new IllegalArgumentException("lookup url == null");

  11.        }

  12.        try {

  13.            List<String> providers = new ArrayList<String>();

  14.            //返回inteface下面所有category的url

  15.            for (String path : toCategoriesPath(url)) {

  16.                List<String> children = zkClient.getChildren(path);

  17.                if (children != null) {

  18.                    providers.addAll(children);

  19.                }

  20.            }

  21.            //返回匹配的url

  22.            return toUrlsWithoutEmpty(url, providers);

  23.        } catch (Throwable e) {

  24.            throw new RpcException("Failed to lookup " + url + " from zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);

  25.        }

  26.    }

总结

1.Zookeeper监听器的妙用,在Elasticjob也是使用到了这个特性,进行任务触发 2.通过zookeeperTransporter以及ZookeeperClient对Zookeeper操作进行抽象,进而支持两种zookeeper客户端框架。包括在remoting模块也是采用这种设计模式,和底层框架解耦。 3.Zookeeper默认的监听是一次性的,Curator框架实现了永久监听,但是dubbo没用到Curator这个特性。 4.写完这部分,Dirctory模块就比较容易写下去了,东西太多,有些地方的理解肯定存在偏差,希望读者能多多交流


以上是关于Dubbo之ZookeeperRegistry源码分析的主要内容,如果未能解决你的问题,请参考以下文章

01.dubbo源码解析--注册中心(缓存机制)

Dubbo注册中心

【Dubbo】与ZK交互

Dubbo-服务暴露流程

第零章 目录

1.dubbo源码分析之dubbo项目结构