7.6 服务远程暴露 - 注册服务到zookeeper

Posted 赵计刚

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了7.6 服务远程暴露 - 注册服务到zookeeper相关的知识,希望对你有一定的参考价值。

为了安全:服务启动的ip全部使用10.10.10.10

远程服务的暴露总体步骤:

  • 将ref封装为invoker
  • 将invoker转换为exporter
  • 启动netty
  • 注册服务到zookeeper
  • 订阅
  • 返回新的exporter实例

7.4 服务远程暴露 - 创建Exporter与启动netty服务端中,实现了前三步,本节实现第四步:注册服务到zk。总体代码如下:RegistryProtocol.export(final Invoker<T> originInvoker)

1         final Registry registry = getRegistry(originInvoker);//创建ZookeeperRegistry实例:创建CuratorClient,并启动会话。
2         final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);//获取真正要注册在zk上的url
3         registry.register(registedProviderUrl);//创建节点(即注册服务到zk上)

说明:

  • 第一句代码用来创建ZookeeperRegistry实例:创建CuratorClient,并启动会话。
  • 第二句代码获取真正要注册在zk上的url
  • 第三句代码实现创建节点(即注册服务到zk上)

 

一  创建ZookeeperRegistry实例

1  RegistryProtocol.getRegistry(final Invoker<?> originInvoker)

 1     /**
 2      * 根据invoker的地址获取registry实例
 3      */
 4     private Registry getRegistry(final Invoker<?> originInvoker) {
 5         URL registryUrl = originInvoker.getUrl();
 6         if (Constants.REGISTRY_PROTOCOL.equals(registryUrl.getProtocol())) {
 7             String protocol = registryUrl.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_DIRECTORY);//zookeeper
 8             registryUrl = registryUrl.setProtocol(protocol).removeParameter(Constants.REGISTRY_KEY);
 9         }
10         return registryFactory.getRegistry(registryUrl);
11     }

首先对originInvoker中的url进行处理:

  • 将协议换成zookeeper
  • 去掉registry=zookeeper的参数

来看一下originInvoker的url:(解码后的)

registry://10.211.55.5:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&client=curator&dubbo=2.0.0&export=dubbo://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=2791&side=provider&timestamp=1507262031554&pid=2791&registry=zookeeper&timestamp=1507262031521

说明:

  • 第一个红色部分代表协议:zookeeper
  • 第二个红色部分是export参数
  • 第三个红色部分是registry=zookeeper

经过处理之后的registryUrl为:

zookeeper://10.211.55.5:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&client=curator&dubbo=2.0.0&export=dubbo://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=2791&side=provider&timestamp=1507262031554&pid=2791&timestamp=1507262031521

之后使用注册工厂来创建注册中心。

 

2  RegistryFactory$Adaptive.getRegistry(com.alibaba.dubbo.common.URL registryUrl)

 1 public class RegistryFactory$Adaptive implements com.alibaba.dubbo.registry.RegistryFactory {
 2     public com.alibaba.dubbo.registry.Registry getRegistry(com.alibaba.dubbo.common.URL arg0) {
 3         if (arg0 == null)
 4             throw new IllegalArgumentException("url == null");
 5         com.alibaba.dubbo.common.URL url = arg0;
 6         String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );//zookeeper
 7         if(extName == null)
 8             throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.registry.RegistryFactory) name from url(" + url.toString() + ") use keys([protocol])");
 9         com.alibaba.dubbo.registry.RegistryFactory extension = (com.alibaba.dubbo.registry.RegistryFactory)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.registry.RegistryFactory.class).getExtension(extName);
10         return extension.getRegistry(arg0);
11     }
12 }

这里获取到的extension是ZookeeperRegistryFactory,之后,使用ZookeeperRegistryFactory进行Registry的创建。首先来看一下ZookeeperRegistryFactory的继承图:

getRegistry方法在ZookeeperRegistryFactory的父类AbstractRegistryFactory中。

 

3  AbstractRegistryFactory.getRegistry(URL registryUrl)

 1     public Registry getRegistry(URL url) {
 2         url = url.setPath(RegistryService.class.getName())
 3                 .addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName())
 4                 .removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY);
 5         String key = url.toServiceString();
 6         // 锁定注册中心获取过程,保证注册中心单一实例
 7         LOCK.lock();
 8         try {
 9             Registry registry = REGISTRIES.get(key);
10             if (registry != null) {
11                 return registry;
12             }
13             registry = createRegistry(url);
14             if (registry == null) {
15                 throw new IllegalStateException("Can not create registry " + url);
16             }
17             REGISTRIES.put(key, registry);
18             return registry;
19         } finally {
20             // 释放锁
21             LOCK.unlock();
22         }
23     }

流程:

  • 先处理url,之后获取Registry的key,然后根据该key从Map<String, Registry> REGISTRIES注册中心集合缓存中获取Registry,如果有,直接返回,如果没有,创建Registry,之后存入缓存,最后返回。

首先处理传入的registryUrl:

  • 设置:path=com.alibaba.dubbo.registry.RegistryService
  • 添加参数:interface=com.alibaba.dubbo.registry.RegistryService
  • 去除export参数

最终得到的registryUrl如下:

zookeeper://10.211.55.5:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&client=curator&dubbo=2.0.0&interface=com.alibaba.dubbo.registry.RegistryService&pid=2791&timestamp=1507262031521

之后,很具上述的registryUrl创建Registry的key,该{ key : Registry }最终会被存储在Map<String, Registry> REGISTRIES注册中心集合(该属性是ZookeeperRegistryFactory父类AbstractRegistryFactory的一个属性)中。

 

根据registryUrl创建Registry的key:url.toServiceString()

 1     public String toServiceString() {
 2         return buildString(true, false, true, true);
 3     }
 4 
 5     private String buildString(boolean appendUser, boolean appendParameter, boolean useIP, boolean useService, String... parameters) {
 6         StringBuilder buf = new StringBuilder();
 7         if (protocol != null && protocol.length() > 0) {  //protocol://
 8             buf.append(protocol);
 9             buf.append("://");
10         }
11         if (appendUser && username != null && username.length() > 0) {  //protocol://username:password@host:port/group/interface{path}:version/parameters
12             buf.append(username);
13             if (password != null && password.length() > 0) {
14                 buf.append(":");
15                 buf.append(password);
16             }
17             buf.append("@");
18         }
19         String host;
20         if (useIP) {
21             host = getIp();
22         } else {
23             host = getHost();
24         }
25         if (host != null && host.length() > 0) {
26             buf.append(host);
27             if (port > 0) {
28                 buf.append(":");
29                 buf.append(port);
30             }
31         }
32         String path;
33         if (useService) {
34             path = getServiceKey();
35         } else {
36             path = getPath();
37         }
38         if (path != null && path.length() > 0) {
39             buf.append("/");
40             buf.append(path);
41         }
42         if (appendParameter) {
43             buildParameters(buf, true, parameters);
44         }
45         return buf.toString();
46     }
47 
48     public String getServiceKey() {
49         String inf = getServiceInterface();//先获取interface参数,如果没有的话,取path的值,这里都是com.alibaba.dubbo.registry.RegistryService
50         if (inf == null) return null;
51         StringBuilder buf = new StringBuilder();
52         String group = getParameter(Constants.GROUP_KEY);
53         if (group != null && group.length() > 0) {
54             buf.append(group).append("/"); //interfacegroup
55         }
56         buf.append(inf);
57         String version = getParameter(Constants.VERSION_KEY);
58         if (version != null && version.length() > 0) {
59             buf.append(":").append(version);
60         }
61         return buf.toString();
62     }

最终得到的应该是这样的形式:protocol://username:password@host:port/group/interface{path}:version?key1=value1&key2=value2...。

这里key=zookeeper://10.211.55.5:2181/com.alibaba.dubbo.registry.RegistryService

之后来到了真正创建Registry的地方。

 1 public class ZookeeperRegistryFactory extends AbstractRegistryFactory {
 2     private ZookeeperTransporter zookeeperTransporter;
 3 
 4     public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) {
 5         this.zookeeperTransporter = zookeeperTransporter;
 6     }
 7 
 8     public Registry createRegistry(URL url) {
 9         return new ZookeeperRegistry(url, zookeeperTransporter);
10     }
11 }

这里的zookeeperTransporter对象是一个com.alibaba.dubbo.remoting.zookeeper.ZookeeperTransporter$Adaptive对象。

 

在创建ZookeeperRegistry之前来看一下其继承图:

new ZookeeperRegistry(registryUrl, ZookeeperTransporter$Adaptive对象)

 1     private final static int DEFAULT_ZOOKEEPER_PORT = 2181;
 2     private final static String DEFAULT_ROOT = "dubbo";
 3     private final String root;
 4     private final Set<String> anyServices = new ConcurrentHashSet<String>();
 5     private final ConcurrentMap<URL, ConcurrentMap<NotifyListener, ChildListener>> zkListeners = new ConcurrentHashMap<URL, ConcurrentMap<NotifyListener, ChildListener>>();
 6     private final ZookeeperClient zkClient;
 7 
 8     public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
 9         super(url);
10         if (url.isAnyHost()) {
11             throw new IllegalStateException("registry address == null");
12         }
13         String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);//dubbo
14         if (!group.startsWith(Constants.PATH_SEPARATOR)) {
15             group = Constants.PATH_SEPARATOR + group;
16         }
17         this.root = group;// /dubbo
18         zkClient = zookeeperTransporter.connect(url);//创建zk客户端,启动会话
19         zkClient.addStateListener(new StateListener() {//监听重新连接成功事件,重新连接成功后,之前已经完成注册和订阅的url要重新进行注册和订阅(因为临时节点可能已经跪了)
20             public void stateChanged(int state) {
21                 if (state == RECONNECTED) {
22                     try {
23                         recover();
24                     } catch (Exception e) {
25                         logger.error(e.getMessage(), e);
26                     }
27                 }
28             }
29         });
30     }

new FailbackRegistry(registryUrl)

 1     private final ScheduledExecutorService retryExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("DubboRegistryFailedRetryTimer", true));
 2     // 失败重试定时器,定时检查是否有请求失败,如有,无限次重试
 3     private final ScheduledFuture<?> retryFuture;
 4     private final Set<URL> failedRegistered = new ConcurrentHashSet<URL>();
 5     private final Set<URL> failedUnregistered = new ConcurrentHashSet<URL>();
 6     private final ConcurrentMap<URL, Set<NotifyListener>> failedSubscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();
 7     private final ConcurrentMap<URL, Set<NotifyListener>> failedUnsubscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();
 8     private final ConcurrentMap<URL, Map<NotifyListener, List<URL>>> failedNotified = new ConcurrentHashMap<URL, Map<NotifyListener, List<URL>>>();
 9     private AtomicBoolean destroyed = new AtomicBoolean(false);
10 
11     public FailbackRegistry(URL url) {
12         super(url);
13         int retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);//5*1000
14         this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() {
15             public void run() {
16                 // 检测并连接注册中心
17                 try {
18                     retry();
19                 } catch (Throwable t) { // 防御性容错
20                     logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t);
21                 }
22             }
23         }, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS);
24     }

new AbstractRegistry(registryUrl)

 1     // URL地址分隔符,用于文件缓存中,服务提供者URL分隔
 2     private static final char URL_SEPARATOR = \' \';
 3     // URL地址分隔正则表达式,用于解析文件缓存中服务提供者URL列表
 4     private static final String URL_SPLIT = "\\\\s+";
 5     // 本地磁盘缓存,其中特殊的key值.registies记录注册中心列表,其它均为notified服务提供者列表
 6     private final Properties properties = new Properties();
 7     // 文件缓存定时写入
 8     private final ExecutorService registryCacheExecutor = Executors.newFixedThreadPool(1, new NamedThreadFactory("DubboSaveRegistryCache", true));
 9     //是否是同步保存文件
10     private final boolean syncSaveFile;
11     // 本地磁盘缓存文件
12     private File file;
13     private final AtomicLong lastCacheChanged = new AtomicLong();
14     private final Set<URL> registered = new ConcurrentHashSet<URL>();//已经注册的url集合
15     private final ConcurrentMap<URL, Set<NotifyListener>> subscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();//已经订阅的<URL, Set<NotifyListener>>
16     private final ConcurrentMap<URL, Map<String, List<URL>>> notified = new ConcurrentHashMap<URL, Map<String, List<URL>>>();//已经通知的<URL, Map<String, List<URL>>>
17     private URL registryUrl;//注册url
18     private AtomicBoolean destroyed = new AtomicBoolean(false);
19 
20     public AbstractRegistry(URL url) {
21         setUrl(url);
22         // 启动文件保存定时器
23         syncSaveFile = url.getParameter(Constants.REGISTRY_FILESAVE_SYNC_KEY, false);
24         String filename = url.getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/.dubbo/dubbo-registry-" + url.getHost() + ".cache");
25         File file = null;
26         if (ConfigUtils.isNotEmpty(filename)) {
27             file = new File(filename);
28             if (!file.exists() && file.getParentFile() != null && !file.getParentFile().exists()) {
29                 if (!file.getParentFile().mkdirs()) {//创建文件所在的文件夹 /Users/jigangzhao/.dubbo/
30                     throw new IllegalArgumentException("Invalid registry store file " + file + ", cause: Failed to create directory " + file.getParentFile() + "!");
31                 }
32             }
33         }
34         this.file = file;
35         loadProperties();
36         notify(url.getBackupUrls());
37     }

先简单的总结一下:父子三代分别做的事情:

  • AbstractRegistry主要用来维护缓存文件
  • FailbackRegistry主要用来做失败重试操作(包括:注册失败/反注册失败/订阅失败/反订阅失败/通知失败的重试);也提供了供ZookeeperRegistry使用的zk重连后的恢复工作的方法。
  • ZookeeperRegistry创建zk客户端,启动会话;并且调用FailbackRegistry实现zk重连后的恢复工作

先看AbstractRegistry

  • 设置属性registryUrl=url:zookeeper://10.211.55.5:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&client=curator&dubbo=2.0.0&interface=com.alibaba.dubbo.registry.RegistryService&pid=4685&timestamp=1507286468150
  • 创建文件/Users/jigangzhao/.dubbo/dubbo-registry-10.211.55.5.cache的文件夹/Users/jigangzhao/.dubbo
  • 设置属性file:/Users/jigangzhao/.dubbo/dubbo-registry-10.211.55.5.cache文件,该文件存储信息将是这样的:

    com.alibaba.dubbo.demo.DemoService=empty\\://10.10.10.10\\:20880/com.alibaba.dubbo.demo.DemoService?anyhost\\=true&application\\=demo-provider&category\\=configurators&check\\=false&dubbo\\=2.0.0&generic\\=false&interface\\=com.alibaba.dubbo.demo.DemoService&methods\\=sayHello&pid\\=5259&side\\=provider&timestamp\\=1507294508053

  • 如果file存在,将file中的内容写入properties属性;既然有读file,那么是什么时候写入file的呢?AbstractRegistry创建了一个含有一个名字为DubboSaveRegistryCache的后台线程的FixedThreadPool,只在在notify(URL url, NotifyListener listener, List<URL> urls)方法中会被调用,我们此处由于ConcurrentMap<URL, Set<NotifyListener>> subscribed为空,所以AbstractRegistry(URL url)中的notify(url.getBackupUrls())不会执行,此处也不会创建文件。
  • 最后是notify(url.getBackupUrls())(TODO 这里后续会写

再来看FailbackRegistry:

只做了一件事,启动了一个含有一个名为DubboRegistryFailedRetryTimer的后台线程的ScheduledThreadPool,线程创建5s后开始第一次执行retry(),之后每隔5s执行一次。来看一下retry()

  1     /**
  2      * 将所有注册失败的url(failedRegistered中的url)进行注册,之后从failedRegistered进行移除;
  3      * 将所有反注册失败的url(failedUnregistered中的url)进行反注册,之后从failedUnregistered进行移除;
  4      * 将所有订阅失败的url(failedSubscribed中的url)进行重新订阅,之后从failedSubscribed进行移除;
  5      * 将所有反订阅失败的url(failedUnsubscribed中的url)进行反订阅,之后从failedUnsubscribed进行移除;
  6      * 将所有通知失败的url(failedNotified中的url)进行通知,之后从failedNotified进行移除;
  7      */
  8     protected void retry() {
  9         if (!failedRegistered.isEmpty()) {
 10             Set<URL> failed = new HashSet<URL>(failedRegistered);
 11             if (failed.size() > 0) {
 12                 if (logger.isInfoEnabled()) {
 13                     logger.info("Retry register " + failed);
 14                 }
 15                 try {
 16                     for (URL url : failed) {
 17                         try {
 18                             doRegister(url);
 19                             failedRegistered.remove(url);
 20                         } catch (Throwable t) { // 忽略所有异常,等待下次重试
 21                             logger.warn("Failed to retry register " + failed + ", waiting for again, cause: " + t.getMessage(), t);
 22                         }
 23                     }
 24                 } catch (Throwable t) { // 忽略所有异常,等待下次重试
 25                     logger.warn("Failed to retry register " + failed + ", waiting for again, cause: " + t.getMessage(), t);
 26                 }
 27             }
 28         }
 29         if (!failedUnregistered.isEmpty()) {
 30             Set<URL> failed = new HashSet<URL>(failedUnregistered);
 31             if (failed.size() > 0) {
 32                 if (logger.isInfoEnabled()) {
 33                     logger.info("Retry unregister " + failed);
 34                 }
 35                 try {
 36                     for (URL url : failed) {
 37                         try {
 38                             doUnregister(url);
 39                             failedUnregistered.remove(url);
 40                         } catch (Throwable t) { // 忽略所有异常,等待下次重试
 41                             logger.warn("Failed to retry unregister  " + failed + ", waiting for again, cause: " + t.getMessage(), t);
 42                         }
 43                     }
 44                 } catch (Throwable t) { // 忽略所有异常,等待下次重试
 45                     logger.warn("Failed to retry unregister  " + failed + ", waiting for again, cause: " + t.getMessage(), t);
 46                 }
 47             }
 48         }
 49         if (!failedSubscribed.isEmpty()) {
 50             Map<URL, Set<NotifyListener>> failed = new HashMap<URL, Set<NotifyListener>>(failedSubscribed);
 51             for (Map.Entry<URL, Set<NotifyListener>> entry : new HashMap<URL, Set<NotifyListener>>(failed).entrySet()) {
 52                 if (entry.getValue() == null || entry.getValue().size() == 0) {
 53                     failed.remove(entry.getKey());
 54                 }
 55             }
 56             if (failed.size() > 0) {
 57                 if (logger.isInfoEnabled()) {
 58                     logger.info("Retry subscribe " + failed);
 59                 }
 60                 try {
 61                     for (Map.Entry<URL, Set<NotifyListener>> entry : failed.entrySet()) {
 62                         URL url = entry.getKey();
 63                         Set<NotifyListener> listeners = entry.getValue();
 64                         for (NotifyListener listener : listeners) {
 65                             try {
 66                                 doSubscribe(url, listener);//listener需要一个一个订阅,每订阅一个,就将该listener从当前的url监听列表中移除
 67                                 listeners.remove(listener);
 68                             } catch (Throwable t) { // 忽略所有异常,等待下次重试
 69                                 logger.warn("Failed to retry subscribe " + failed + ", waiting for again, cause: " + t.getMessage(), t);
 70                             }
 71                         }
 72                     }
 73                 } catch (Throwable t) { // 忽略所有异常,等待下次重试
 74                     logger.warn("Failed to retry subscribe " + failed + ", waiting for again, cause: " + t.getMessage(), t);
 75                 }
 76             }
 77         }
 78         if (!failedUnsubscribed.isEmpty()) {
 79             Map<URL, Set<NotifyListener>> failed = new HashMap<URL, Set<NotifyListener>>(failedUnsubscribed);
 80             for (Map.Entry<URL, Set<NotifyListener>> entry : new HashMap<URL, Set<NotifyListener>>(failed).entrySet()) {
 81                 if (entry.getValue() == null || entry.getValue().size() == 0) {
 82                     failed.remove(entry.getKey());
 83                 }
 84             }
 85             以上是关于7.6 服务远程暴露 - 注册服务到zookeeper的主要内容,如果未能解决你的问题,请参考以下文章

Dubbo服务发布引用

dubbo和zookeeper

Dubbo服务暴露分析

dubbo+zookeeper+springmvc搭建实例教程

dubbo+zookeeper+springmvc搭建实例教程

dubbo专题-服务暴露总结(本地暴露+远程暴露时序图)