Dubbo之SPI源码分析
Posted Java后端笔记
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Dubbo之SPI源码分析相关的知识,希望对你有一定的参考价值。
1.Dubbo SPI介绍
Dubbo的扩展点加载机制,用于在程序运行时,通过传入Url里面参数的不同,加载同一接口的不同实现, 同时也支持aop与ioc的功能。
2.使用方式
ExtensionLoader.getExtensionLoader(WrappedExt.class).getExtension("XXX");
直接获取具体的扩展点实现,会进行扩展点自动包装(aop)以及扩展点自动装配(ioc)
private Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
ExtensionLoader.getExtensionLoader(RegistryFactory.class).getAdaptiveExtension().getRegistry(registryUrl);
获取扩展点的适配类,会在运行时根据url参数调用不同实现类,接口参数要直接或者间接含有Url参数
3.源码分析
Dubbo SPI主要由ExtensionLoader这个类实现,我们的源码讲解也从这个类开始
一些术语
介于扩展点这个名词太过宽泛,在讲解源码的时候,我细化一下 扩展点接口:扩展点所针对的接口,一个接口能有多个扩展点实现 扩展点(名):扩展点实现类对应的别名 扩展点实现:扩展点的具体实现类 扩展点实现实例: 扩展点的具体实现类的实例
一些变量
private static final String SERVICES_DIRECTORY = "META-INF/services/";
private static final String DUBBO_DIRECTORY = "META-INF/dubbo/";
private static final String DUBBO_INTERNAL_DIRECTORY = DUBBO_DIRECTORY + "internal/";
上述三个常量是,扫描SPI配置文件的路径,文件名为接口的全限定名,文件内容为SPI实现别名=接口实现,多个换行显示,如
// 文件名:com.alibaba.dubbo.rpc.Protocol
filter=com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper
listener=com.alibaba.dubbo.rpc.protocol.ProtocolListenerWrapper
mock=com.alibaba.dubbo.rpc.support.MockProtocol
private static final ConcurrentMap<Class<?>, ExtensionLoader<?>> EXTENSION_LOADERS = new ConcurrentHashMap<Class<?>, ExtensionLoader<?>>();
private static final ConcurrentMap<Class<?>, Object> EXTENSION_INSTANCES = new ConcurrentHashMap<Class<?>, Object>();
上面有两个静态变量 第一个说明针对每个扩展点接口都会有一个ExtensionLoader对应 第二个是扩展点接口实现类对应实例的缓存
private final ExtensionFactory objectFactory;
这个是扩展点工厂,对扩展点实现进行依赖注入的时候使用
private volatile Class<?> cachedAdaptiveClass = null;
缓存当前扩展点的适配类
private final Holder<Object> cachedAdaptiveInstance = new Holder<Object>();
缓存当前扩展点适配类实例
private Set<Class<?>> cachedWrapperClasses;
缓存包装类
初始化流程
先看下ExtensionLoader的构造函数
private ExtensionLoader(Class<?> type) {
this.type = type;
//扩展点工厂不需要设置这个,不然会无限循环
objectFactory = (type == ExtensionFactory.class ? null : ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension());
}
这边会给当前的ExtensionLoader设置扩展点接口类型,以及扩展点工厂(用于依赖注入) 但是这边并没有对配置的扩展点进行加载,出于性能优化,并不是预加载,扩展点的加载在实际使用的时候在加载,比如
private Class<?> getAdaptiveExtensionClass() {
getExtensionClasses();
if (cachedAdaptiveClass != null) {
return cachedAdaptiveClass;
}
return cachedAdaptiveClass = createAdaptiveExtensionClass();
}
这个方法用来获取当前扩展点的适配器类,扩展点的加载在getExtensionClasses方法内实现
private Map<String, Class<?>> getExtensionClasses() {
Map<String, Class<?>> classes = cachedClasses.get();
if (classes == null) {
synchronized (cachedClasses) {
classes = cachedClasses.get();
if (classes == null) {
classes = loadExtensionClasses();
cachedClasses.set(classes);
}
}
}
return classes;
}
首先会从cachedClasses缓存加载扩展点配置,如果不存在,使用loadExtensionClasses从文件中加载扩展点,完成后缓存
private Map<String, Class<?>> loadExtensionClasses() {
final SPI defaultAnnotation = type.getAnnotation(SPI.class);
if(defaultAnnotation != null) {
String value = defaultAnnotation.value();
if(value != null && (value = value.trim()).length() > 0) {
String[] names = NAME_SEPARATOR.split(value);
if(names.length > 1) {
throw new IllegalStateException("more than 1 default extension name on extension " + type.getName()
+ ": " + Arrays.toString(names));
}
if(names.length == 1) cachedDefaultName = names[0];
}
}
Map<String, Class<?>> extensionClasses = new HashMap<String, Class<?>>();
loadFile(extensionClasses, DUBBO_INTERNAL_DIRECTORY);
loadFile(extensionClasses, DUBBO_DIRECTORY);
loadFile(extensionClasses, SERVICES_DIRECTORY);
return extensionClasses;
}
在loadExtensionClasses方法中,首先根据@SPI注解来设置默认的扩展点实现 然后在通过loadFile从各个路径的文件加载扩展点配置
private void loadFile(Map<String, Class<?>> extensionClasses, String dir) {
String fileName = dir + type.getName();
try {
Enumeration<java.net.URL> urls;
ClassLoader classLoader = findClassLoader();
if (classLoader != null) {
urls = classLoader.getResources(fileName);
} else {
urls = ClassLoader.getSystemResources(fileName);
}
if (urls != null) {
while (urls.hasMoreElements()) {
java.net.URL url = urls.nextElement();
try {
BufferedReader reader = new BufferedReader(new InputStreamReader(url.openStream(), "utf-8"));
try {
String line = null;
while ((line = reader.readLine()) != null) {
final int ci = line.indexOf('#');
if (ci >= 0) line = line.substring(0, ci);
line = line.trim();
if (line.length() > 0) {
try {
String name = null;
int i = line.indexOf('=');
if (i > 0) {
name = line.substring(0, i).trim();
line = line.substring(i + 1).trim();
}
if (line.length() > 0) {
Class<?> clazz = Class.forName(line, true, classLoader);
//如果内部配置类不是该接口的实现,抛出异常
if (! type.isAssignableFrom(clazz)) {
throw new IllegalStateException("Error when load extension class(interface: " +
type + ", class line: " + clazz.getName() + "), class "
+ clazz.getName() + "is not subtype of interface.");
}
//如果这个扩展点实现有@Adaptive注解,说明是适配类,缓存到cachedAdaptiveClass
//如果同一个扩展点有两个适配类,抛出异常
if (clazz.isAnnotationPresent(Adaptive.class)) {
if(cachedAdaptiveClass == null) {
cachedAdaptiveClass = clazz;
} else if (! cachedAdaptiveClass.equals(clazz)) {
throw new IllegalStateException("More than 1 adaptive class found: "
+ cachedAdaptiveClass.getClass().getName()
+ ", " + clazz.getClass().getName());
}
} else {
//不是适配类的逻辑
try {
//判断是不是包装类
//逻辑是看有没有以当前扩展点为参数的构造函数
//如果是包装类,缓存到cachedWrapperClasses
//包装类可以有多个,可以包装多次
clazz.getConstructor(type);
Set<Class<?>> wrappers = cachedWrapperClasses;
if (wrappers == null) {
cachedWrapperClasses = new ConcurrentHashSet<Class<?>>();
wrappers = cachedWrapperClasses;
}
wrappers.add(clazz);
} catch (NoSuchMethodException e) {
//不是适配类 也不是 包装类 那就是普通扩展点实现类
clazz.getConstructor();
//下面这部分逻辑用于当配置文件中不存在扩展点名的时候,生成扩展点名的逻辑
if (name == null || name.length() == 0) {
name = findAnnotationName(clazz);
if (name == null || name.length() == 0) {
if (clazz.getSimpleName().length() > type.getSimpleName().length()
&& clazz.getSimpleName().endsWith(type.getSimpleName())) {
name = clazz.getSimpleName().substring(0, clazz.getSimpleName().length() - type.getSimpleName().length()).toLowerCase();
} else {
throw new IllegalStateException("No such extension name for the class " + clazz.getName() + " in the config " + url);
}
}
}
//扩展点名可能有多个
String[] names = NAME_SEPARATOR.split(name);
if (names != null && names.length > 0) {
Activate activate = clazz.getAnnotation(Activate.class);
if (activate != null) {
//缓存<扩展点名,@Activate>
//只缓存第一个扩展点名
cachedActivates.put(names[0], activate);
}
for (String n : names) {
//这个缓存只缓存第一个扩展点
if (! cachedNames.containsKey(clazz)) {
cachedNames.put(clazz, n);
}
Class<?> c = extensionClasses.get(n);
if (c == null) {
//这个缓存 缓存所有扩展点名 可以对应同一个Class
extensionClasses.put(n, clazz);
} else if (c != clazz) {
throw new IllegalStateException("Duplicate extension " + type.getName() + " name " + n + " on " + c.getName() + " and " + clazz.getName());
}
}
}
}
}
}
} catch (Throwable t) {
IllegalStateException e = new IllegalStateException("Failed to load extension class(interface: " + type + ", class line: " + line + ") in " + url + ", cause: " + t.getMessage(), t);
exceptions.put(line, e);
}
}
} // end of while read lines
} finally {
reader.close();
}
} catch (Throwable t) {
logger.error("Exception when load extension class(interface: " +
type + ", class file: " + url + ") in " + url, t);
}
} // end of while urls
}
} catch (Throwable t) {
logger.error("Exception when load extension class(interface: " +
type + ", description file: " + fileName + ").", t);
}
}
loadFile用来加载文件中扩展点配置,大致逻辑如下
在类路径或者jar包的指定文件夹查找以类全限定名为名字的文件
按行解析文件,格式为扩展点名 = 扩展点实现类
解析该类是否为适配类(这个扩展点实现有@Adaptive注解),如果是,赋值到cachedAdaptiveClass
解析该类是否为包装类(这个扩展点实现有以扩展点接口为参数的参数),如果是,放入cachedWrapperClasses集合
解析该类为普通扩展点实现类,解析类的@Activate注解,缓存到cachedActivates,缓存扩展点实现类与第一个扩展点名到cachedNames,扩展点名与扩展点实现放入extensionClasses集合
扩展点一些特性
扩展点自动包装
通过装饰者模式,使用包装类包装原始的扩展点实现,在原始扩展点实现前后插入其他逻辑,实现aop功能。 在解析扩展点配置的时候,我们会从配置中找到这些包装类并且缓存到cachedWrapperClasses,然后在创建扩展点实现的时候,使用这些包装类依次包装原始扩展点 自动包装功能在创建扩展点实例的方法内
private T createExtension(String name) {
Class<?> clazz = getExtensionClasses().get(name);
if (clazz == null) {
throw findException(name);
}
try {
T instance = (T) EXTENSION_INSTANCES.get(clazz);
if (instance == null) {
EXTENSION_INSTANCES.putIfAbsent(clazz, (T) clazz.newInstance());
instance = (T) EXTENSION_INSTANCES.get(clazz);
}
//依赖注入
injectExtension(instance);
//包装,类似AOP
Set<Class<?>> wrapperClasses = cachedWrapperClasses;
if (wrapperClasses != null && wrapperClasses.size() > 0) {
for (Class<?> wrapperClass : wrapperClasses) {
instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
}
}
return instance;
} catch (Throwable t) {
throw new IllegalStateException("Extension instance(name: " + name + ", class: " +
type + ") could not be instantiated: " + t.getMessage(), t);
}
}
我们可以看到,一旦cachedWrapperClasses不为空,会通过wrapperClass.getConstructor(type).newInstance(instance)循环包装原始实现,从而实现了Aop的功能
扩展点自动装配
在扩展点内可能会依赖其他扩展点,或者Spring容器内的bean,SPI提供了自动注入这些依赖的功能。主要通过解析setter方法,并且通过objectFactory取得这些依赖并注入 功能实现在injectExtension,在创建扩展点实例的时候会被调用
private T injectExtension(T instance) {
try {
if (objectFactory != null) {
for (Method method : instance.getClass().getMethods()) {
if (method.getName().startsWith("set")
&& method.getParameterTypes().length == 1
&& Modifier.isPublic(method.getModifiers())) {
Class<?> pt = method.getParameterTypes()[0];
try {
String property = method.getName().length() > 3 ? method.getName().substring(3, 4).toLowerCase() + method.getName().substring(4) : "";
Object object = objectFactory.getExtension(pt, property);
if (object != null) {
method.invoke(instance, object);
}
} catch (Exception e) {
logger.error("fail to inject via method " + method.getName()
+ " of interface " + type.getName() + ": " + e.getMessage(), e);
}
}
}
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
return instance;
}
在解析setter方法之后,得到setter方法的注入类型,然后通过objectFactory提取实现 这个objectFactory在ExtensionLoader的构造函数中设置过,具体实现为ExtensionFactory的适配类AdaptiveExtensionFactory
objectFactory = (type == ExtensionFactory.class ? null : ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension());
我们看下适配类的实现
@Adaptive
public class AdaptiveExtensionFactory implements ExtensionFactory {
private final List<ExtensionFactory> factories;
public AdaptiveExtensionFactory() {
ExtensionLoader<ExtensionFactory> loader = ExtensionLoader.getExtensionLoader(ExtensionFactory.class);
List<ExtensionFactory> list = new ArrayList<ExtensionFactory>();
for (String name : loader.getSupportedExtensions()) {
list.add(loader.getExtension(name));
}
factories = Collections.unmodifiableList(list);
}
public <T> T getExtension(Class<T> type, String name) {
for (ExtensionFactory factory : factories) {
T extension = factory.getExtension(type, name);
if (extension != null) {
return extension;
}
}
return null;
}
}
可以看到在构造函数中会把所有ExtensionFactory的实现缓存起来,然后在getExtension的时候,依次调用ExtensionFactory的普通扩展点的getExtension方法来获取依赖 ExtensionFactory的普通实现有
spi=com.alibaba.dubbo.common.extension.factory.SpiExtensionFactory
spring=com.alibaba.dubbo.config.spring.extension.SpringExtensionFactory
spi和spring实现分别代表从ExtensionLoader和Spring容器获取对应依赖 需要注意一下spi的实现类中
public <T> T getExtension(Class<T> type, String name) {
if (type.isInterface() && type.isAnnotationPresent(SPI.class)) {
ExtensionLoader<T> loader = ExtensionLoader.getExtensionLoader(type);
if (loader.getSupportedExtensions().size() > 0) {
return loader.getAdaptiveExtension();
}
}
return null;
}
注入的为适配器类,不是具体普通实现类
扩展点自适应
扩展点自适应的意思是,扩展点会在程序运行时根据Url内的参数自动选择对应的实现进行调用,采用适配器类实现,这也是一个装饰者模式。 每个扩展点必须有一个适配类,如果没有,框架会通过Javaassist自动创建一个,但是有一个前提,需要自动适配的接口方法需要使用@adaptive注解 获取适配对象的方法如下
public T getAdaptiveExtension() {
Object instance = cachedAdaptiveInstance.get();
if (instance == null) {
if(createAdaptiveInstanceError == null) {
synchronized (cachedAdaptiveInstance) {
instance = cachedAdaptiveInstance.get();
if (instance == null) {
try {
instance = createAdaptiveExtension();
cachedAdaptiveInstance.set(instance);
} catch (Throwable t) {
createAdaptiveInstanceError = t;
throw new IllegalStateException("fail to create adaptive instance: " + t.toString(), t);
}
}
}
}
else {
throw new IllegalStateException("fail to create adaptive instance: " + createAdaptiveInstanceError.toString(), createAdaptiveInstanceError);
}
}
return (T) instance;
}
这边会对适配类实例做一个缓存,如果缓存不存在,调用createAdaptiveExtension创建
private T createAdaptiveExtension() {
try {
return injectExtension((T) getAdaptiveExtensionClass().newInstance());
} catch (Exception e) {
throw new IllegalStateException("Can not create adaptive extenstion " + type + ", cause: " + e.getMessage(), e);
}
}
createAdaptiveExtension方法中通过getAdaptiveExtensionClass来取得对应的适配器类,然后调用newInstance实例化,在通过injectExtension注入依赖后返回
private Class<?> getAdaptiveExtensionClass() {
getExtensionClasses();
if (cachedAdaptiveClass != null) {
return cachedAdaptiveClass;
}
return cachedAdaptiveClass = createAdaptiveExtensionClass();
}
在getAdaptiveExtensionClass方法中,首先会调用getExtensionClasses方法初始化,如果在初始化过程中找不到适配类,那么通过createAdaptiveExtensionClass方法自己创建适配类
private Class<?> createAdaptiveExtensionClass() {
String code = createAdaptiveExtensionClassCode();
ClassLoader classLoader = findClassLoader();
com.alibaba.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.common.compiler.Compiler.class).getAdaptiveExtension();
return compiler.compile(code, classLoader);
}
这边会先调用createAdaptiveExtensionClassCode方法拼接具体适配类的代码,然后在使用Compiler实现去生成Class对象,Compiler也使用了扩展点,默认使用javaassist实现 看下createAdaptiveExtensionClassCode方法生成的适配类的代码样式
package com.alibaba.dubbo.rpc;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
public class Protocol$Adpative implements com.alibaba.dubbo.rpc.Protocol {
public void destroy() {
throw new UnsupportedOperationException("method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
}
public int getDefaultPort() {
throw new UnsupportedOperationException("method public abstract int com.alibaba.dubbo.rpc.Protocol.getDefaultPort() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
}
public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {
if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
if (arg0.getUrl() == null)
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
com.alibaba.dubbo.common.URL url = arg0.getUrl();
String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
if (extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.export(arg0);
}
public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0, com.alibaba.dubbo.common.URL arg1) throws com.alibaba.dubbo.rpc.RpcException {
if (arg1 == null) throw new IllegalArgumentException("url == null");
com.alibaba.dubbo.common.URL url = arg1;
String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
if (extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.refer(arg0, arg1);
}
}
大致的逻辑和开始说的一样,通过url解析出参数,解析的逻辑由@Adaptive的value参数控制,然后再根据得到的扩展点名获取扩展点实现,然后进行调用 具体拼接逻辑大家可以看createAdaptiveExtensionClassCode的实现
扩展点自动激活
对于集合类扩展点,比如Filter,需要一次性获取多个实现进行链式调用,这里用到了扩展点自动激活,对于有@Activate注解的接口类,即使不显示通过扩展名获取,如果url里面含有@Activate的value配置的参数,也能获取到这个扩展点 我们通过getActivateExtension这个来获取多个扩展点,values是使用者主动设置的扩展点名
public List<T> getActivateExtension(URL url, String[] values, String group) {
List<T> exts = new ArrayList<T>();
List<String> names = values == null ? new ArrayList<String>(0) : Arrays.asList(values);
if (! names.contains(Constants.REMOVE_VALUE_PREFIX + Constants.DEFAULT_KEY)) {
getExtensionClasses();
//这边是扩展点自动激活的逻辑,除去values里面配置的扩展点
for (Map.Entry<String, Activate> entry : cachedActivates.entrySet()) {
String name = entry.getKey();
Activate activate = entry.getValue();
if (isMatchGroup(group, activate.group())) {
T ext = getExtension(name);
//除去values里面配置的扩展点,其他的扩展点自动激活
if (! names.contains(name)
&& ! names.contains(Constants.REMOVE_VALUE_PREFIX + name)
//这里来过滤@activate里面配置的value是否在url里面出现
&& isActive(activate, url)) {
exts.add(ext);
}
}
}
Collections.sort(exts, ActivateComparator.COMPARATOR);
}
List<T> usrs = new ArrayList<T>();
//加载value所对应的扩展点实现类
for (int i = 0; i < names.size(); i ++) {
String name = names.get(i);
if (! name.startsWith(Constants.REMOVE_VALUE_PREFIX)
&& ! names.contains(Constants.REMOVE_VALUE_PREFIX + name)) {
if (Constants.DEFAULT_KEY.equals(name)) {
if (usrs.size() > 0) {
exts.addAll(0, usrs);
usrs.clear();
}
} else {
T ext = getExtension(name);
usrs.add(ext);
}
}
}
if (usrs.size() > 0) {
exts.addAll(usrs);
}
return exts;
}
这个方法得到扩展实现分为两部分,第一部分是自动激活的,第二部分是用户指定的。 自动激活也不是无条件的,首先group参数要符合要求,其次的话就是Url参数里面需要有@Activate注解value属性对应的参数,只需要存在一个即可,判断逻辑如下
private boolean isActive(Activate activate, URL url) {
String[] keys = activate.value();
if (keys == null || keys.length == 0) {
return true;
}
for (String key : keys) {
for (Map.Entry<String, String> entry : url.getParameters().entrySet()) {
String k = entry.getKey();
String v = entry.getValue();
if ((k.equals(key) || k.endsWith("." + key))
&& ConfigUtils.isNotEmpty(v)) {
return true;
}
}
}
return false;
}
dubbo中扩展点自动激活最常使用的扩展点为Filter,调用getActivateExtension方法在ProtocolFilterWrapper的buildInvokerChain中
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
Invoker<T> last = invoker;
List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
if (filters.size() > 0) {
for (int i = filters.size() - 1; i >= 0; i --) {
final Filter filter = filters.get(i);
final Invoker<T> next = last;
last = new Invoker<T>() {
public Class<T> getInterface() {
return invoker.getInterface();
}
public URL getUrl() {
return invoker.getUrl();
}
public boolean isAvailable() {
return invoker.isAvailable();
}
public Result invoke(Invocation invocation) throws RpcException {
return filter.invoke(next, invocation);
}
public void destroy() {
invoker.destroy();
}
@Override
public String toString() {
return invoker.toString();
}
};
}
}
return last;
}
这个方法会构造一个拦截器链,拦截器链的末尾是我们需要调用的方法,即protocol的refer和export方法 这样,通过自定义filter我们实现日志记录,错误结果封装等功能
以上是关于Dubbo之SPI源码分析的主要内容,如果未能解决你的问题,请参考以下文章