Dubbo学习SPI机制

Posted sambomaster

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Dubbo学习SPI机制相关的知识,希望对你有一定的参考价值。

最近两天忙里偷闲看了看Dubbo的源码,参考书是诣极写的《深入理解Apache Dubbo与实战》。上班的时候在公司的KM上做了些笔记,但是苦于文件发不出来,所以回来再记录一下,就当是备忘。我看的是2.6.*版本,最新的2.7.*相对旧版本是有些许改动的,不过影响不大,设计思想和流程都一样。


1.SPI机制

本质:一个接口(SPI),多种实现(扩展类)。

扩展类又分为三种:

(1)普通扩展类;

(2)包装扩展类;内部有对其他扩展类的引用,可以实现方法增强;装饰设计模式;

(3)自适应扩展类;通过自适应扩展类来调用方法时,可以根据url传入的参数或者其他参数动态决定加载哪一个扩展类实例来运行。可以用来“管理”普通扩展类,充当一个代理的角色。


2.ExtensionLoader源码

ExtensionLoader是SPI机制里最关键的一个类,可以实现为SPI加载扩展类的功能。其中有3个重要方法:getExtension,getActivateExtension,getAdaptiveExtension。从这3个方法入手阅读源码就差不多把这个类看完了。


getExtension(根据name获取扩展类)

public T getExtension(String name) { if (StringUtils.isEmpty(name)) { throw new IllegalArgumentException("Extension name == null"); } //name为“true”则获取默认扩展类实例 if ("true".equals(name)) { return getDefaultExtension(); } final Holder<Object> holder = getOrCreateHolder(name); Object instance = holder.get(); if (instance == null) { synchronized (holder) { instance = holder.get(); if (instance == null) { //double check确保单例;instance仍为空则创建实例 instance = createExtension(name); holder.set(instance); } } } return (T) instance;}
点进createExtension方法private T createExtension(String name) { //先根据name拿到class对象 Class<?> clazz = getExtensionClasses().get(name); if (clazz == null) { throw findException(name); } try { //再根据class对象获取实例 T instance = (T) EXTENSION_INSTANCES.get(clazz); if (instance == null) {        //没有获取到则创建实例 EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance()); instance = (T) EXTENSION_INSTANCES.get(clazz); } //对实例完成依赖注入 injectExtension(instance); Set<Class<?>> wrapperClasses = cachedWrapperClasses; if (CollectionUtils.isNotEmpty(wrapperClasses)) { for (Class<?> wrapperClass : wrapperClasses) { //对实例生成包装类实例,可以实现一些方法加强,本质上是装饰设计模式;包装类和扩展类一样,都实现了同一个SPI instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance)); } } return instance; } catch (Throwable t) { throw new IllegalStateException("Extension instance (name: " + name + ", class: " + type + ") couldn't be instantiated: " + t.getMessage(), t); }}
点进getExtensionClasses方法;此方法获取扩展类name->扩展类class对象的Mapprivate Map<String, Class<?>> getExtensionClasses() { Map<String, Class<?>> classes = cachedClasses.get(); if (classes == null) { synchronized (cachedClasses) { classes = cachedClasses.get(); if (classes == null) { //double check;classes仍为空则进行加载 classes = loadExtensionClasses(); cachedClasses.set(classes); } } } return classes;}
点进loadExtensionClasses方法;此方法可以从配置文件加载扩展类信息private Map<String, Class<?>> loadExtensionClasses() { //获取SPI注解的value值,作为默认扩展类的名称 cacheDefaultExtensionName(); //从各目录下的配置文件加载 Map<String, Class<?>> extensionClasses = new HashMap<>(); loadDirectory(extensionClasses, DUBBO_INTERNAL_DIRECTORY, type.getName()); loadDirectory(extensionClasses, DUBBO_INTERNAL_DIRECTORY, type.getName().replace("org.apache", "com.alibaba")); loadDirectory(extensionClasses, DUBBO_DIRECTORY, type.getName()); loadDirectory(extensionClasses, DUBBO_DIRECTORY, type.getName().replace("org.apache", "com.alibaba")); loadDirectory(extensionClasses, SERVICES_DIRECTORY, type.getName()); loadDirectory(extensionClasses, SERVICES_DIRECTORY, type.getName().replace("org.apache", "com.alibaba")); return extensionClasses;}
点进loadDirectory方法;private void loadDirectory(Map<String, Class<?>> extensionClasses, String dir, String type) { //目录+接口名称=文件全路径 String fileName = dir + type; try { Enumeration<java.net.URL> urls; ClassLoader classLoader = findClassLoader(); if (classLoader != null) { urls = classLoader.getResources(fileName); } else { urls = ClassLoader.getSystemResources(fileName); } if (urls != null) { while (urls.hasMoreElements()) { java.net.URL resourceURL = urls.nextElement(); //对每个url进行加载,我理解是文件里定义的每一行 loadResource(extensionClasses, classLoader, resourceURL); } } } catch (Throwable t) { logger.error("Exception occurred when loading extension class (interface: " + type + ", description file: " + fileName + ").", t); }}
点进loadResource;private void loadResource(Map<String, Class<?>> extensionClasses, ClassLoader classLoader, java.net.URL resourceURL) { try { try (BufferedReader reader = new BufferedReader(new InputStreamReader(resourceURL.openStream(), StandardCharsets.UTF_8))) { String line; while ((line = reader.readLine()) != null) { final int ci = line.indexOf('#'); if (ci >= 0) { //去掉“#”后的注释部分 line = line.substring(0, ci); } line = line.trim(); if (line.length() > 0) { try { String name = null; //用“=”分开,左边是扩展类名称,右边是扩展类的类路径 int i = line.indexOf('='); if (i > 0) { name = line.substring(0, i).trim(); line = line.substring(i + 1).trim(); } if (line.length() > 0) { //加载这个扩展类 loadClass(extensionClasses, resourceURL, Class.forName(line, true, classLoader), name); } } catch (Throwable t) { IllegalStateException e = new IllegalStateException("Failed to load extension class (interface: " + type + ", class line: " + line + ") in " + resourceURL + ", cause: " + t.getMessage(), t); exceptions.put(line, e); } } } } } catch (Throwable t) { logger.error("Exception occurred when loading extension class (interface: " + type + ", class file: " + resourceURL + ") in " + resourceURL, t); }}
点进loadClass;注意:只是保存了扩展类的class对象,还没有真正实例化,第一次用到的时候再实例化private void loadClass(Map<String, Class<?>> extensionClasses, java.net.URL resourceURL, Class<?> clazz, String name) throws NoSuchMethodException { if (!type.isAssignableFrom(clazz)) { //如果这个扩展类没有实现此SPI,则抛异常 throw new IllegalStateException("Error occurred when loading extension class (interface: " + type + ", class line: " + clazz.getName() + "), class " + clazz.getName() + " is not subtype of interface."); } if (clazz.isAnnotationPresent(Adaptive.class)) { //这个扩展类是自适应扩展类,则缓存之 cacheAdaptiveClass(clazz); } else if (isWrapperClass(clazz)) { //这个扩展类是包装类,缓存之 cacheWrapperClass(clazz); } else { //这个扩展类是普通的扩展类 clazz.getConstructor(); if (StringUtils.isEmpty(name)) { name = findAnnotationName(clazz); if (name.length() == 0) { throw new IllegalStateException("No such extension name for the class " + clazz.getName() + " in the config " + resourceURL); } } //同一个扩展类可能有多个名称 String[] names = NAME_SEPARATOR.split(name); if (ArrayUtils.isNotEmpty(names)) { //如果这个扩展类打了@Activate注解,则缓存起来(getActivateExtension会用到) cacheActivateClass(clazz, names[0]); for (String n : names) { //缓存class对象->名字的映射 cacheName(clazz, n); //缓存名字->class对象的映射 saveInExtensionClass(extensionClasses, clazz, n); } } }}


getActivateExtension

入口方法:/** * This is equivalent to {@code getActivateExtension(url, url.getParameter(key).split(","), null)} * * @param url url * @param key url parameter key which used to get extension point names * @param group group * @return extension list which are activated. * @see #getActivateExtension(org.apache.dubbo.common.URL, String[], String) */public List<T> getActivateExtension(URL url, String key, String group) { String value = url.getParameter(key); return getActivateExtension(url, StringUtils.isEmpty(value) ? null : COMMA_SPLIT_PATTERN.split(value), group);}
//1.根据url和group对打了@Activate且没有被values指定的扩展类进行筛选,符合条件的就激活;//2.values是url中指定的扩展类名称(可能有多个);第二步就是激活url指定的这些扩展类public List<T> getActivateExtension(URL url, String[] values, String group) { List<T> exts = new ArrayList<>(); List<String> names = values == null ? new ArrayList<>(0) : Arrays.asList(values); //如果url中指定了“-default”,则不激活打了@Activate注解的扩展类(直接跳过这个if语句块),只有url指定的扩展类才能激活 if (!names.contains(REMOVE_VALUE_PREFIX + DEFAULT_KEY)) { //getExtensionClasses,确保下一步的cachedActivates已经被初始化 getExtensionClasses(); //对打了@Activate注解的扩展类进行检查,满足条件则加到exts中 for (Map.Entry<String, Object> entry : cachedActivates.entrySet()) { String name = entry.getKey(); Object activate = entry.getValue();
String[] activateGroup, activateValue;
if (activate instanceof Activate) { activateGroup = ((Activate) activate).group(); activateValue = ((Activate) activate).value(); } else if (activate instanceof com.alibaba.dubbo.common.extension.Activate) { activateGroup = ((com.alibaba.dubbo.common.extension.Activate) activate).group(); activateValue = ((com.alibaba.dubbo.common.extension.Activate) activate).value(); } else { continue; } //先匹配group,再匹配value //注意:如果url中指定了这个扩展类,则先不add,后面遍历names时再add(防止重复add);如果url中指定了“-name”,则name对应的扩展类也不会被激活 //如果一个扩展类加了@Activate注解,group和value不匹配,但url指定了这个类,则最终还是可能被激活的 if (isMatchGroup(group, activateGroup) && !names.contains(name) && !names.contains(REMOVE_VALUE_PREFIX + name) && isActive(activateValue, url)) { exts.add(getExtension(name)); } } //排序;先根据before和after来排,再根据order来排 exts.sort(ActivateComparator.COMPARATOR); } //处理完打了@Activate注解的扩展类后,逐一处理url指定的扩展类 List<T> usrs = new ArrayList<>(); for (int i = 0; i < names.size(); i++) { String name = names.get(i); //name以“-”开头/url指定了“-name”,都会导致name对应的扩展类不被激活 if (!name.startsWith(REMOVE_VALUE_PREFIX) && !names.contains(REMOVE_VALUE_PREFIX + name)) { if (DEFAULT_KEY.equals(name)) { //如果name为“default”且usrs不为空,则添加usrs到exts的开头部分。这个不太明白啥意思 if (!usrs.isEmpty()) { exts.addAll(0, usrs); usrs.clear(); } } else { usrs.add(getExtension(name)); } } } if (!usrs.isEmpty()) { exts.addAll(usrs); } return exts;}


getAdaptiveExtension

public T getAdaptiveExtension() { //从缓存获取实例 Object instance = cachedAdaptiveInstance.get(); if (instance == null) { //没有获取到,且存在创建时抛出的异常,则抛出 if (createAdaptiveInstanceError != null) { throw new IllegalStateException("Failed to create adaptive instance: " + createAdaptiveInstanceError.toString(), createAdaptiveInstanceError); } //尝试加锁创建实例 synchronized (cachedAdaptiveInstance) { instance = cachedAdaptiveInstance.get(); if (instance == null) { //double check;确保单例 try { //创建实例并缓存 instance = createAdaptiveExtension(); cachedAdaptiveInstance.set(instance); } catch (Throwable t) { createAdaptiveInstanceError = t; throw new IllegalStateException("Failed to create adaptive instance: " + t.toString(), t); } } } }
return (T) instance;}

private T createAdaptiveExtension() { try { //创建实例并完成依赖注入 return injectExtension((T) getAdaptiveExtensionClass().newInstance()); } catch (Exception e) { throw new IllegalStateException("Can't create adaptive extension " + type + ", cause: " + e.getMessage(), e); }}

private Class<?> getAdaptiveExtensionClass() { //执行这个方法可能触发加载,可能给cachedAdaptiveClass完成赋值 getExtensionClasses(); if (cachedAdaptiveClass != null) { //存在缓存则直接返回 return cachedAdaptiveClass; } //不存在已定义好的自适应扩展类,则自动生成一个 return cachedAdaptiveClass = createAdaptiveExtensionClass();}
//Compiler本质上也是一个SPI,有多种实现;//所以是先获取Compiler的自适应扩展类,通过它调用compile方法,从而根据不同的情况动态加载不同的扩展类来编译//最终默认使用的是javassist扩展类;//这里相当于是SPI的一个应用案例private Class<?> createAdaptiveExtensionClass() { String code = new AdaptiveClassCodeGenerator(type, cachedDefaultName).generate(); ClassLoader classLoader = findClassLoader(); org.apache.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(org.apache.dubbo.common.compiler.Compiler.class).getAdaptiveExtension(); return compiler.compile(code, classLoader);}


3.ExtensionFactory源码

ExtensionFactory就是一个SPI,有三个扩展类:AdaptiveExtensionFactory,SpiExtensionFactory和SpringExtensionFactory。


AdaptiveExtensionFactory;看名字就知道它就是一个自适应扩展类。

调用它的getExtension方法时,本质上是调用了其他两个普通扩展类的方法。

/** * AdaptiveExtensionFactory */@Adaptivepublic class AdaptiveExtensionFactory implements ExtensionFactory { //缓存ExtensionFactory的两个普通扩展类:SpiExtensionFactory和SpringExtensionFactory,由于字典顺序原因,前者在前 private final List<ExtensionFactory> factories;
public AdaptiveExtensionFactory() { ExtensionLoader<ExtensionFactory> loader = ExtensionLoader.getExtensionLoader(ExtensionFactory.class); List<ExtensionFactory> list = new ArrayList<ExtensionFactory>(); for (String name : loader.getSupportedExtensions()) { list.add(loader.getExtension(name)); } factories = Collections.unmodifiableList(list); }
@Override public <T> T getExtension(Class<T> type, String name) { //先从SpiExtensionFactory找,再从SpringExtensionFactory找 for (ExtensionFactory factory : factories) { T extension = factory.getExtension(type, name); if (extension != null) { return extension; } } return null; }
}

//getSupportedExtensions()方法可以触发加载,并获取普通扩展类的listpublic Set<String> getSupportedExtensions() { Map<String, Class<?>> clazzes = getExtensionClasses(); return Collections.unmodifiableSet(new TreeSet<>(clazzes.keySet()));}


SpiExtensionFactory;一个普通扩展类;

通过getExtension获取的永远是自适应扩展类;

/** * SpiExtensionFactory */public class SpiExtensionFactory implements ExtensionFactory {
@Override public <T> T getExtension(Class<T> type, String name) { if (type.isInterface() && type.isAnnotationPresent(SPI.class)) { ExtensionLoader<T> loader = ExtensionLoader.getExtensionLoader(type); if (!loader.getSupportedExtensions().isEmpty()) { //外界获取的永远是自适应扩展类 return loader.getAdaptiveExtension(); } } return null; }
}


SpringExtensionFactory;一个普通扩展类;

可以实现从Spring上下文获取扩展类;

/** * SpringExtensionFactory */public class SpringExtensionFactory implements ExtensionFactory { @Override @SuppressWarnings("unchecked") public <T> T getExtension(Class<T> type, String name) {
//SPI should be get from SpiExtensionFactory if (type.isInterface() && type.isAnnotationPresent(SPI.class)) { //SPI的扩展类必须由SpiExtensionFactory获取 return null; }
for (ApplicationContext context : CONTEXTS) { //遍历上下文先根据名字找 if (context.containsBean(name)) { Object bean = context.getBean(name); if (type.isInstance(bean)) { return (T) bean; } } }
logger.warn("No spring extension (bean) named:" + name + ", try to find an extension (bean) of type " + type.getName());
if (Object.class == type) { return null; }
for (ApplicationContext context : CONTEXTS) { //根据名字找不到,再根据类型找 try { return context.getBean(type); } catch (NoUniqueBeanDefinitionException multiBeanExe) { logger.warn("Find more than 1 spring extensions (beans) of type " + type.getName() + ", will stop auto injection. Please make sure you have specified the concrete parameter type and there's only one extension of that type."); } catch (NoSuchBeanDefinitionException noBeanExe) { if (logger.isDebugEnabled()) { logger.debug("Error when get spring extension(bean) for type:" + type.getName(), noBeanExe); } } }
logger.warn("No spring extension (bean) named:" + name + ", type:" + type.getName() + " found, stop get bean.");
return null;    }}


初学Dubbo源码,对SPI机制印象深刻的代码就是这些。SPI机制也是学习Dubbo的基础,没有SPI就没有Dubbo。初看的时候感觉很难,但跟着一个个方法点进去看,感觉还是能看懂的。过几天又要忙起来了,等闲了继续往后面看。


以上是关于Dubbo学习SPI机制的主要内容,如果未能解决你的问题,请参考以下文章

理解 Dubbo SPI 扩展机制

深入理解 Dubbo SPI 扩展机制

dubbo spi扩展实现机制javasist

学习手写Dubbo中的SPI

Dubbo的SPI机制——AOP

来撸一撸Dubbo之SPI机制源码,SPI到底解决了什么问题?