源码分析---SOFARPC可扩展的机制SPI
Posted luozhiyun
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了源码分析---SOFARPC可扩展的机制SPI相关的知识,希望对你有一定的参考价值。
这几天离职在家,正好没事可以疯狂的输出一下,本来想写DUBBO的源码解析的,但是发现写DUBBO源码的太多了,所以找一个写的不那么多的框架,所以就选中SOFARPC这个框架了。
SOFARPC是蚂蚁金服开源的一个RPC框架,相比DUBBO它没有这么多历史的包袱,代码更加简洁,设计思路更加清晰,更加容易去理解其中的代码。
那么为什么要去重写原生的SPI呢?官方给出了如下解释:
- 按需加载
- 可以有别名
- 可以有优先级进行排序和覆盖
- 可以控制是否单例
- 可以在某些场景下使用编码
- 可以指定扩展配置位置
- 可以排斥其他扩展点
整个流程如下:
我们以ConsumerBootstrap为例:
先要有一个抽象类:
@Extensible(singleton = false)
public abstract class ConsumerBootstrap<T>
....
指定扩展实现类:
@Extension("sofa")
public class DefaultConsumerBootstrap<T> extends ConsumerBootstrap<T>
...
扩展描述文件META-INF/services/sofa-rpc/com.alipay.sofa.rpc.bootstrap.ConsumerBootstrap
sofa=com.alipay.sofa.rpc.bootstrap.DefaultConsumerBootstrap
当这些准备完成后,直接调用即可。
ConsumerBootstrap sofa = ExtensionLoaderFactory.getExtensionLoader(ConsumerBootstrap.class).getExtension("sofa");
接下来我们看看ExtensionLoaderFactory的源码
/**
* All extension loader Class : ExtensionLoader
* 这个map里面装的是所有ExtensionLoader
*/
private static final ConcurrentMap<Class, ExtensionLoader> LOADER_MAP = new ConcurrentHashMap<Class, ExtensionLoader>();
public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> clazz, ExtensionLoaderListener<T> listener)
ExtensionLoader<T> loader = LOADER_MAP.get(clazz);
if (loader == null)
//get不到则加上锁
synchronized (ExtensionLoaderFactory.class)
//防止其他线程操作再get一次
loader = LOADER_MAP.get(clazz);
if (loader == null)
loader = new ExtensionLoader<T>(clazz, listener);
LOADER_MAP.put(clazz, loader);
return loader;
然后我们看一下ExtensionLoader这个类的构造器
protected ExtensionLoader(Class<T> interfaceClass, boolean autoLoad, ExtensionLoaderListener<T> listener)
//如果正在执行关闭,则将属性置空后直接返回
if (RpcRunningState.isShuttingDown())
this.interfaceClass = null;
this.interfaceName = null;
this.listener = null;
this.factory = null;
this.extensible = null;
this.all = null;
return;
// 接口为空,既不是接口,也不是抽象类
if (interfaceClass == null ||
!(interfaceClass.isInterface() || Modifier.isAbstract(interfaceClass.getModifiers())))
throw new IllegalArgumentException("Extensible class must be interface or abstract class!");
//当前加载的接口类名
this.interfaceClass = interfaceClass;
//接口名字
this.interfaceName = ClassTypeUtils.getTypeStr(interfaceClass);
this.listener = listener;
//接口上必须要有Extensible注解
Extensible extensible = interfaceClass.getAnnotation(Extensible.class);
if (extensible == null)
throw new IllegalArgumentException(
"Error when load extensible interface " + interfaceName + ", must add annotation @Extensible.");
else
this.extensible = extensible;
// 如果是单例,那么factory不为空
this.factory = extensible.singleton() ? new ConcurrentHashMap<String, T>() : null;
//这个属性里面是这个接口的所有实现类
this.all = new ConcurrentHashMap<String, ExtensionClass<T>>();
if (autoLoad)
//获取到扩展点加载的路径
List<String> paths = RpcConfigs.getListValue(RpcOptions.EXTENSION_LOAD_PATH);
for (String path : paths)
//根据路径加载文件
loadFromFile(path);
拿到所有的扩展点加载的路径后进入到loadFromFile中进行文件的加载
protected synchronized void loadFromFile(String path)
if (LOGGER.isDebugEnabled())
LOGGER.debug("Loading extension of extensible from path: ", interfaceName, path);
// 默认如果不指定文件名字,就是接口名
String file = StringUtils.isBlank(extensible.file()) ? interfaceName : extensible.file().trim();
String fullFileName = path + file;
try
ClassLoader classLoader = ClassLoaderUtils.getClassLoader(getClass());
loadFromClassLoader(classLoader, fullFileName);
catch (Throwable t)
if (LOGGER.isErrorEnabled())
LOGGER.error("Failed to load extension of extensible " + interfaceName + " from path:" + fullFileName,
t);
protected void loadFromClassLoader(ClassLoader classLoader, String fullFileName) throws Throwable
Enumeration<URL> urls = classLoader != null ? classLoader.getResources(fullFileName)
: ClassLoader.getSystemResources(fullFileName);
// 可能存在多个文件。
if (urls != null)
while (urls.hasMoreElements())
// 读取一个文件
URL url = urls.nextElement();
if (LOGGER.isDebugEnabled())
LOGGER.debug("Loading extension of extensible from classloader: and file: ",
interfaceName, classLoader, url);
BufferedReader reader = null;
try
reader = new BufferedReader(new InputStreamReader(url.openStream(), "UTF-8"));
String line;
while ((line = reader.readLine()) != null)
readLine(url, line);
catch (Throwable t)
if (LOGGER.isWarnEnabled())
LOGGER.warn("Failed to load extension of extensible " + interfaceName
+ " from classloader: " + classLoader + " and file:" + url, t);
finally
if (reader != null)
reader.close();
接下来进入到readLine,这个方法主要是读取prop文件里面的每一行记录,并加载该实现类的类文件校验完后将文件添加到all属性中
protected void readLine(URL url, String line)
//读取文件里面的一行记录,并将这行记录用=号分割
String[] aliasAndClassName = parseAliasAndClassName(line);
if (aliasAndClassName == null || aliasAndClassName.length != 2)
return;
//别名
String alias = aliasAndClassName[0];
//包名
String className = aliasAndClassName[1];
// 读取配置的实现类
Class tmp;
try
tmp = ClassUtils.forName(className, false);
catch (Throwable e)
if (LOGGER.isWarnEnabled())
LOGGER.warn("Extension of extensible is disabled, cause by: ",
className, interfaceName, ExceptionUtils.toShortString(e, 2));
if (LOGGER.isDebugEnabled())
LOGGER.debug("Extension " + className + " of extensible " + interfaceName + " is disabled.", e);
return;
if (!interfaceClass.isAssignableFrom(tmp))
throw new IllegalArgumentException("Error when load extension of extensible " + interfaceName +
" from file:" + url + ", " + className + " is not subtype of interface.");
Class<? extends T> implClass = (Class<? extends T>) tmp;
// 检查是否有可扩展标识
Extension extension = implClass.getAnnotation(Extension.class);
if (extension == null)
throw new IllegalArgumentException("Error when load extension of extensible " + interfaceName +
" from file:" + url + ", " + className + " must add annotation @Extension.");
else
String aliasInCode = extension.value();
if (StringUtils.isBlank(aliasInCode))
// 扩展实现类未配置@Extension 标签
throw new IllegalArgumentException("Error when load extension of extensible " + interfaceClass +
" from file:" + url + ", " + className + "'s alias of @Extension is blank");
if (alias == null)
// spi文件里没配置,用代码里的
alias = aliasInCode;
else
// spi文件里配置的和代码里的不一致
if (!aliasInCode.equals(alias))
throw new IllegalArgumentException("Error when load extension of extensible " + interfaceName +
" from file:" + url + ", aliases of " + className + " are " +
"not equal between " + aliasInCode + "(code) and " + alias + "(file).");
// 接口需要编号,实现类没设置
if (extensible.coded() && extension.code() < 0)
throw new IllegalArgumentException("Error when load extension of extensible " + interfaceName +
" from file:" + url + ", code of @Extension must >=0 at " + className + ".");
// 不可以是default和*
if (StringUtils.DEFAULT.equals(alias) || StringUtils.ALL.equals(alias))
throw new IllegalArgumentException("Error when load extension of extensible " + interfaceName +
" from file:" + url + ", alias of @Extension must not \\"default\\" and \\"*\\" at " + className + ".");
// 检查是否有存在同名的
ExtensionClass old = all.get(alias);
ExtensionClass<T> extensionClass = null;
if (old != null)
// 如果当前扩展可以覆盖其它同名扩展
if (extension.override())
// 如果优先级还没有旧的高,则忽略
if (extension.order() < old.getOrder())
if (LOGGER.isDebugEnabled())
LOGGER.debug("Extension of extensible with alias override from to failure, " +
"cause by: order of old extension is higher",
interfaceName, alias, old.getClazz(), implClass);
else
if (LOGGER.isInfoEnabled())
LOGGER.info("Extension of extensible with alias : has been override to ",
interfaceName, alias, old.getClazz(), implClass);
// 如果当前扩展可以覆盖其它同名扩展
extensionClass = buildClass(extension, implClass, alias);
// 如果旧扩展是可覆盖的
else
if (old.isOverride() && old.getOrder() >= extension.order())
// 如果已加载覆盖扩展,再加载到原始扩展
if (LOGGER.isInfoEnabled())
LOGGER.info("Extension of extensible with alias : has been loaded, ignore origin ",
interfaceName, alias, old.getClazz(), implClass);
else
// 如果不能被覆盖,抛出已存在异常
throw new IllegalStateException(
"Error when load extension of extensible " + interfaceClass + " from file:" + url +
", Duplicate class with same alias: " + alias + ", " + old.getClazz() + " and " + implClass);
else
extensionClass = buildClass(extension, implClass, alias);
if (extensionClass != null)
// 检查是否有互斥的扩展点
for (Map.Entry<String, ExtensionClass<T>> entry : all.entrySet())
ExtensionClass existed = entry.getValue();
if (extensionClass.getOrder() >= existed.getOrder())
// 新的优先级 >= 老的优先级,检查新的扩展是否排除老的扩展
String[] rejection = extensionClass.getRejection();
if (CommonUtils.isNotEmpty(rejection))
for (String rej : rejection)
existed = all.get(rej);
if (existed == null || extensionClass.getOrder() < existed.getOrder())
continue;
ExtensionClass removed = all.remove(rej);
if (removed != null)
if (LOGGER.isInfoEnabled())
LOGGER.info(
"Extension of extensible with alias : has been reject by new ",
interfaceName, removed.getAlias(), removed.getClazz(), implClass);
else
String[] rejection = existed.getRejection();
if (CommonUtils.isNotEmpty(rejection))
for (String rej : rejection)
if (rej.equals(extensionClass.getAlias()))
// 被其它扩展排掉
if (LOGGER.isInfoEnabled())
LOGGER.info(
"Extension of extensible with alias : has been reject by old ",
interfaceName, alias, implClass, existed.getClazz());
return;
loadSuccess(alias, extensionClass);
加载完文件后我们再回到
ConsumerBootstrap sofa = ExtensionLoaderFactory.getExtensionLoader(ConsumerBootstrap.class).getExtension("sofa");
进入到getExtension方法中
public ExtensionClass<T> getExtensionClass(String alias)
return all == null ? null : all.get(alias);
public T getExtension(String alias)
//从all属性中拿到加载的class
ExtensionClass<T> extensionClass = getExtensionClass(alias);
if (extensionClass == null)
throw new SofaRpcRuntimeException("Not found extension of " + interfaceName + " named: \\"" + alias + "\\"!");
else
//在加载class的时候,校验了是否是单例,如果是单例,那么factory不为null
if (extensible.singleton() && factory != null)
T t = factory.get(alias);
if (t == null)
synchronized (this)
t = factory.get(alias);
if (t == null)
//实例化
t = extensionClass.getExtInstance();
//放入到factory,单例的class下次直接拿就好了,不需要重新创建
factory.put(alias, t);
return t;
else
//实例化
return extensionClass.getExtInstance();
我们进入到ExtensionClass看看getExtInstance方法
/**
* 服务端实例对象(只在是单例的时候保留)
* 用volatile修饰,保证了可见性
*/
private volatile transient T instance;
/**
* 得到服务端实例对象,如果是单例则返回单例对象,如果不是则返回新创建的实例对象
*
* @param argTypes 构造函数参数类型
* @param args 构造函数参数值
* @return 扩展点对象实例 ext instance
*/
public T getExtInstance(Class[] argTypes, Object[] args)
if (clazz != null)
try
if (singleton) // 如果是单例
if (instance == null)
synchronized (this)
if (instance == null)
//通过反射创建实例
instance = ClassUtils.newInstanceWithArgs(clazz, argTypes, args);
return instance; // 保留单例
else
//通过反射创建实例
return ClassUtils.newInstanceWithArgs(clazz, argTypes, args);
catch (Exception e)
throw new SofaRpcRuntimeException("create " + clazz.getCanonicalName() + " instance error", e);
throw new SofaRpcRuntimeException("Class of ExtensionClass is null");
看完了SOFARPC的扩展类实现后感觉代码写的非常的整洁,逻辑非常的清晰,里面有很多可以学习的地方,比如线程安全用到了双重检查锁和volatile保证可见性。
以上是关于源码分析---SOFARPC可扩展的机制SPI的主要内容,如果未能解决你的问题,请参考以下文章