Dubbo——Registry服务注册
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Dubbo——Registry服务注册相关的知识,希望对你有一定的参考价值。
参考技术A 注册中心(Registry)在微服务架构中的作用举足轻重,有了它,服务提供者(Provider) 和消费者(Consumer) 就能感知彼此。Registry 只是 Consumer 和 Provider 感知彼此状态变化的一种便捷途径而已,它们彼此的实际通讯交互过程是直接进行的,对于 Registry 来说是透明无感的。Provider 状态发生变化了,会由 Registry 主动推送订阅了该 Provider 的所有 Consumer,这保证了 Consumer 感知 Provider 状态变化的及时性,也将和具体业务需求逻辑交互解耦,提升了系统的稳定性。
Dubbo 中存在很多概念,但有些理解起来就特别费劲,如本文的 Registry,翻译过来的意思是“注册中心”,但它其实是应用本地的注册中心客户端,真正的“注册中心”服务是其他独立部署的进程,或进程组成的集群,比如 ZooKeeper 集群。本地的 Registry 通过和 ZooKeeper 等进行实时的信息同步,维持这些内容的一致性,从而实现了注册中心这个特性。另外,就 Registry 而言,Consumer 和 Provider 只是个用户视角的概念,它们被抽象为了一条 URL 。
RegistryFactory 就是产生一个注册中心的工程,它有个自适应的方法getRegistry,那么我们知道dubbo会通过javassist动态产生一个RegistryFactory$Adaptive类,并且getRegistry方法的内部实现大致是如下:
它通过传入的URL的protocol协议字段排判断是什么类型注册中心。例如,url的protocol的协议是zookeeper,那么就会根据SPI的ExtensionLoader.getExtensionLoader(RegistryFactory.class).getExtension("zookeeper")得到一个产生ZooKeeper注册中心的工厂,也就是ZookeeperRegistryFactory,而ZookeeperRegistryFactory这个类的getRegistry就是返回一个Zookeeper注册中心。
可以看出其语义,一个注册中心Registry是一个节点(extends Node),并且它具有注册服务(extends RegistryService)的功能。
dubbo支持如下这些注册中心zookeeper、consul、etcd3、eureka、nacas、redis、sofa,那么就会产生相应如下的Registry:ZookeeperRegistry、ConsulRegistry、EtcdRegistry、NacosRegistry、RedisRegistry、SofaRegistry。类图如下:
所以我们知道,这些注册中心都是继承FailbackRegistry,这个FailbackRegistry其意思就是说,如果一个服务注册到当前某个注册中心注册失败后,可会在后台产生一个daemon线程,定时的把注册失败服务重新注册,并且有一定的重试限制。
在上面的类图中我们并没有发现有个名为EurekaRegistry这样的类,因为实现了另一个接口ServiceDiscovery方式,类名为EurekaServiceDiscovery来进行服务发现。
dubbo的协议是通过名为org.apache.dubbo.rpc.Protocol来进行抽象的,那么注册协议也是一样的,是通过org.apache.dubbo.registry.integration.RegistryProtocol来表达的,继承org.apache.dubbo.rpc.Protocol。RegistryPrtocol是扩展点Protocol的具体实现,会一次调用其setter方法来注入其需要的属性,RegistryPrtocol其中有个属性就是RegistryFactory,那么就要为它注入一个具体的RegistryFactory,那么这个具体的RegistryFactory工厂就是上面的RegistryFactory$Adaptive。因为注入的属性对象会从SpringExtensionFactory和SpiExtensionFactory工厂中查询,刚好RegistryFactory也是一个扩展点,所以会在SpiExtensionFactory找出,并且SpiExtensionFactory工厂的实现如下:
所以知道是返回一个自适应的扩展点,即RegistryFactory$Adaptive。
Protocol协议具有导出服务export()功能,和引用服务refer()功能。在RegistryProtocol中,在这个2个方法内就有对服务注册到注册中心的操作。
在服务导出中,首先要有一个认知,做dubbo服务暴露的时候,我们有2中方式,一种是通过注解的方式:
@DubboService、@Service(非spring的)。或者通过xml的方式<dubbo:service />。
不管采用哪一种方式,最终需要暴露的服务首先会包装成一个ServiceBean的对象。这个ServiceBean 持有具体需要服务注册的对象ref。ServiceBean的类图如下:
服务导出也是是一个繁琐的过程,本文只讨论其服务导出引入与注册中心交互。
DubboBootstrap是一个dubbo框架启动的帮助类,他有一个start()方法,在该方法的内部就会调用exportServices()用于导出服务,和调用referServices()进行引用服务。
一般使用dubbo框架的都会引入Spring框架,Spring框架有一个事件监听机制,dubbo正是监听Spring的上下文刷新事件ContextRefreshedEvent,来启动Dubbo服务的。这个服务监听类就是DubboBootstrapApplicationListener。
registry方法定位到FailbackRegistry,主要作用当服务注册失败后,可以在后端线程重试。
接下来分析AbstractRegistry 的作用和FailbackRegistry的重试机制,并且详细剖析ZookeeperRegistry。
首先,直接引出这个类的作用,该类主要把服务提供者信息缓存本地文件上,文件目录是:当前用户目录下的/.dubbo/dubbo-registry- hos-$port.cache。
在解读源码前,先阅读下AbstractRegistry类的成员变量,从成员变量中可以看到这个类是怎么完成数据的本地化存储的。
上面的注释已经非常的清晰了,这里就不在描述,需要关注的是notify()这个函数,所以当每个服务注册和订阅时,首次创建注册中心都会进行notify操作。具体来看下notify方法。
从上面可以知道,把消费端的订阅的服务信息存入了file文件中,doSaveProperties就是文件操作,不进行分析。再一次强调下,消费端订阅时,会订阅某个具体服务下3个节点(providers,configurations,routers)。
接着,FailbackRegistry继承自AbstractRegistry。
其构造函数如下,可以得知除了调用AbstractRegistry构造方法外,并且创建一个HashedWheelTimer类型的定时器。
并且FailbackRegistry 成员记录一组注册失败和订阅失败的集合,然后通过retryTimer定式扫描这些失败集合,重新发起订阅和注册。
下面是失败集合:
参考:
https://www.cnblogs.com/liferecord/p/13462175.html
https://www.cnblogs.com/liferecord/p/13497411.html
https://www.cnblogs.com/Cubemen/p/12294377.html
https://blog.csdn.net/cold___play/article/details/107007130
https://www.jianshu.com/p/75931e545b36
Dubbo-服务注册中心之AbstractRegistry
在dubbo中,关于注册中心Registry的有关实现封装在了dubbo-registry模块中。提供者(Provider)个消费者(Consumer)都是通过注册中心进行资源的调度。当服务启动时,provider会调用注册中心的register方法将自己的服务通过url的方式发布到注册中心,而consumer订阅其他服务时,会将订阅的服务通过url发送给注册中心(URL中通常会包含各种配置)。当某个服务被关闭时,它则会从注册中心中移除,当某个服务被修改时,则会调用notify方法触发所有的监听器。
首先简单介绍一下在dubbo的基本统一数据模型URL
统一数据模型URL
在dubbo中定义的url与传统的url有所不同,用于在扩展点之间传输数据,可以从url参数中获取配置信息等数据,这一点很重要。
描述一个dubbo协议的服务
dubbo://192.168.1.6:20880/moe.cnkirito.sample.HelloService?timeout=3000
描述一个消费者
consumer://30.5.120.217/org.apache.dubbo.demo.DemoService?application=demo-consumer&category=consumers&check=false&dubbo=2.0.2&interface=org.apache.dubbo.demo.DemoService&methods=sayHello&pid=1209&qos.port=33333&side=consumer×tamp=1545721827784
接下来将着重介绍几个重要的类。
AbstractRegistry
AbstractRegistry实现的是Registry接口,是Registry的抽象类。为了减轻注册中心的压力,在该类中实现了把本地url缓存到内存缓存property文件中,并且实现了注册中心的注册、订阅等方法。
在该类中有介个关于url的变量。
private final Set<URL> registered = new ConcurrentHashSet<URL>();
-> 记录已经注册服务的URL集合,注册的URL不仅仅可以是服务提供者的,也可以是服务消费者的。private final ConcurrentMap<URL, Set<NotifyListener>> subscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();
-> 消费者url订阅的监听器集合private final ConcurrentMap<URL, Map<String, List<URL>>> notified = new ConcurrentHashMap<URL, Map<String, List<URL>>>();
-> 某个消费者被通知的服务URL集合,最外部URL的key是消费者的URL,value是一个map集合,里面的map中的key为分类名,value是该类下的服务url集合。private URL registryUrl;
-> 注册中心URLprivate File file;
-> 本地磁盘缓存文件,缓存注册中心的数据初始化
public AbstractRegistry(URL url) {
//1. 设置配置中心的地址
setUrl(url);
//2. 配置中心的URL中是否配置了同步保存文件属性,否则默认为false
syncSaveFile = url.getParameter(Constants.REGISTRY_FILESAVE_SYNC_KEY, false);
//3. 配置信息本地缓存的文件名
String filename = url.getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/.dubbo/dubbo-registry-" + url.getParameter(Constants.APPLICATION_KEY) + "-" + url.getAddress() + ".cache");
//逐层创建文件目录
File file = null;
if (ConfigUtils.isNotEmpty(filename)) {
file = new File(filename);
if (!file.exists() && file.getParentFile() != null && !file.getParentFile().exists()) {
if (!file.getParentFile().mkdirs()) {
throw new IllegalArgumentException("Invalid registry store file " + file + ", cause: Failed to create directory " + file.getParentFile() + "!");
}
}
}
this.file = file;
//如果现有配置缓存,则从缓存文件中加载属性
loadProperties();
notify(url.getBackupUrls());
}
加载本地磁盘缓存文件到内存缓存中,也就是把文件中的数据写入到properties中
private void loadProperties() {
if (file != null && file.exists()) {
InputStream in = null;
try {
in = new FileInputStream(file);
// 把数据写入到内存缓存中
properties.load(in);
if (logger.isInfoEnabled()) {
logger.info("Load registry store file " + file + ", data: " + properties);
}
} catch (Throwable e) {
logger.warn("Failed to load registry store file " + file, e);
} finally {
if (in != null) {
try {
in.close();
} catch (IOException e) {
logger.warn(e.getMessage(), e);
}
}
}
}
}
注册与取消注册
对registered变量执行add和remove操作
@Override
public void register(URL url) {
if (url == null) {
throw new IllegalArgumentException("register url == null");
}
if (logger.isInfoEnabled()) {
logger.info("Register: " + url);
}
registered.add(url);
}
@Override
public void unregister(URL url) {
if (url == null) {
throw new IllegalArgumentException("unregister url == null");
}
if (logger.isInfoEnabled()) {
logger.info("Unregister: " + url);
}
registered.remove(url);
}
订阅与取消订阅
通过消费者url从subscribed变量中获取该消费者的所有监听器集合,然后将该监听器放入到集合中,取消同理。
@Override
public void subscribe(URL url, NotifyListener listener) {
if (url == null) {
throw new IllegalArgumentException("subscribe url == null");
}
if (listener == null) {
throw new IllegalArgumentException("subscribe listener == null");
}
if (logger.isInfoEnabled()) {
logger.info("Subscribe: " + url);
}
// 获得该消费者url 已经订阅的服务 的监听器集合
Set<NotifyListener> listeners = subscribed.get(url);
if (listeners == null) {
subscribed.putIfAbsent(url, new ConcurrentHashSet<NotifyListener>());
listeners = subscribed.get(url);
}
// 添加某个服务的监听器
listeners.add(listener);
}
@Override
public void unsubscribe(URL url, NotifyListener listener) {
if (url == null) {
throw new IllegalArgumentException("unsubscribe url == null");
}
if (listener == null) {
throw new IllegalArgumentException("unsubscribe listener == null");
}
if (logger.isInfoEnabled()) {
logger.info("Unsubscribe: " + url);
}
Set<NotifyListener> listeners = subscribed.get(url);
if (listeners != null) {
listeners.remove(listener);
}
}
服务的恢复
注册的恢复包括注册服务的恢复和订阅服务的恢复,因为在内存中表留了注册的服务和订阅的服务,因此在恢复的时候会重新拉取这些数据,分别调用发布和订阅的方法来重新将其录入到注册中心中。
protected void recover() throws Exception {
// register
//把内存缓存中的registered取出来遍历进行注册
Set<URL> recoverRegistered = new HashSet<URL>(getRegistered());
if (!recoverRegistered.isEmpty()) {
if (logger.isInfoEnabled()) {
logger.info("Recover register url " + recoverRegistered);
}
for (URL url : recoverRegistered) {
register(url);
}
}
// subscribe
//把内存缓存中的subscribed取出来遍历进行订阅
Map<URL, Set<NotifyListener>> recoverSubscribed = new HashMap<URL, Set<NotifyListener>>(getSubscribed());
if (!recoverSubscribed.isEmpty()) {
if (logger.isInfoEnabled()) {
logger.info("Recover subscribe url " + recoverSubscribed.keySet());
}
for (Map.Entry<URL, Set<NotifyListener>> entry : recoverSubscribed.entrySet()) {
URL url = entry.getKey();
for (NotifyListener listener : entry.getValue()) {
subscribe(url, listener);
}
}
}
}
通知
protected void notify(List<URL> urls) {
if (urls == null || urls.isEmpty()) return;
// 遍历订阅URL的监听器集合,通知他们
for (Map.Entry<URL, Set<NotifyListener>> entry : getSubscribed().entrySet()) {
URL url = entry.getKey();
// 匹配
if (!UrlUtils.isMatch(url, urls.get(0))) {
continue;
}
// 遍历监听器集合,通知他们
Set<NotifyListener> listeners = entry.getValue();
if (listeners != null) {
for (NotifyListener listener : listeners) {
try {
notify(url, listener, filterEmpty(url, urls));
} catch (Throwable t) {
logger.error("Failed to notify registry event, urls: " + urls + ", cause: " + t.getMessage(), t);
}
}
}
}
}
/**
* 通知监听器,URL 变化结果
* @param url
* @param listener
* @param urls
*/
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
if (url == null) {
throw new IllegalArgumentException("notify url == null");
}
if (listener == null) {
throw new IllegalArgumentException("notify listener == null");
}
if ((urls == null || urls.isEmpty())
&& !Constants.ANY_VALUE.equals(url.getServiceInterface())) {
logger.warn("Ignore empty notify urls for subscribe url " + url);
return;
}
if (logger.isInfoEnabled()) {
logger.info("Notify urls for subscribe url " + url + ", urls: " + urls);
}
Map<String, List<URL>> result = new HashMap<String, List<URL>>();
// 将urls进行分类
for (URL u : urls) {
if (UrlUtils.isMatch(url, u)) {
// 按照url中key为category对应的值进行分类,如果没有该值,就找key为providers的值进行分类
String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
List<URL> categoryList = result.get(category);
if (categoryList == null) {
categoryList = new ArrayList<URL>();
// 分类结果放入result
result.put(category, categoryList);
}
categoryList.add(u);
}
}
if (result.size() == 0) {
return;
}
// 获得某一个消费者被通知的url集合(通知的 URL 变化结果)
Map<String, List<URL>> categoryNotified = notified.get(url);
if (categoryNotified == null) {
// 添加该消费者对应的url
notified.putIfAbsent(url, new ConcurrentHashMap<String, List<URL>>());
categoryNotified = notified.get(url);
}
// 处理通知监听器URL 变化结果
for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
String category = entry.getKey();
List<URL> categoryList = entry.getValue();
// 把分类标实和分类后的列表放入notified的value中
// 覆盖到 `notified`
// 当某个分类的数据为空时,会依然有 urls 。其中 `urls[0].protocol = empty` ,通过这样的方式,处理所有服务提供者为空的情况。
categoryNotified.put(category, categoryList);
// 保存到文件
saveProperties(url);
//通知监听器
listener.notify(categoryList);
}
}
在构造函数的最后一句,调用notify(url.getBackupUrls()); 来将注册中心url返回的urls来进行通知。从下面代码可以开出返回的urls是通过url的参数获得的。
public List<URL> getBackupUrls() {
List<URL> urls = new ArrayList<URL>();
urls.add(this);
String[] backups = getParameter(Constants.BACKUP_KEY, new String[0]);
if (backups != null && backups.length > 0) {
for (String backup : backups) {
urls.add(this.setAddress(backup));
}
}
return urls;
}
然后获取遍历所有订阅URL,类型Map<URL,Set<NotifyListener>>
,判断遍历中的当前url与传入的backupURL是否匹配,匹配了继续向下执行,否则则跳过这个url,再处理下一个url。当向下执行时,获取遍历当前url的监听器。对每个监听器执行notify(url, listener, filterEmpty(url, urls))
protected static List<URL> filterEmpty(URL url, List<URL> urls) {
if (urls == null || urls.isEmpty()) {
List<URL> result = new ArrayList<URL>(1);
result.add(url.setProtocol(Constants.EMPTY_PROTOCOL));
return result;
}
return urls;
}
如果urls为空,则将根据url的信息新建一个url,并设置协议为空协议,放入到urls中。
然后执行notify方法,将backupURLS进行分类,放入到result中。
在上述中遍历所有订阅的urls,然后在每个url中再执行nofity,所以接下来的步骤可以理解成遍历订阅的urls,在循环内部获取每个url的被通知的urls集合。
每个url获取一个被通知的urls集合,categoryNotified
之后遍历backURLs,它会覆盖掉原来被通知的集合categoryNotified
遍历结束后,会将结果保存到文件中,
最后通知监听器处理,最后的这个通知方法在之后的篇章解释。
以上是关于Dubbo——Registry服务注册的主要内容,如果未能解决你的问题,请参考以下文章