Dubbo学习SPI机制
Posted sambomaster
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Dubbo学习SPI机制相关的知识,希望对你有一定的参考价值。
最近两天忙里偷闲看了看Dubbo的源码,参考书是诣极写的《深入理解Apache Dubbo与实战》。上班的时候在公司的KM上做了些笔记,但是苦于文件发不出来,所以回来再记录一下,就当是备忘。我看的是2.6.*版本,最新的2.7.*相对旧版本是有些许改动的,不过影响不大,设计思想和流程都一样。
1.SPI机制
本质:一个接口(SPI),多种实现(扩展类)。
扩展类又分为三种:
(1)普通扩展类;
(2)包装扩展类;内部有对其他扩展类的引用,可以实现方法增强;装饰设计模式;
(3)自适应扩展类;通过自适应扩展类来调用方法时,可以根据url传入的参数或者其他参数动态决定加载哪一个扩展类实例来运行。可以用来“管理”普通扩展类,充当一个代理的角色。
2.ExtensionLoader源码
ExtensionLoader是SPI机制里最关键的一个类,可以实现为SPI加载扩展类的功能。其中有3个重要方法:getExtension,getActivateExtension,getAdaptiveExtension。从这3个方法入手阅读源码就差不多把这个类看完了。
getExtension(根据name获取扩展类)
public T getExtension(String name) {
if (StringUtils.isEmpty(name)) {
throw new IllegalArgumentException("Extension name == null");
}
//name为“true”则获取默认扩展类实例
if ("true".equals(name)) {
return getDefaultExtension();
}
final Holder<Object> holder = getOrCreateHolder(name);
Object instance = holder.get();
if (instance == null) {
synchronized (holder) {
instance = holder.get();
if (instance == null) {
//double check确保单例;instance仍为空则创建实例
instance = createExtension(name);
holder.set(instance);
}
}
}
return (T) instance;
}
点进createExtension方法
private T createExtension(String name) {
//先根据name拿到class对象
Class<?> clazz = getExtensionClasses().get(name);
if (clazz == null) {
throw findException(name);
}
try {
//再根据class对象获取实例
T instance = (T) EXTENSION_INSTANCES.get(clazz);
if (instance == null) {
//没有获取到则创建实例
EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance());
instance = (T) EXTENSION_INSTANCES.get(clazz);
}
//对实例完成依赖注入
injectExtension(instance);
Set<Class<?>> wrapperClasses = cachedWrapperClasses;
if (CollectionUtils.isNotEmpty(wrapperClasses)) {
for (Class<?> wrapperClass : wrapperClasses) {
//对实例生成包装类实例,可以实现一些方法加强,本质上是装饰设计模式;包装类和扩展类一样,都实现了同一个SPI
instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
}
}
return instance;
} catch (Throwable t) {
throw new IllegalStateException("Extension instance (name: " + name + ", class: " +
type + ") couldn't be instantiated: " + t.getMessage(), t);
}
}
点进getExtensionClasses方法;此方法获取扩展类name->扩展类class对象的Map
private Map<String, Class<?>> getExtensionClasses() {
Map<String, Class<?>> classes = cachedClasses.get();
if (classes == null) {
synchronized (cachedClasses) {
classes = cachedClasses.get();
if (classes == null) {
//double check;classes仍为空则进行加载
classes = loadExtensionClasses();
cachedClasses.set(classes);
}
}
}
return classes;
}
点进loadExtensionClasses方法;此方法可以从配置文件加载扩展类信息
private Map<String, Class<?>> loadExtensionClasses() {
//获取SPI注解的value值,作为默认扩展类的名称
cacheDefaultExtensionName();
//从各目录下的配置文件加载
Map<String, Class<?>> extensionClasses = new HashMap<>();
loadDirectory(extensionClasses, DUBBO_INTERNAL_DIRECTORY, type.getName());
loadDirectory(extensionClasses, DUBBO_INTERNAL_DIRECTORY, type.getName().replace("org.apache", "com.alibaba"));
loadDirectory(extensionClasses, DUBBO_DIRECTORY, type.getName());
loadDirectory(extensionClasses, DUBBO_DIRECTORY, type.getName().replace("org.apache", "com.alibaba"));
loadDirectory(extensionClasses, SERVICES_DIRECTORY, type.getName());
loadDirectory(extensionClasses, SERVICES_DIRECTORY, type.getName().replace("org.apache", "com.alibaba"));
return extensionClasses;
}
点进loadDirectory方法;
private void loadDirectory(Map<String, Class<?>> extensionClasses, String dir, String type) {
//目录+接口名称=文件全路径
String fileName = dir + type;
try {
Enumeration<java.net.URL> urls;
ClassLoader classLoader = findClassLoader();
if (classLoader != null) {
urls = classLoader.getResources(fileName);
} else {
urls = ClassLoader.getSystemResources(fileName);
}
if (urls != null) {
while (urls.hasMoreElements()) {
java.net.URL resourceURL = urls.nextElement();
//对每个url进行加载,我理解是文件里定义的每一行
loadResource(extensionClasses, classLoader, resourceURL);
}
}
} catch (Throwable t) {
logger.error("Exception occurred when loading extension class (interface: " +
type + ", description file: " + fileName + ").", t);
}
}
点进loadResource;
private void loadResource(Map<String, Class<?>> extensionClasses, ClassLoader classLoader, java.net.URL resourceURL) {
try {
try (BufferedReader reader = new BufferedReader(new InputStreamReader(resourceURL.openStream(), StandardCharsets.UTF_8))) {
String line;
while ((line = reader.readLine()) != null) {
final int ci = line.indexOf('#');
if (ci >= 0) {
//去掉“#”后的注释部分
line = line.substring(0, ci);
}
line = line.trim();
if (line.length() > 0) {
try {
String name = null;
//用“=”分开,左边是扩展类名称,右边是扩展类的类路径
int i = line.indexOf('=');
if (i > 0) {
name = line.substring(0, i).trim();
line = line.substring(i + 1).trim();
}
if (line.length() > 0) {
//加载这个扩展类
loadClass(extensionClasses, resourceURL, Class.forName(line, true, classLoader), name);
}
} catch (Throwable t) {
IllegalStateException e = new IllegalStateException("Failed to load extension class (interface: " + type + ", class line: " + line + ") in " + resourceURL + ", cause: " + t.getMessage(), t);
exceptions.put(line, e);
}
}
}
}
} catch (Throwable t) {
logger.error("Exception occurred when loading extension class (interface: " +
type + ", class file: " + resourceURL + ") in " + resourceURL, t);
}
}
点进loadClass;注意:
只是保存了扩展类的class对象,还没有真正实例化,第一次用到的时候再实例化
private void loadClass(Map<String, Class<?>> extensionClasses, java.net.URL resourceURL, Class<?> clazz, String name) throws NoSuchMethodException {
if (!type.isAssignableFrom(clazz)) {
//如果这个扩展类没有实现此SPI,则抛异常
throw new IllegalStateException("Error occurred when loading extension class (interface: " +
type + ", class line: " + clazz.getName() + "), class "
+ clazz.getName() + " is not subtype of interface.");
}
if (clazz.isAnnotationPresent(Adaptive.class)) {
//这个扩展类是自适应扩展类,则缓存之
cacheAdaptiveClass(clazz);
} else if (isWrapperClass(clazz)) {
//这个扩展类是包装类,缓存之
cacheWrapperClass(clazz);
} else {
//这个扩展类是普通的扩展类
clazz.getConstructor();
if (StringUtils.isEmpty(name)) {
name = findAnnotationName(clazz);
if (name.length() == 0) {
throw new IllegalStateException("No such extension name for the class " + clazz.getName() + " in the config " + resourceURL);
}
}
//同一个扩展类可能有多个名称
String[] names = NAME_SEPARATOR.split(name);
if (ArrayUtils.isNotEmpty(names)) {
//如果这个扩展类打了@Activate注解,则缓存起来(getActivateExtension会用到)
cacheActivateClass(clazz, names[0]);
for (String n : names) {
//缓存class对象->名字的映射
cacheName(clazz, n);
//缓存名字->class对象的映射
saveInExtensionClass(extensionClasses, clazz, n);
}
}
}
}
getActivateExtension
入口方法:
/**
* This is equivalent to {@code getActivateExtension(url, url.getParameter(key).split(","), null)}
*
* @param url url
* @param key url parameter key which used to get extension point names
* @param group group
* @return extension list which are activated.
* @see #getActivateExtension(org.apache.dubbo.common.URL, String[], String)
*/
public List<T> getActivateExtension(URL url, String key, String group) {
String value = url.getParameter(key);
return getActivateExtension(url, StringUtils.isEmpty(value) ? null : COMMA_SPLIT_PATTERN.split(value), group);
}
//1.根据url和group对打了@Activate且没有被values指定的扩展类进行筛选,符合条件的就激活;
//2.values是url中指定的扩展类名称(可能有多个);第二步就是激活url指定的这些扩展类
public List<T> getActivateExtension(URL url, String[] values, String group) {
List<T> exts = new ArrayList<>();
List<String> names = values == null ? new ArrayList<>(0) : Arrays.asList(values);
//如果url中指定了“-default”,则不激活打了@Activate注解的扩展类(直接跳过这个if语句块),只有url指定的扩展类才能激活
if (!names.contains(REMOVE_VALUE_PREFIX + DEFAULT_KEY)) {
//getExtensionClasses,确保下一步的cachedActivates已经被初始化
getExtensionClasses();
//对打了@Activate注解的扩展类进行检查,满足条件则加到exts中
for (Map.Entry<String, Object> entry : cachedActivates.entrySet()) {
String name = entry.getKey();
Object activate = entry.getValue();
String[] activateGroup, activateValue;
if (activate instanceof Activate) {
activateGroup = ((Activate) activate).group();
activateValue = ((Activate) activate).value();
} else if (activate instanceof com.alibaba.dubbo.common.extension.Activate) {
activateGroup = ((com.alibaba.dubbo.common.extension.Activate) activate).group();
activateValue = ((com.alibaba.dubbo.common.extension.Activate) activate).value();
} else {
continue;
}
//先匹配group,再匹配value
//注意:如果url中指定了这个扩展类,则先不add,后面遍历names时再add(防止重复add);如果url中指定了“-name”,则name对应的扩展类也不会被激活
//如果一个扩展类加了@Activate注解,group和value不匹配,但url指定了这个类,则最终还是可能被激活的
if (isMatchGroup(group, activateGroup)
&& !names.contains(name)
&& !names.contains(REMOVE_VALUE_PREFIX + name)
&& isActive(activateValue, url)) {
exts.add(getExtension(name));
}
}
//排序;先根据before和after来排,再根据order来排
exts.sort(ActivateComparator.COMPARATOR);
}
//处理完打了@Activate注解的扩展类后,逐一处理url指定的扩展类
List<T> usrs = new ArrayList<>();
for (int i = 0; i < names.size(); i++) {
String name = names.get(i);
//name以“-”开头/url指定了“-name”,都会导致name对应的扩展类不被激活
if (!name.startsWith(REMOVE_VALUE_PREFIX)
&& !names.contains(REMOVE_VALUE_PREFIX + name)) {
if (DEFAULT_KEY.equals(name)) {
//如果name为“default”且usrs不为空,则添加usrs到exts的开头部分。这个不太明白啥意思
if (!usrs.isEmpty()) {
exts.addAll(0, usrs);
usrs.clear();
}
} else {
usrs.add(getExtension(name));
}
}
}
if (!usrs.isEmpty()) {
exts.addAll(usrs);
}
return exts;
}
getAdaptiveExtension
public T getAdaptiveExtension() {
//从缓存获取实例
Object instance = cachedAdaptiveInstance.get();
if (instance == null) {
//没有获取到,且存在创建时抛出的异常,则抛出
if (createAdaptiveInstanceError != null) {
throw new IllegalStateException("Failed to create adaptive instance: " +
createAdaptiveInstanceError.toString(),
createAdaptiveInstanceError);
}
//尝试加锁创建实例
synchronized (cachedAdaptiveInstance) {
instance = cachedAdaptiveInstance.get();
if (instance == null) {
//double check;确保单例
try {
//创建实例并缓存
instance = createAdaptiveExtension();
cachedAdaptiveInstance.set(instance);
} catch (Throwable t) {
createAdaptiveInstanceError = t;
throw new IllegalStateException("Failed to create adaptive instance: " + t.toString(), t);
}
}
}
}
return (T) instance;
}
private T createAdaptiveExtension() {
try {
//创建实例并完成依赖注入
return injectExtension((T) getAdaptiveExtensionClass().newInstance());
} catch (Exception e) {
throw new IllegalStateException("Can't create adaptive extension " + type + ", cause: " + e.getMessage(), e);
}
}
private Class<?> getAdaptiveExtensionClass() {
//执行这个方法可能触发加载,可能给cachedAdaptiveClass完成赋值
getExtensionClasses();
if (cachedAdaptiveClass != null) {
//存在缓存则直接返回
return cachedAdaptiveClass;
}
//不存在已定义好的自适应扩展类,则自动生成一个
return cachedAdaptiveClass = createAdaptiveExtensionClass();
}
//Compiler本质上也是一个SPI,有多种实现;
//所以是先获取Compiler的自适应扩展类,通过它调用compile方法,从而根据不同的情况动态加载不同的扩展类来编译
//最终默认使用的是javassist扩展类;
//这里相当于是SPI的一个应用案例
private Class<?> createAdaptiveExtensionClass() {
String code = new AdaptiveClassCodeGenerator(type, cachedDefaultName).generate();
ClassLoader classLoader = findClassLoader();
org.apache.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(org.apache.dubbo.common.compiler.Compiler.class).getAdaptiveExtension();
return compiler.compile(code, classLoader);
}
3.ExtensionFactory源码
ExtensionFactory就是一个SPI,有三个扩展类:AdaptiveExtensionFactory,SpiExtensionFactory和SpringExtensionFactory。
AdaptiveExtensionFactory;看名字就知道它就是一个自适应扩展类。
调用它的getExtension方法时,本质上是调用了其他两个普通扩展类的方法。
/**
* AdaptiveExtensionFactory
*/
public class AdaptiveExtensionFactory implements ExtensionFactory {
//缓存ExtensionFactory的两个普通扩展类:SpiExtensionFactory和SpringExtensionFactory,由于字典顺序原因,前者在前
private final List<ExtensionFactory> factories;
public AdaptiveExtensionFactory() {
ExtensionLoader<ExtensionFactory> loader = ExtensionLoader.getExtensionLoader(ExtensionFactory.class);
List<ExtensionFactory> list = new ArrayList<ExtensionFactory>();
for (String name : loader.getSupportedExtensions()) {
list.add(loader.getExtension(name));
}
factories = Collections.unmodifiableList(list);
}
public <T> T getExtension(Class<T> type, String name) {
//先从SpiExtensionFactory找,再从SpringExtensionFactory找
for (ExtensionFactory factory : factories) {
T extension = factory.getExtension(type, name);
if (extension != null) {
return extension;
}
}
return null;
}
}
//getSupportedExtensions()方法可以触发加载,并获取普通扩展类的list
public Set<String> getSupportedExtensions() {
Map<String, Class<?>> clazzes = getExtensionClasses();
return Collections.unmodifiableSet(new TreeSet<>(clazzes.keySet()));
}
SpiExtensionFactory;一个普通扩展类;
通过getExtension获取的永远是自适应扩展类;
/**
* SpiExtensionFactory
*/
public class SpiExtensionFactory implements ExtensionFactory {
public <T> T getExtension(Class<T> type, String name) {
if (type.isInterface() && type.isAnnotationPresent(SPI.class)) {
ExtensionLoader<T> loader = ExtensionLoader.getExtensionLoader(type);
if (!loader.getSupportedExtensions().isEmpty()) {
//外界获取的永远是自适应扩展类
return loader.getAdaptiveExtension();
}
}
return null;
}
}
SpringExtensionFactory;一个普通扩展类;
可以实现从Spring上下文获取扩展类;
/**
* SpringExtensionFactory
*/
public class SpringExtensionFactory implements ExtensionFactory {
"unchecked") (
public <T> T getExtension(Class<T> type, String name) {
//SPI should be get from SpiExtensionFactory
if (type.isInterface() && type.isAnnotationPresent(SPI.class)) {
//SPI的扩展类必须由SpiExtensionFactory获取
return null;
}
for (ApplicationContext context : CONTEXTS) {
//遍历上下文先根据名字找
if (context.containsBean(name)) {
Object bean = context.getBean(name);
if (type.isInstance(bean)) {
return (T) bean;
}
}
}
logger.warn("No spring extension (bean) named:" + name + ", try to find an extension (bean) of type " + type.getName());
if (Object.class == type) {
return null;
}
for (ApplicationContext context : CONTEXTS) {
//根据名字找不到,再根据类型找
try {
return context.getBean(type);
} catch (NoUniqueBeanDefinitionException multiBeanExe) {
logger.warn("Find more than 1 spring extensions (beans) of type " + type.getName() + ", will stop auto injection. Please make sure you have specified the concrete parameter type and there's only one extension of that type.");
} catch (NoSuchBeanDefinitionException noBeanExe) {
if (logger.isDebugEnabled()) {
logger.debug("Error when get spring extension(bean) for type:" + type.getName(), noBeanExe);
}
}
}
logger.warn("No spring extension (bean) named:" + name + ", type:" + type.getName() + " found, stop get bean.");
return null;
}
}
初学Dubbo源码,对SPI机制印象深刻的代码就是这些。SPI机制也是学习Dubbo的基础,没有SPI就没有Dubbo。初看的时候感觉很难,但跟着一个个方法点进去看,感觉还是能看懂的。过几天又要忙起来了,等闲了继续往后面看。
以上是关于Dubbo学习SPI机制的主要内容,如果未能解决你的问题,请参考以下文章