Dubbo SPI源码分析
Posted ShowBrilliant
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Dubbo SPI源码分析相关的知识,希望对你有一定的参考价值。
对于一个优秀的框架需要很好的扩展性,给出一个接口,自己可以给出默认实现,同时也允许其他人实现拓展。即“对扩展开放,对修改封闭”的原则。Dubbo 采用微内核+插件的方式来实现,微内核架构中,内核通常采用 Factory、IoC、OSGi 等方式管理插件生命周期,Dubbo 最终决定采用 SPI 机制来加载插件,Dubbo SPI 参考 JDK 原生的 SPI 机制,进行了性能优化以及功能增强。
我们来看看 SPI 定义:
Service Provider Interface (SPI) is an API intended to be implemented or extended by a third party. It can be used to enable framework extension and replaceable components.
JDK SPI
JDK SPI 最比较常见的在访问数据库会使用到java.sql.Driver
这个接口,不同的数据库产商会有不同的实现,JDK SPI 机制可以为某个接口寻找服务实现。
JDK SPI 机制
我们先看一个例子,模拟连接数据库,先定义一个 Driver 接口。
package com.cuzz.api;
public interface Driver {
void connect(String url);
}
然后不同的产商有不同的实现,以 mysql 和 oracle 两个实现。
package com.cuzz.mysql;
import com.cuzz.api.Driver;
public class MysqlDriver implements Driver {
@Override
public void connect(String url) {
System.out.println("connect mysql: " + url);
}
}
// -----------------
package com.cuzz.oracle;
import com.cuzz.api.Driver;
public class OracleDriver implements Driver {
@Override
public void connect(String url) {
System.out.println("connect oracle: " + url);
}
}
在项目的 resources/META-INF/services 目录下添加一个名为 com.cuzz.api.Driver 的文件,这是 JDK SPI 需要读取的配置文件,具体内容如下:
com.cuzz.mysql.MysqlDriver
com.cuzz.oracle.OracleDriver
加载配置:
public class Main {
public static void main(String[] args) {
// Java spi 机制
ServiceLoader<Driver> serviceLoader = ServiceLoader.load(Driver.class);
System.out.println(serviceLoader);
Iterator<Driver> iterator = serviceLoader.iterator();
while (iterator.hasNext()) {
Driver driver = iterator.next();
driver.connect("localhost:3306");
}
}
}
运行结果:
java.util.ServiceLoader[com.cuzz.api.Driver]
connect mysql: localhost:3306
connect oracle: localhost:3306
JDK SPI 源码分析
我们从ServiceLoader<Driver> serviceLoader = ServiceLoader.load(Driver.class);
定位到 ServiceLoader 构造方法中的java.util.ServiceLoader#reload
方法
// 前缀
private static final String PREFIX = "META-INF/services/";
// The class or interface representing the service being loaded
private final Class<S> service;
// The class loader used to locate, load, and instantiate providers
private final ClassLoader loader;
// The access control context taken when the ServiceLoader is created
private final AccessControlContext acc;
// Cached providers, in instantiation order
// 缓存
private LinkedHashMap<String,S> providers = new LinkedHashMap<>();
// The current lazy-lookup iterator
// 懒加载迭代器
private LazyIterator lookupIterator;
public void reload() {
providers.clear();
lookupIterator = new LazyIterator(service, loader);
}
重点看看这个 LazyIterator 类,这是一个内部类,主要以懒加载形式实现。Iterator 这个接口需要实现 Iterator#hasNext 方法和 Iterator#next 方法,hasNext 方法调用了 LazyIterator#hasNextService,而 next 方法调用 LazyIterator#nextService。
private class LazyIterator implements Iterator<S> {
Class<S> service;
ClassLoader loader;
// 像这样的URL file:/Users/cuzz/Projects/Java/dubbo/cuzz-demo/cuzz-demo-spi/target/classes/META-INF/services/com.cuzz.api.Driver
Enumeration<URL> configs = null;
Iterator<String> pending = null;
String nextName = null;
private LazyIterator(Class<S> service, ClassLoader loader) {
this.service = service;
this.loader = loader;
}
private boolean hasNextService() {
if (nextName != null) {
return true;
}
// 第一次获取,config 为空开始加载文件
if (configs == null) {
try {
// 获取文件名 META-INF/services/com.cuzz.api.Driver
String fullName = PREFIX + service.getName();
// 加载配置路径
if (loader == null)
configs = ClassLoader.getSystemResources(fullName);
else
configs = loader.getResources(fullName);
} catch (IOException x) {
fail(service, "Error locating configuration files", x);
}
}
while ((pending == null) || !pending.hasNext()) {
if (!configs.hasMoreElements()) {
return false;
}
// 解析文件
pending = parse(service, configs.nextElement());
}
// 把实现类的名称记录下来 com.cuzz.mysql.MysqlDriver
nextName = pending.next();
return true;
}
private S nextService() {
if (!hasNextService())
throw new NoSuchElementException();
// 存一个备份
String cn = nextName;
nextName = null;
Class<?> c = null;
try {
// 通过反射获取该实现类
c = Class.forName(cn, false, loader);
} catch (ClassNotFoundException x) {
fail(service,
"Provider " + cn + " not found");
}
if (!service.isAssignableFrom(c)) {
fail(service,
"Provider " + cn + " not a subtype");
}
try {
S p = service.cast(c.newInstance());
providers.put(cn, p);
return p;
} catch (Throwable x) {
fail(service,
"Provider " + cn + " could not be instantiated",
x);
}
throw new Error(); // This cannot happen
}
public boolean hasNext() {
if (acc == null) {
return hasNextService();
} else {
PrivilegedAction<Boolean> action = new PrivilegedAction<Boolean>() {
public Boolean run() { return hasNextService(); }
};
return AccessController.doPrivileged(action, acc);
}
}
public S next() {
if (acc == null) {
return nextService();
} else {
PrivilegedAction<S> action = new PrivilegedAction<S>() {
public S run() { return nextService(); }
};
return AccessController.doPrivileged(action, acc);
}
}
public void remove() {
throw new UnsupportedOperationException();
}
}
最后我们来 ServiceLoader#iterator 这个方法是怎么实现的,主要是先走缓存,在走懒加载。
public Iterator<S> iterator() {
return new Iterator<S>() {
Iterator<Map.Entry<String,S>> knownProviders
= providers.entrySet().iterator();
public boolean hasNext() {
// 先走缓存,在走懒加载
if (knownProviders.hasNext())
return true;
return lookupIterator.hasNext();
}
public S next() {
// 先走缓存,在走懒加载
if (knownProviders.hasNext())
return knownProviders.next().getValue();
return lookupIterator.next();
}
public void remove() {
throw new UnsupportedOperationException();
}
};
}
JDK SPI 在 JDBC 中的应用
当我们引入 mysql 驱动时候,在 META-INF/services 目录下,有一个 java.sql.Driver 文件,内容如下。
om.mysql.jdbc.Driver
com.mysql.fabric.jdbc.FabricMySQLDriver
当我们要链接 JDBC 会通过 DriverManager 驱动管理来连接。
String url = "jdbc:mysql://localhost:3306/demo?useSSL=true&useUnicode=true&characterEncoding=UTF-8";
String username = "root";
String pwd = "12345";
Connection conn = DriverManager.getConnection(url, username, pwd);
DriverManager 类的静态方法在 JVM 加载类的时候会执行,执行 loadInitialDrivers 方法。
private final static CopyOnWriteArrayList<DriverInfo> registeredDrivers = new CopyOnWriteArrayList<>();
static {
loadInitialDrivers();
println("JDBC DriverManager initialized");
}
private static void loadInitialDrivers() {
// 看看系统属性是否配置了jdbc.drivers
String drivers;
try {
drivers = AccessController.doPrivileged(new PrivilegedAction<String>() {
public String run() {
return System.getProperty("jdbc.drivers");
}
});
} catch (Exception ex) {
drivers = null;
}
// If the driver is packaged as a Service Provider, load it.
// Get all the drivers through the classloader
// exposed as a java.sql.Driver.class service.
// ServiceLoader.load() replaces the sun.misc.Providers()
AccessController.doPrivileged(new PrivilegedAction<Void>() {
public Void run() {
// JDK SPI 方式加载并实例化
ServiceLoader<Driver> loadedDrivers = ServiceLoader.load(Driver.class);
Iterator<Driver> driversIterator = loadedDrivers.iterator();
/* Load these drivers, so that they can be instantiated.
* It may be the case that the driver class may not be there
* i.e. there may be a packaged driver with the service class
* as implementation of java.sql.Driver but the actual class
* may be missing. In that case a java.util.ServiceConfigurationError
* will be thrown at runtime by the VM trying to locate
* and load the service.
*
* Adding a try catch block to catch those runtime errors
* if driver not available in classpath but it's
* packaged as service and that service is there in classpath.
*/
try{
while(driversIterator.hasNext()) {
driversIterator.next();
}
} catch(Throwable t) {
// Do nothing
}
return null;
}
});
println("DriverManager.initialize: jdbc.drivers = " + drivers);
if (drivers == null || drivers.equals("")) {
return;
}
// 配置了jdbc.dirvers属性通过反射实例化
String[] driversList = drivers.split(":");
println("number of Drivers:" + driversList.length);
for (String aDriver : driversList) {
try {
println("DriverManager.Initialize: loading " + aDriver);
Class.forName(aDriver, true,
ClassLoader.getSystemClassLoader());
} catch (Exception ex) {
println("DriverManager.Initialize: load failed: " + ex);
}
}
}
实例化 java.sql.Driver 接口实现类,在 MySQL 提供的,会吧自己注册到 DriverManager 中。
package com.mysql.jdbc;
import java.sql.SQLException;
public class Driver extends NonRegisteringDriver implements java.sql.Driver {
// Register ourselves with the DriverManager
static {
try {
// 注册到DriverManager的CopyOnWriteArrayList中
java.sql.DriverManager.registerDriver(new Driver());
} catch (SQLException E) {
throw new RuntimeException("Can't register driver!");
}
}
}
最后调用 DriverManager#getConnection 从注册中获取连接。
// Worker method called by the public getConnection() methods.
private static Connection getConnection(
String url, java.util.Properties info, Class<?> caller) throws SQLException {
// 循环从注册中获取,获取到一个就返回。
for(DriverInfo aDriver : registeredDrivers) {
try {
Connection con = aDriver.driver.connect(url, info);
if (con != null) {
// Success!
println("getConnection returning " + aDriver.driver.getClass().getName());
return (con);
}
} catch (SQLException ex) {
if (reason == null) {
reason = ex;
}
}
}
}
JDK SPI 的缺点
-
虽然 ServiceLoader 也算是使用的延迟加载,但是基本只能通过遍历全部获取,也就是接口的实现类全部加载并实例化一遍。如果你并不想用某些实现类,它也被加载并实例化了,这就造成了浪费。 -
获取某个实现类的方式不够灵活,只能通过 Iterator 形式获取,不能根据某个参数来获取对应的实现类。
Dubbo SPI
Dubbo SPI 对 JDK SPI 进行了扩展,由原来的提供者类的全限定名列表改成了 K-V 形式,如果 SPI 配置文件中定义了多个实现类,而我们只需要使用其中一个实现类时,就会生成不必要的对象,除此之外 Dubbo 对 JDK SPI 做了三个方面的扩展:
-
方便获取扩展实现:JDK SPI 仅仅通过接口类名获取所有实现,而 ExtensionLoader 则通过接口类名和 key 值获取一个实现。
-
IOC 依赖注入功能:Adaptive 实现,就是生成一个代理类,这样就可以根据实际调用时的一些参数动态决定要调用的类了。
-
采用装饰器模式进行功能增强,自动包装实现,这种实现的类一般是自动激活的,常用于包装类,比如:Protocol 的两个实现类:ProtocolFilterWrapper、ProtocolListenerWrapper。
Dubbo 按照 SPI 配置文件的用途,将其分成了三类目录。
-
META-INF/services/ 目录:该目录下的 SPI 配置文件用来兼容 JDK SPI 。
-
META-INF/dubbo/ 目录:该目录用于存放用户自定义 SPI 配置文件。
-
META-INF/dubbo/internal/ 目录:该目录用于存放 Dubbo 内部使用的 SPI 配置文件。
Dubbo SPI 机制
定义一个接口,用 @SPI 标识表示是 Dubbo SPI。
@SPI
public interface Driver {
void connect(String url);
}
实现类:
package com.cuzz.mysql;
import com.cuzz.api.Driver;
public class MysqlDriver implements Driver {
@Override
public void connect(String url) {
System.out.println("connect mysql: " + url);
}
}
// -----------------
package com.cuzz.oracle;
import com.cuzz.api.Driver;
public class OracleDriver implements Driver {
@Override
public void connect(String url) {
System.out.println("connect oracle: " + url);
}
}
在项目的 resources/META-INF/dubbo 目录下添加一个名为 com.cuzz.api.Driver 的文件,这是 Dubbo SPI 需要读取的配置文件,与 JDK SPI 不一样是 KV 形式,具体内容如下:
mysqlDriver=com.cuzz.mysql.MysqlDriver
oracleDriver=com.cuzz.oracle.OracleDriver
获取实现类:
public class App {
public static void main(String[] args) {
Driver driver = ExtensionLoader.getExtensionLoader(Driver.class).getExtension("mysqlDriver");
driver.connect("localhost:3306");
}
}
输出:
connect mysql: localhost:3306
Dubbo SPI 主流程
我们先从获取 ExtensLoader 实例开始,ExtensionLoader#getExtensionLoader
/**
* Dubbo 中一个扩展接口对应一个 ExtensionLoader 实例,该集合缓存了全部 ExtensionLoader 实例,
* 其中的 Key 为扩展接口,Value 为加载其扩展实现的 ExtensionLoader 实例。
*/
private static final ConcurrentMap<Class<?>, ExtensionLoader<?>> EXTENSION_LOADERS = new ConcurrentHashMap<>(64);
public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) {
if (type == null) {
throw new IllegalArgumentException("Extension type == null");
}
// 必须为接口
if (!type.isInterface()) {
throw new IllegalArgumentException("Extension type (" + type + ") is not an interface!");
}
// 必须有@SPI接口
if (!withExtensionAnnotation(type)) {
throw new IllegalArgumentException("Extension type (" + type +
") is not an extension, because it is NOT annotated with @" + SPI.class.getSimpleName() + "!");
}
// 从缓存中获取
ExtensionLoader<T> loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
if (loader == null) {
// 如果已经存在 key 就不往 map 中添加
EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<T>(type)); // --->
loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
}
return loader;
}
接着看看 ExtensionLoader#ExtensionLoader 构造方法,如果 type 不为 ExtensionFactory.class 初始化拓展适配器。
/**
* 表示拓展类实例工厂,可以通过工厂创建实例
*/
private final ExtensionFactory objectFactory;
private ExtensionLoader(Class<?> type) {
this.type = type;
// 初始化拓展适配器
objectFactory = (type == ExtensionFactory.class ? null : ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension());
}
获取拓展实现类,ExtensionLoader#getExtension
/**
* 缓存了该 ExtensionLoader 加载的扩展名与扩展实现对象之间的映射关系。
*/
private final ConcurrentMap<String, Holder<Object>> cachedInstances = new ConcurrentHashMap<>();
public T getExtension(String name) {
if (StringUtils.isEmpty(name)) {
throw new IllegalArgumentException("Extension name == null");
}
// @SPI中value有值,如@SPI("dubbo") 默认获取 key 为 dubbo 的 Extension
if ("true".equals(name)) {
return getDefaultExtension();
}
// getOrCreateHolder()方法中封装了查找cachedInstances缓存的逻辑
final Holder<Object> holder = getOrCreateHolder(name);
Object instance = holder.get();
if (instance == null) {
synchronized (holder) { // 双重锁防止并发
instance = holder.get();
if (instance == null) {
instance = createExtension(name); // --->
holder.set(instance);
}
}
}
return (T) instance;
}
private Holder<Object> getOrCreateHolder(String name) {
Holder<Object> holder = cachedInstances.get(name);
if (holder == null) {
cachedInstances.putIfAbsent(name, new Holder<>());
holder = cachedInstances.get(name);
}
return holder;
}
ExtensionLoader#createExtension 方法中完成了 SPI 配置文件的查找以及相应扩展实现类的实例化,同时还实现了自动装配以及自动 Wrapper 包装等功能。
private T createExtension(String name) {
// 获取 cachedClasses 缓存,根据扩展名从 cachedClasses 缓存中获取扩展实现类。
Class<?> clazz = getExtensionClasses().get(name); // ---> 1
if (clazz == null) {
throw findException(name);
}
try {
// 根据扩展实现类从 EXTENSION_INSTANCES 缓存中查找相应的实例。如果查找失败,会通过反射创建扩展实现对象。
T instance = (T) EXTENSION_INSTANCES.get(clazz);
if (instance == null) {
EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance());
instance = (T) EXTENSION_INSTANCES.get(clazz);
}
// 自动装配扩展实现对象中的属性(即调用其 setter)。这里涉及 ExtensionFactory 以及自动装配的相关内容。
injectExtension(instance);
// 自动包装扩展实现对象。这里涉及 Wrapper 类以及自动包装特性的相关内容.
Set<Class<?>> wrapperClasses = cachedWrapperClasses;
if (CollectionUtils.isNotEmpty(wrapperClasses)) {
for (Class<?> wrapperClass : wrapperClasses) {
instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
}
}
// 如果扩展实现类实现了 Lifecycle 接口,在 initExtension() 方法中会调用 initialize() 方法进行初始化。
initExtension(instance);
return instance;
} catch (Throwable t) {
throw new IllegalStateException("Extension instance (name: " + name + ", class: " +
type + ") couldn't be instantiated: " + t.getMessage(), t);
}
}
Dubbo SPI 获取拓展类
ExtensionLoader#getExtensionClasses
/**
* 缓存了该 ExtensionLoader 加载的扩展名与扩展实现类之间的映射关系。cachedNames 集合的反向关系缓存。
*/
private final Holder<Map<String, Class<?>>> cachedClasses = new Holder<>();
private Map<String, Class<?>> getExtensionClasses() {
// 先从缓存中获取
Map<String, Class<?>> classes = cachedClasses.get();
if (classes == null) {
synchronized (cachedClasses) {
classes = cachedClasses.get();
if (classes == null) {
// 加载类
classes = loadExtensionClasses(); // --->
cachedClasses.set(classes);
}
}
}
return classes;
}
ExtensionLoader#loadExtensionClasses
/**
* synchronized in getExtensionClasses
*/
private Map<String, Class<?>> loadExtensionClasses() {
// 只能有一个默认值
cacheDefaultExtensionName();
// 加载的扩展名与扩展实现类之间的映射关系
Map<String, Class<?>> extensionClasses = new HashMap<>();
for (LoadingStrategy strategy : strategies) {
loadDirectory(extensionClasses, strategy.directory(), type.getName(), strategy.preferExtensionClassLoader(), strategy.overridden(), strategy.excludedPackages()); // --->
loadDirectory(extensionClasses, strategy.directory(), type.getName().replace("org.apache", "com.alibaba"), strategy.preferExtensionClassLoader(), strategy.overridden(), strategy.excludedPackages());
}
return extensionClasses;
}
private void cacheDefaultExtensionName() {
final SPI defaultAnnotation = type.getAnnotation(SPI.class);
if (defaultAnnotation == null) {
return;
}
String value = defaultAnnotation.value();
// 只能有一个车默认值,这种 @SPI("dubbo,http") 就会报错
if ((value = value.trim()).length() > 0) {
String[] names = NAME_SEPARATOR.split(value);
if (names.length > 1) {
throw new IllegalStateException("More than 1 default extension name on extension " + type.getName()
+ ": " + Arrays.toString(names));
}
if (names.length == 1) {
cachedDefaultName = names[0];
}
}
}
ExtensionLoader#loadDirectory
private void loadDirectory(Map<String, Class<?>> extensionClasses, String dir, String type,
boolean extensionLoaderClassLoaderFirst, boolean overridden, String... excludedPackages) {
String fileName = dir + type;
try {
Enumeration<java.net.URL> urls = null;
ClassLoader classLoader = findClassLoader();
// try to load from ExtensionLoader's ClassLoader first
if (extensionLoaderClassLoaderFirst) {
ClassLoader extensionLoaderClassLoader = ExtensionLoader.class.getClassLoader();
if (ClassLoader.getSystemClassLoader() != extensionLoaderClassLoader) {
urls = extensionLoaderClassLoader.getResources(fileName);
}
}
if (urls == null || !urls.hasMoreElements()) {
if (classLoader != null) {
urls = classLoader.getResources(fileName);
} else {
urls = ClassLoader.getSystemResources(fileName);
}
}
// 循环获取
if (urls != null) {
while (urls.hasMoreElements()) {
java.net.URL resourceURL = urls.nextElement();
loadResource(extensionClasses, classLoader, resourceURL, overridden, excludedPackages); // --->
}
}
} catch (Throwable t) {
logger.error("Exception occurred when loading extension class (interface: " +
type + ", description file: " + fileName + ").", t);
}
}
ExtensionLoader#loadResource
private void loadResource(Map<String, Class<?>> extensionClasses, ClassLoader classLoader,
java.net.URL resourceURL, boolean overridden, String... excludedPackages) {
try {
// 必须 utf-8 格式
try (BufferedReader reader = new BufferedReader(new InputStreamReader(resourceURL.openStream(), StandardCharsets.UTF_8))) {
String line;
while ((line = reader.readLine()) != null) {
final int ci = line.indexOf('#');
if (ci >= 0) {
// 去掉注释
line = line.substring(0, ci);
}
line = line.trim();
if (line.length() > 0) {
try {
String name = null;
int i = line.indexOf('=');
if (i > 0) {
name = line.substring(0, i).trim();
line = line.substring(i + 1).trim();
}
// 没有被排除外
if (line.length() > 0 && !isExcluded(line, excludedPackages)) {
loadClass(extensionClasses, resourceURL, Class.forName(line, true, classLoader), name, overridden); // --->
}
} catch (Throwable t) {
IllegalStateException e = new IllegalStateException("Failed to load extension class (interface: " + type + ", class line: " + line + ") in " + resourceURL + ", cause: " + t.getMessage(), t);
exceptions.put(line, e);
}
}
}
}
} catch (Throwable t) {
logger.error("Exception occurred when loading extension class (interface: " +
type + ", class file: " + resourceURL + ") in " + resourceURL, t);
}
}
ExtensionLoader#loadClass
private void loadClass(Map<String, Class<?>> extensionClasses, java.net.URL resourceURL, Class<?> clazz, String name,
boolean overridden) throws NoSuchMethodException {
if (!type.isAssignableFrom(clazz)) {
throw new IllegalStateException("Error occurred when loading extension class (interface: " +
type + ", class line: " + clazz.getName() + "), class "
+ clazz.getName() + " is not subtype of interface.");
}
// 处理Adaptive注解,若存在则将该实现类保存至cachedAdaptiveClass属性
if (clazz.isAnnotationPresent(Adaptive.class)) {
cacheAdaptiveClass(clazz, overridden);
}
// 是否为包装类,是包装类缓存到 cachedWrapperClasses Set中
else if (isWrapperClass(clazz)) {
cacheWrapperClass(clazz);
} else {
clazz.getConstructor();
if (StringUtils.isEmpty(name)) {
name = findAnnotationName(clazz);
if (name.length() == 0) {
throw new IllegalStateException("No such extension name for the class " + clazz.getName() + " in the config " + resourceURL);
}
}
// key可以为多个,如:mysqlDriver,mysqlDriver2=com.cuzz.mysql.MysqlDriver
String[] names = NAME_SEPARATOR.split(name);
if (ArrayUtils.isNotEmpty(names)) {
// 缓存到 cachedActivates 属性中
cacheActivateClass(clazz, names[0]);
for (String n : names) {
// 缓存了该 ExtensionLoader 加载的扩展实现类与扩展名之间的映射关系。
cacheName(clazz, n);
// 加载的扩展名与扩展实现类之间的映射关系
saveInExtensionClass(extensionClasses, clazz, n, overridden);
}
}
}
}
private void cacheAdaptiveClass(Class<?> clazz, boolean overridden) {
if (cachedAdaptiveClass == null || overridden) {
cachedAdaptiveClass = clazz;
} else if (!cachedAdaptiveClass.equals(clazz)) {
throw new IllegalStateException("More than 1 adaptive class found: "
+ cachedAdaptiveClass.getName()
+ ", " + clazz.getName());
}
}
Dubbo SPI 的自动包装和自动注入
回到前面我们分析 ExtensionLoader#createExtension 方法,现在我们重点关注 ExtensionLoader#injectExtension 方法
private T createExtension(String name) {
// 获取 cachedClasses 缓存,根据扩展名从 cachedClasses 缓存中获取扩展实现类。
Class<?> clazz = getExtensionClasses().get(name);
if (clazz == null) {
throw findException(name);
}
try {
// 根据扩展实现类从 EXTENSION_INSTANCES 缓存中查找相应的实例。如果查找失败,会通过反射创建扩展实现对象。
T instance = (T) EXTENSION_INSTANCES.get(clazz);
if (instance == null) {
EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance());
instance = (T) EXTENSION_INSTANCES.get(clazz);
}
// 自动装配扩展实现对象中的属性(即调用其 setter)。这里涉及 ExtensionFactory 以及自动装配的相关内容。
injectExtension(instance); // --->
// 自动包装扩展实现对象。这里涉及 Wrapper 类以及自动包装特性的相关内容。
Set<Class<?>> wrapperClasses = cachedWrapperClasses;
if (CollectionUtils.isNotEmpty(wrapperClasses)) {
// 遍历所有的包装类,包装类需要有一个参数类被包装类型的构造器。
for (Class<?> wrapperClass : wrapperClasses) {
instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
}
}
// 如果扩展实现类实现了 Lifecycle 接口,在 initExtension() 方法中会调用 initialize() 方法进行初始化。
initExtension(instance);
return instance;
} catch (Throwable t) {
throw new IllegalStateException("Extension instance (name: " + name + ", class: " +
type + ") couldn't be instantiated: " + t.getMessage(), t);
}
}
ExtensionLoader#injectExtension
private T injectExtension(T instance) {
if (objectFactory == null) {
return instance;
}
try {
for (Method method : instance.getClass().getMethods()) {
// 判断是否为set方法
if (!isSetter(method)) {
continue;
}
// 如果有 @DisableInject 注解也不注入
if (method.getAnnotation(DisableInject.class) != null) {
continue;
}
// 获取参数类型,如果是基本类型也忽略
Class<?> pt = method.getParameterTypes()[0];
if (ReflectUtils.isPrimitives(pt)) {
continue;
}
try {
// 根据 Setter 方法获取属性名
String property = getSetterProperty(method);
// 加载这个类,并实例化
Object object = objectFactory.getExtension(pt, property);
if (object != null) {
// 反射注入
method.invoke(instance, object);
}
} catch (Exception e) {
logger.error("Failed to inject via method " + method.getName()
+ " of interface " + type.getName() + ": " + e.getMessage(), e);
}
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
return instance;
}
Dubbo SPI 的 @Adaptive 注解与适配器
在 dubbo 扩展中,适配器模式被广泛使用,其作用在于为同一扩展类型下的多个扩展实现的调用提供路由功能,如指定优先级等。dubbo 提供了两种方式来生成扩展适配器:
-
静态代码形式的默认适配器:这些类会被 Adaptive 注解修饰,且一个接口只能有一个这样的静态适配器。这种形式仅应用于一些特殊的接口,如:AdaptiveCompiler、AdaptiveExtensionFactory 这两个适配器,ExtensionLoader 需要依赖它们来工作,所以使用了这种特殊的构建方式。
-
动态代码适配器:实际上其余的接口都是使用动态适配器,ExtensionLoader 根据接口定义动态生成一段适配器代码,并构建这个动态类的实例。这个时候接口中的一些方法具有 Adaptive 标记,它提供了一些用于查找具体 Extension 的 key,如果这些方法中有 URL 类型的参数,则会依次在 url 中查找这些 key 对应的 value,再以此为 name 确定要使用的 Extension。如果没有从 url 中找到该参数,则会使用 SPI 注解中的默认值 name 进行构建。
我们回到构造方法中 ExtensionLoader#getAdaptiveExtension
private ExtensionLoader(Class<?> type) {
this.type = type;
// 初始化拓展适配器
objectFactory = (type == ExtensionFactory.class ? null : ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension());
}
ExtensionLoader#getAdaptiveExtension
public T getAdaptiveExtension() {
// 先从缓存中获取
Object instance = cachedAdaptiveInstance.get();
if (instance == null) {
if (createAdaptiveInstanceError != null) {
throw new IllegalStateException("Failed to create adaptive instance: " +
createAdaptiveInstanceError.toString(),
createAdaptiveInstanceError);
}
synchronized (cachedAdaptiveInstance) {
instance = cachedAdaptiveInstance.get();
if (instance == null) {
try {
// 创建
instance = createAdaptiveExtension(); // ---> 1
cachedAdaptiveInstance.set(instance);
} catch (Throwable t) {
createAdaptiveInstanceError = t;
throw new IllegalStateException("Failed to create adaptive instance: " + t.toString(), t);
}
}
}
}
return (T) instance;
}
private T createAdaptiveExtension() {
try {
// 注入属性
return injectExtension((T) getAdaptiveExtensionClass().newInstance()); // ---> 2
} catch (Exception e) {
throw new IllegalStateException("Can't create adaptive extension " + type + ", cause: " + e.getMessage(), e);
}
}
private Class<?> getAdaptiveExtensionClass() {
getExtensionClasses();
if (cachedAdaptiveClass != null) {
return cachedAdaptiveClass;
}
return cachedAdaptiveClass = createAdaptiveExtensionClass(); // ---> 3
}
private Class<?> createAdaptiveExtensionClass() {
// 创建适配器类,并继承 type 接口
String code = new AdaptiveClassCodeGenerator(type, cachedDefaultName).generate(); // ---> 4
ClassLoader classLoader = findClassLoader();
// ExtensionLoader再调用默认的JavassitCompiler进行编译和类加载
org.apache.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(org.apache.dubbo.common.compiler.Compiler.class).getAdaptiveExtension();
return compiler.compile(code, classLoader);
}
ExtensionLoader#createAdaptiveExtensionClass
以 Transsporter 为例子
@SPI("netty")
public interface Transporter {
@Adaptive({Constants.SERVER_KEY, Constants.TRANSPORTER_KEY})
RemotingServer bind(URL url, ChannelHandler handler) throws RemotingException;
@Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
Client connect(URL url, ChannelHandler handler) throws RemotingException;
}
Dubbo 会生成一个 Transporter$Adaptive 适配器类,该类继承了 Transporter 接口:
public class Transporter$Adaptive implements Transporter {
public org.apache.dubbo.remoting.Client connect(URL arg0, ChannelHandler arg1) throws RemotingException {
// 必须传递URL参数
if (arg0 == null) throw new IllegalArgumentException("url == null");
URL url = arg0;
// 确定扩展名,优先从URL中的client参数获取,其次是transporter参数
// 这两个参数名称由@Adaptive注解指定,最后是@SPI注解中的默认值
String extName = url.getParameter("client",
url.getParameter("transporter", "netty"));
if (extName == null)
throw new IllegalStateException("...");
// 通过ExtensionLoader加载Transporter接口的指定扩展实现
Transporter extension = (Transporter) ExtensionLoader
.getExtensionLoader(Transporter.class)
.getExtension(extName);
return extension.connect(arg0, arg1);
}
... // 省略bind()方法
}
Dubbo SPI 的 @Activate 注解与自动激活特性
这里以 Dubbo 中的 Filter 为例说明自动激活特性的含义,org.apache.dubbo.rpc.Filter 接口有非常多的扩展实现类,在一个场景中可能需要某几个 Filter 扩展实现类协同工作,而另一个场景中可能需要另外几个实现类一起工作。这样,就需要一套配置来指定当前场景中哪些 Filter 实现是可用的,这就是 @Activate 注解要做的事情。
@Activate 注解标注在扩展实现类上,有 group、value 以及 order 三个属性。
-
group 属性:修饰的实现类是在 Provider 端被激活还是在 Consumer 端被激活。 -
value 属性:修饰的实现类只在 URL 参数中出现指定的 key 时才会被激活。 -
order 属性:用来确定扩展实现类的排序。
如 Filter 接口和实现类:
@SPI
public interface Filter {
Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException;1
}
@Activate(group = Constants.PROVIDER)
public class TimeoutFilter implements Filter {
...
}
@Activate(group = {Constants.PROVIDER, Constants.CONSUMER})
public class MonitorFilter implements Filter {
...
}
首先来关注 getActivateExtension() 方法的参数:url 中包含了配置信息,values 是配置中指定的扩展名,group 为 Provider 或 Consumer。
public List<T> getActivateExtension(URL url, String[] values,
String group) {
List<T> activateExtensions = new ArrayList<>();
// values配置就是扩展名
List<String> names = values == null ?
new ArrayList<>(0) : asList(values);
if (!names.contains(REMOVE_VALUE_PREFIX + DEFAULT_KEY)) {// ---1
getExtensionClasses(); // 触发cachedActivates等缓存字段的加载
for (Map.Entry<String, Object> entry :
cachedActivates.entrySet()) {
String name = entry.getKey(); // 扩展名
Object activate = entry.getValue(); // @Activate注解
String[] activateGroup, activateValue;
if (activate instanceof Activate) { // @Activate注解中的配置
activateGroup = ((Activate) activate).group();
activateValue = ((Activate) activate).value();
} else {
continue;
}
if (isMatchGroup(group, activateGroup) // 匹配group
// 没有出现在values配置中的,即为默认激活的扩展实现
&& !names.contains(name)
// 通过"-"明确指定不激活该扩展实现
&& !names.contains(REMOVE_VALUE_PREFIX + name)
// 检测URL中是否出现了指定的Key
&& isActive(activateValue, url)) {
// 加载扩展实现的实例对象,这些都是激活的
activateExtensions.add(getExtension(name));
}
}
// 排序 --- 2
activateExtensions.sort(ActivateComparator.COMPARATOR);
}
List<T> loadedExtensions = new ArrayList<>();
for (int i = 0; i < names.size(); i++) { // ---3
String name = names.get(i);
// 通过"-"开头的配置明确指定不激活的扩展实现,直接就忽略了
if (!name.startsWith(REMOVE_VALUE_PREFIX)
&& !names.contains(REMOVE_VALUE_PREFIX + name)) {
if (DEFAULT_KEY.equals(name)) {
if (!loadedExtensions.isEmpty()) {
// 按照顺序,将自定义的扩展添加到默认扩展集合前面
activateExtensions.addAll(0, loadedExtensions);
loadedExtensions.clear();
}
} else {
loadedExtensions.add(getExtension(name));
}
}
}
if (!loadedExtensions.isEmpty()) {
// 按照顺序,将自定义的扩展添加到默认扩展集合后面
activateExtensions.addAll(loadedExtensions);
}
return activateExtensions;
}
总结
本文总结了 JDK SPI 和 Dubbo SPI 机制和原理,参考了很多文章,以下几点需要值得注意:
-
JDK SPI 需要对加载实例化所有的推展对象,而 Dubbo SPI 根据 KV 形式,只需要实例化需要的拓展。 -
Dubbo SPI 对 JDK SPI 拓展了自动注入、自动注入以及自动激活等特性。
参考
-
Dubbo 官网-Dubbo SPI [1] -
Dubbo SPI 精析 [2] -
Dubbo 源码解读全集 [3] -
聊聊 Dubbo(五):核心源码-SPI 扩展 [4] -
Dubbo 源码分析(五)ExtensionLoader [5]
参考资料
Dubbo 官网-Dubbo SPI: http://dubbo.apache.org/zh-cn/docs/source_code_guide/dubbo-spi.html
[2]Dubbo SPI 精析: https://kaiwu.lagou.com/course/courseInfo.htm?courseId=393#/detail/pc?id=4259
[3]Dubbo 源码解读全集: https://www.bilibili.com/video/BV1Wt4y1D72z
[4]聊聊 Dubbo(五):核心源码-SPI 扩展: https://www.jianshu.com/p/7daa38fc9711
[5]Dubbo 源码分析(五)ExtensionLoader: http://gaofeihang.cn/archives/278
以上是关于Dubbo SPI源码分析的主要内容,如果未能解决你的问题,请参考以下文章