7.6 服务远程暴露 - 注册服务到zookeeper
Posted 赵计刚
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了7.6 服务远程暴露 - 注册服务到zookeeper相关的知识,希望对你有一定的参考价值。
为了安全:服务启动的ip全部使用10.10.10.10
远程服务的暴露总体步骤:
- 将ref封装为invoker
- 将invoker转换为exporter
- 启动netty
- 注册服务到zookeeper
- 订阅
- 返回新的exporter实例
在7.4 服务远程暴露 - 创建Exporter与启动netty服务端中,实现了前三步,本节实现第四步:注册服务到zk。总体代码如下:RegistryProtocol.export(final Invoker<T> originInvoker)
1 final Registry registry = getRegistry(originInvoker);//创建ZookeeperRegistry实例:创建CuratorClient,并启动会话。 2 final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);//获取真正要注册在zk上的url 3 registry.register(registedProviderUrl);//创建节点(即注册服务到zk上)
说明:
- 第一句代码用来创建ZookeeperRegistry实例:创建CuratorClient,并启动会话。
- 第二句代码获取真正要注册在zk上的url
- 第三句代码实现创建节点(即注册服务到zk上)
一 创建ZookeeperRegistry实例
1 RegistryProtocol.getRegistry(final Invoker<?> originInvoker)
1 /** 2 * 根据invoker的地址获取registry实例 3 */ 4 private Registry getRegistry(final Invoker<?> originInvoker) { 5 URL registryUrl = originInvoker.getUrl(); 6 if (Constants.REGISTRY_PROTOCOL.equals(registryUrl.getProtocol())) { 7 String protocol = registryUrl.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_DIRECTORY);//zookeeper 8 registryUrl = registryUrl.setProtocol(protocol).removeParameter(Constants.REGISTRY_KEY); 9 } 10 return registryFactory.getRegistry(registryUrl); 11 }
首先对originInvoker中的url进行处理:
- 将协议换成zookeeper
- 去掉registry=zookeeper的参数
来看一下originInvoker的url:(解码后的)
registry://10.211.55.5:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&client=curator&dubbo=2.0.0&export=dubbo://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=2791&side=provider×tamp=1507262031554&pid=2791®istry=zookeeper×tamp=1507262031521
说明:
- 第一个红色部分代表协议:zookeeper
- 第二个红色部分是export参数
- 第三个红色部分是registry=zookeeper
经过处理之后的registryUrl为:
zookeeper://10.211.55.5:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&client=curator&dubbo=2.0.0&export=dubbo://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=2791&side=provider×tamp=1507262031554&pid=2791×tamp=1507262031521
之后使用注册工厂来创建注册中心。
2 RegistryFactory$Adaptive.getRegistry(com.alibaba.dubbo.common.URL registryUrl)
1 public class RegistryFactory$Adaptive implements com.alibaba.dubbo.registry.RegistryFactory { 2 public com.alibaba.dubbo.registry.Registry getRegistry(com.alibaba.dubbo.common.URL arg0) { 3 if (arg0 == null) 4 throw new IllegalArgumentException("url == null"); 5 com.alibaba.dubbo.common.URL url = arg0; 6 String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );//zookeeper 7 if(extName == null) 8 throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.registry.RegistryFactory) name from url(" + url.toString() + ") use keys([protocol])"); 9 com.alibaba.dubbo.registry.RegistryFactory extension = (com.alibaba.dubbo.registry.RegistryFactory)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.registry.RegistryFactory.class).getExtension(extName); 10 return extension.getRegistry(arg0); 11 } 12 }
这里获取到的extension是ZookeeperRegistryFactory,之后,使用ZookeeperRegistryFactory进行Registry的创建。首先来看一下ZookeeperRegistryFactory的继承图:
getRegistry方法在ZookeeperRegistryFactory的父类AbstractRegistryFactory中。
3 AbstractRegistryFactory.getRegistry(URL registryUrl)
1 public Registry getRegistry(URL url) { 2 url = url.setPath(RegistryService.class.getName()) 3 .addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName()) 4 .removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY); 5 String key = url.toServiceString(); 6 // 锁定注册中心获取过程,保证注册中心单一实例 7 LOCK.lock(); 8 try { 9 Registry registry = REGISTRIES.get(key); 10 if (registry != null) { 11 return registry; 12 } 13 registry = createRegistry(url); 14 if (registry == null) { 15 throw new IllegalStateException("Can not create registry " + url); 16 } 17 REGISTRIES.put(key, registry); 18 return registry; 19 } finally { 20 // 释放锁 21 LOCK.unlock(); 22 } 23 }
流程:
- 先处理url,之后获取Registry的key,然后根据该key从Map<String, Registry> REGISTRIES注册中心集合缓存中获取Registry,如果有,直接返回,如果没有,创建Registry,之后存入缓存,最后返回。
首先处理传入的registryUrl:
- 设置:path=com.alibaba.dubbo.registry.RegistryService
- 添加参数:interface=com.alibaba.dubbo.registry.RegistryService
- 去除export参数
最终得到的registryUrl如下:
zookeeper://10.211.55.5:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&client=curator&dubbo=2.0.0&interface=com.alibaba.dubbo.registry.RegistryService&pid=2791×tamp=1507262031521
之后,很具上述的registryUrl创建Registry的key,该{ key : Registry }最终会被存储在Map<String, Registry> REGISTRIES注册中心集合(该属性是ZookeeperRegistryFactory父类AbstractRegistryFactory的一个属性)中。
根据registryUrl创建Registry的key:url.toServiceString()
1 public String toServiceString() { 2 return buildString(true, false, true, true); 3 } 4 5 private String buildString(boolean appendUser, boolean appendParameter, boolean useIP, boolean useService, String... parameters) { 6 StringBuilder buf = new StringBuilder(); 7 if (protocol != null && protocol.length() > 0) { //protocol:// 8 buf.append(protocol); 9 buf.append("://"); 10 } 11 if (appendUser && username != null && username.length() > 0) { //protocol://username:password@host:port/group/interface{path}:version/parameters 12 buf.append(username); 13 if (password != null && password.length() > 0) { 14 buf.append(":"); 15 buf.append(password); 16 } 17 buf.append("@"); 18 } 19 String host; 20 if (useIP) { 21 host = getIp(); 22 } else { 23 host = getHost(); 24 } 25 if (host != null && host.length() > 0) { 26 buf.append(host); 27 if (port > 0) { 28 buf.append(":"); 29 buf.append(port); 30 } 31 } 32 String path; 33 if (useService) { 34 path = getServiceKey(); 35 } else { 36 path = getPath(); 37 } 38 if (path != null && path.length() > 0) { 39 buf.append("/"); 40 buf.append(path); 41 } 42 if (appendParameter) { 43 buildParameters(buf, true, parameters); 44 } 45 return buf.toString(); 46 } 47 48 public String getServiceKey() { 49 String inf = getServiceInterface();//先获取interface参数,如果没有的话,取path的值,这里都是com.alibaba.dubbo.registry.RegistryService 50 if (inf == null) return null; 51 StringBuilder buf = new StringBuilder(); 52 String group = getParameter(Constants.GROUP_KEY); 53 if (group != null && group.length() > 0) { 54 buf.append(group).append("/"); //interfacegroup 55 } 56 buf.append(inf); 57 String version = getParameter(Constants.VERSION_KEY); 58 if (version != null && version.length() > 0) { 59 buf.append(":").append(version); 60 } 61 return buf.toString(); 62 }
最终得到的应该是这样的形式:protocol://username:password@host:port/group/interface{path}:version?key1=value1&key2=value2...。
这里key=zookeeper://10.211.55.5:2181/com.alibaba.dubbo.registry.RegistryService
之后来到了真正创建Registry的地方。
1 public class ZookeeperRegistryFactory extends AbstractRegistryFactory { 2 private ZookeeperTransporter zookeeperTransporter; 3 4 public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) { 5 this.zookeeperTransporter = zookeeperTransporter; 6 } 7 8 public Registry createRegistry(URL url) { 9 return new ZookeeperRegistry(url, zookeeperTransporter); 10 } 11 }
这里的zookeeperTransporter对象是一个com.alibaba.dubbo.remoting.zookeeper.ZookeeperTransporter$Adaptive对象。
在创建ZookeeperRegistry之前来看一下其继承图:
new ZookeeperRegistry(registryUrl, ZookeeperTransporter$Adaptive对象)
1 private final static int DEFAULT_ZOOKEEPER_PORT = 2181; 2 private final static String DEFAULT_ROOT = "dubbo"; 3 private final String root; 4 private final Set<String> anyServices = new ConcurrentHashSet<String>(); 5 private final ConcurrentMap<URL, ConcurrentMap<NotifyListener, ChildListener>> zkListeners = new ConcurrentHashMap<URL, ConcurrentMap<NotifyListener, ChildListener>>(); 6 private final ZookeeperClient zkClient; 7 8 public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) { 9 super(url); 10 if (url.isAnyHost()) { 11 throw new IllegalStateException("registry address == null"); 12 } 13 String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);//dubbo 14 if (!group.startsWith(Constants.PATH_SEPARATOR)) { 15 group = Constants.PATH_SEPARATOR + group; 16 } 17 this.root = group;// /dubbo 18 zkClient = zookeeperTransporter.connect(url);//创建zk客户端,启动会话 19 zkClient.addStateListener(new StateListener() {//监听重新连接成功事件,重新连接成功后,之前已经完成注册和订阅的url要重新进行注册和订阅(因为临时节点可能已经跪了) 20 public void stateChanged(int state) { 21 if (state == RECONNECTED) { 22 try { 23 recover(); 24 } catch (Exception e) { 25 logger.error(e.getMessage(), e); 26 } 27 } 28 } 29 }); 30 }
new FailbackRegistry(registryUrl)
1 private final ScheduledExecutorService retryExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("DubboRegistryFailedRetryTimer", true)); 2 // 失败重试定时器,定时检查是否有请求失败,如有,无限次重试 3 private final ScheduledFuture<?> retryFuture; 4 private final Set<URL> failedRegistered = new ConcurrentHashSet<URL>(); 5 private final Set<URL> failedUnregistered = new ConcurrentHashSet<URL>(); 6 private final ConcurrentMap<URL, Set<NotifyListener>> failedSubscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>(); 7 private final ConcurrentMap<URL, Set<NotifyListener>> failedUnsubscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>(); 8 private final ConcurrentMap<URL, Map<NotifyListener, List<URL>>> failedNotified = new ConcurrentHashMap<URL, Map<NotifyListener, List<URL>>>(); 9 private AtomicBoolean destroyed = new AtomicBoolean(false); 10 11 public FailbackRegistry(URL url) { 12 super(url); 13 int retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);//5*1000 14 this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() { 15 public void run() { 16 // 检测并连接注册中心 17 try { 18 retry(); 19 } catch (Throwable t) { // 防御性容错 20 logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t); 21 } 22 } 23 }, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS); 24 }
new AbstractRegistry(registryUrl)
1 // URL地址分隔符,用于文件缓存中,服务提供者URL分隔 2 private static final char URL_SEPARATOR = \' \'; 3 // URL地址分隔正则表达式,用于解析文件缓存中服务提供者URL列表 4 private static final String URL_SPLIT = "\\\\s+"; 5 // 本地磁盘缓存,其中特殊的key值.registies记录注册中心列表,其它均为notified服务提供者列表 6 private final Properties properties = new Properties(); 7 // 文件缓存定时写入 8 private final ExecutorService registryCacheExecutor = Executors.newFixedThreadPool(1, new NamedThreadFactory("DubboSaveRegistryCache", true)); 9 //是否是同步保存文件 10 private final boolean syncSaveFile; 11 // 本地磁盘缓存文件 12 private File file; 13 private final AtomicLong lastCacheChanged = new AtomicLong(); 14 private final Set<URL> registered = new ConcurrentHashSet<URL>();//已经注册的url集合 15 private final ConcurrentMap<URL, Set<NotifyListener>> subscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();//已经订阅的<URL, Set<NotifyListener>> 16 private final ConcurrentMap<URL, Map<String, List<URL>>> notified = new ConcurrentHashMap<URL, Map<String, List<URL>>>();//已经通知的<URL, Map<String, List<URL>>> 17 private URL registryUrl;//注册url 18 private AtomicBoolean destroyed = new AtomicBoolean(false); 19 20 public AbstractRegistry(URL url) { 21 setUrl(url); 22 // 启动文件保存定时器 23 syncSaveFile = url.getParameter(Constants.REGISTRY_FILESAVE_SYNC_KEY, false); 24 String filename = url.getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/.dubbo/dubbo-registry-" + url.getHost() + ".cache"); 25 File file = null; 26 if (ConfigUtils.isNotEmpty(filename)) { 27 file = new File(filename); 28 if (!file.exists() && file.getParentFile() != null && !file.getParentFile().exists()) { 29 if (!file.getParentFile().mkdirs()) {//创建文件所在的文件夹 /Users/jigangzhao/.dubbo/ 30 throw new IllegalArgumentException("Invalid registry store file " + file + ", cause: Failed to create directory " + file.getParentFile() + "!"); 31 } 32 } 33 } 34 this.file = file; 35 loadProperties(); 36 notify(url.getBackupUrls()); 37 }
先简单的总结一下:父子三代分别做的事情:
- AbstractRegistry主要用来维护缓存文件。
- FailbackRegistry主要用来做失败重试操作(包括:注册失败/反注册失败/订阅失败/反订阅失败/通知失败的重试);也提供了供ZookeeperRegistry使用的zk重连后的恢复工作的方法。
- ZookeeperRegistry创建zk客户端,启动会话;并且调用FailbackRegistry实现zk重连后的恢复工作。
先看AbstractRegistry
- 设置属性registryUrl=url:zookeeper://10.211.55.5:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&client=curator&dubbo=2.0.0&interface=com.alibaba.dubbo.registry.RegistryService&pid=4685×tamp=1507286468150
- 创建文件/Users/jigangzhao/.dubbo/dubbo-registry-10.211.55.5.cache的文件夹/Users/jigangzhao/.dubbo
- 设置属性file:/Users/jigangzhao/.dubbo/dubbo-registry-10.211.55.5.cache文件,该文件存储信息将是这样的:
com.alibaba.dubbo.demo.DemoService=empty\\://10.10.10.10\\:20880/com.alibaba.dubbo.demo.DemoService?anyhost\\=true&application\\=demo-provider&category\\=configurators&check\\=false&dubbo\\=2.0.0&generic\\=false&interface\\=com.alibaba.dubbo.demo.DemoService&methods\\=sayHello&pid\\=5259&side\\=provider×tamp\\=1507294508053
- 如果file存在,将file中的内容写入properties属性;既然有读file,那么是什么时候写入file的呢?AbstractRegistry创建了一个含有一个名字为DubboSaveRegistryCache的后台线程的FixedThreadPool,只在在notify(URL url, NotifyListener listener, List<URL> urls)方法中会被调用,我们此处由于ConcurrentMap<URL, Set<NotifyListener>> subscribed为空,所以AbstractRegistry(URL url)中的notify(url.getBackupUrls())不会执行,此处也不会创建文件。
- 最后是notify(url.getBackupUrls())(TODO 这里后续会写)
再来看FailbackRegistry:
只做了一件事,启动了一个含有一个名为DubboRegistryFailedRetryTimer的后台线程的ScheduledThreadPool,线程创建5s后开始第一次执行retry(),之后每隔5s执行一次。来看一下retry()
1 /** 2 * 将所有注册失败的url(failedRegistered中的url)进行注册,之后从failedRegistered进行移除; 3 * 将所有反注册失败的url(failedUnregistered中的url)进行反注册,之后从failedUnregistered进行移除; 4 * 将所有订阅失败的url(failedSubscribed中的url)进行重新订阅,之后从failedSubscribed进行移除; 5 * 将所有反订阅失败的url(failedUnsubscribed中的url)进行反订阅,之后从failedUnsubscribed进行移除; 6 * 将所有通知失败的url(failedNotified中的url)进行通知,之后从failedNotified进行移除; 7 */ 8 protected void retry() { 9 if (!failedRegistered.isEmpty()) { 10 Set<URL> failed = new HashSet<URL>(failedRegistered); 11 if (failed.size() > 0) { 12 if (logger.isInfoEnabled()) { 13 logger.info("Retry register " + failed); 14 } 15 try { 16 for (URL url : failed) { 17 try { 18 doRegister(url); 19 failedRegistered.remove(url); 20 } catch (Throwable t) { // 忽略所有异常,等待下次重试 21 logger.warn("Failed to retry register " + failed + ", waiting for again, cause: " + t.getMessage(), t); 22 } 23 } 24 } catch (Throwable t) { // 忽略所有异常,等待下次重试 25 logger.warn("Failed to retry register " + failed + ", waiting for again, cause: " + t.getMessage(), t); 26 } 27 } 28 } 29 if (!failedUnregistered.isEmpty()) { 30 Set<URL> failed = new HashSet<URL>(failedUnregistered); 31 if (failed.size() > 0) { 32 if (logger.isInfoEnabled()) { 33 logger.info("Retry unregister " + failed); 34 } 35 try { 36 for (URL url : failed) { 37 try { 38 doUnregister(url); 39 failedUnregistered.remove(url); 40 } catch (Throwable t) { // 忽略所有异常,等待下次重试 41 logger.warn("Failed to retry unregister " + failed + ", waiting for again, cause: " + t.getMessage(), t); 42 } 43 } 44 } catch (Throwable t) { // 忽略所有异常,等待下次重试 45 logger.warn("Failed to retry unregister " + failed + ", waiting for again, cause: " + t.getMessage(), t); 46 } 47 } 48 } 49 if (!failedSubscribed.isEmpty()) { 50 Map<URL, Set<NotifyListener>> failed = new HashMap<URL, Set<NotifyListener>>(failedSubscribed); 51 for (Map.Entry<URL, Set<NotifyListener>> entry : new HashMap<URL, Set<NotifyListener>>(failed).entrySet()) { 52 if (entry.getValue() == null || entry.getValue().size() == 0) { 53 failed.remove(entry.getKey()); 54 } 55 } 56 if (failed.size() > 0) { 57 if (logger.isInfoEnabled()) { 58 logger.info("Retry subscribe " + failed); 59 } 60 try { 61 for (Map.Entry<URL, Set<NotifyListener>> entry : failed.entrySet()) { 62 URL url = entry.getKey(); 63 Set<NotifyListener> listeners = entry.getValue(); 64 for (NotifyListener listener : listeners) { 65 try { 66 doSubscribe(url, listener);//listener需要一个一个订阅,每订阅一个,就将该listener从当前的url监听列表中移除 67 listeners.remove(listener); 68 } catch (Throwable t) { // 忽略所有异常,等待下次重试 69 logger.warn("Failed to retry subscribe " + failed + ", waiting for again, cause: " + t.getMessage(), t); 70 } 71 } 72 } 73 } catch (Throwable t) { // 忽略所有异常,等待下次重试 74 logger.warn("Failed to retry subscribe " + failed + ", waiting for again, cause: " + t.getMessage(), t); 75 } 76 } 77 } 78 if (!failedUnsubscribed.isEmpty()) { 79 Map<URL, Set<NotifyListener>> failed = new HashMap<URL, Set<NotifyListener>>(failedUnsubscribed); 80 for (Map.Entry<URL, Set<NotifyListener>> entry : new HashMap<URL, Set<NotifyListener>>(failed).entrySet()) { 81 if (entry.getValue() == null || entry.getValue().size() == 0) { 82 failed.remove(entry.getKey()); 83 } 84 } 85 以上是关于7.6 服务远程暴露 - 注册服务到zookeeper的主要内容,如果未能解决你的问题,请参考以下文章dubbo+zookeeper+springmvc搭建实例教程