Dubbo系列讲解之扩展点实现原理分析2万字分享

Posted 波波烤鸭

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Dubbo系列讲解之扩展点实现原理分析2万字分享相关的知识,希望对你有一定的参考价值。

Apache Dubbo 是一款微服务开发框架,它提供了 RPC通信 与 微服务治理 两大关键能力。这意味着,使用 Dubbo 开发的微服务,将具备相互之间的远程发现与通信能力, 同时利用 Dubbo 提供的丰富服务治理能力,可以实现诸如服务发现、负载均衡、流量调度等服务治理诉求。同时 Dubbo 是高度可扩展的,用户几乎可以在任意功能点去定制自己的实现,以改变框架的默认行为来满足自己的业务需求

  本文主要给大家讲解下Dubbo的扩展点原理。

一、SPI介绍

  JDK中的SPI(Service Provider Interface)提供了一种基于接口的扩展机制,主要实现步骤如下:

  • 定义一个接口作为一个标准。
  • 在实现扩展点的工程中创建一个META-INF/services目录,以定义的接口的全类名作为文件名创建一个文件名,将实现类的全类名写入到文件中。
  • 在需要使用扩展点的地方调用java.util.ServiceLoader#load(java.lang.Class)方法,传入接口的全类名,返回java.util.ServiceLoader,ServiceLoader是一个Iterable的实现类,可以通过迭代器获取到所有的扩展,进行执行。

具体不清楚的可以参考下我的另一篇专门介绍SPI的文章:
Java SPI内容详解 学习交流 463257262

二、Dubbo扩展详解

1.Dubbo中的扩展点的增强

  在Dubbo中的扩展点主要是对JDK的扩展点思想做了增强,主要增强了一下功能:

  • 全类名文件中的内容通过key-value的规范书写,加载时也是K-V的存储方式,增加扩展点查找的灵活性
  • JDK中的扩展点的加载会一次性的将所有的扩展点加载到内存中,如果有些扩展点没用,但是改扩展点初始化很耗时,JDK也会将所有的扩展点加载到内存中,这些会造成一些浪费,而Dubbo中的扩展点会按需进行加载(加载时传入扩展点的name,这也是需要依赖于文件的K-V格式)
  • Dubbo增加了对扩展点 IoC 和 AOP 的支持,一个扩展点可以直接 setter 注入其它扩展点。同时在对扩展点进行依赖注入时,也会通过扫描到的Wrapper对扩展点实现进行包装。

2.Dubbo中扩展点的使用方式

  • 定义一个接口,在接口上标注一个@SPI,标识这是一个扩展点
  • 在扩展点实现工程中创建文件:/META-INF/dubbo/扩展点全类名
  • 在文件中定义扩展点实现的k-v格式数据
helloService=com.bobo.spring.cloud.alibaba.consumer.spi.impl.HelloServiceImpl
  • 调用如下代码获取扩展的实现进行调用
HelloService HelloService = ExtensionLoader
						.getExtensionLoader(HelloService.class)
						.getExtension("helloService");
System.out.println(HelloService.sayHello("wangxing"));

3.Dubbo扩展点源码分析

  在Dubbo中存在了以下三种类型的扩展点(Q群:463257262):

  1. 指定名称扩展点
  2. 自适应扩展点
  3. 激活扩展点

3.1 自适应扩展点源码分析

  在Dubbo中,通过在接口上标注【@SPI】标识该接口是一个扩展点,同时在其扩展点实现类或方法上,如果存在【@Adaptive】注解,则表示该类或方法是一个自适应的扩展点。标注在类上时,表示该扩展类是默认的自适应扩展点,标注在方法上时,表示该方法是自适应扩展点,将会重写该方法,使得Dubbo能够在运行时获取到具体的扩展点。加下来就进入源码的分析吧…

  Dubbo的扩展点的入口如下:

HelloService helloService2 = ExtensionLoader
								.getExtensionLoader(HelloService.class)
								.getAdaptiveExtension();

  首先进入到getExtensionLoader()方法

public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) {
        if (type == null) {
            throw new IllegalArgumentException("Extension type == null");
        }
        if (!type.isInterface()) {
            throw new IllegalArgumentException("Extension type (" + type + ") is not an interface!");
        }
        if (!withExtensionAnnotation(type)) {
            throw new IllegalArgumentException("Extension type (" + type +
                    ") is not an extension, because it is NOT annotated with @" + SPI.class.getSimpleName() + "!");
        }

        ExtensionLoader<T> loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
        if (loader == null) {
            EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<T>(type));
            loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
        }
        return loader;
    }

  该方法主要做了以下几件事

  • 对传入的的扩展点进行判断
    空值判断
    是否是接口判断
    是否标识了@SPI注解的判断,这也印证了前面我们说的只有标识了@SPI注解的接口Dubbo才会认为它是个扩展点接口

  • 根据类型从EXTENSION_LOADERS缓存中获取ExtensionLoader,获取到就直接返回ExtensionLoader。(EXTENSION_LOADERS是一个CurrentHashMap集合,key为扩展点接口的.class对象,value为该扩展点对应的ExtensionLoader)

  • 如果缓存中未获取到的ExtensionLoader,以扩展点.class对象为key,创建一个ExtensionLoader对象为value存储到EXTENSION_LOADERS中,返回创建的ExtensionLoader。到此就可以获取到一个ExtensionLoader了,通过返回的ExtensionLoader对象可以获得对应的扩展点的实现对象

接下来进入ExtensionLoader类中的构造方法,看看ExtensionLoader实例化时做了什么

private ExtensionLoader(Class<?> type) {
        this.type = type;
        objectFactory = (type == ExtensionFactory.class ? null : ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension());
    }

  在构造器中为type属性复制传入的扩展点的.class对象。同时通过自适应扩展点的方式获取到了一个ExtensionFactory的扩展点的实现,赋值给objectFactory。这里先不详细说明会具体获取到哪个实现,本节分析完成再回过来看。

  通过以上的步骤,一个初始化完成的ExtensionLoader对象已经被获取到了,分析getAdaptiveExtension()获取自适应扩展点实现的流程

  进入getAdaptiveExtension()

public T getAdaptiveExtension() {
    Object instance = cachedAdaptiveInstance.get();
    if (instance == null) {
      // 如果获取到的实例为null,切缓存的错误不能null,抛出异常
      if (createAdaptiveInstanceError != null) {
        throw new IllegalStateException("Failed to create adaptive instance: " +
                                        createAdaptiveInstanceError.toString(),
                                        createAdaptiveInstanceError);
      }

      synchronized (cachedAdaptiveInstance) {
        instance = cachedAdaptiveInstance.get();
        if (instance == null) {
          try {
            instance = createAdaptiveExtension();
            cachedAdaptiveInstance.set(instance);
          } catch (Throwable t) {
            // 异常信息缓存起来,下一次进来时如果发现是创建实例是出现异常,就直接抛出异常。这里的设计应该是当扩展点创建异常时避免多次执行创建流程的优化
            createAdaptiveInstanceError = t;
            throw new IllegalStateException("Failed to create adaptive instance: " + t.toString(), t);
          }
        }
      }
    }
    return (T) instance;
    }

该方法主要做了以下几件事

  • 从缓存cachedAdaptiveInstance中获取扩展点实现,存在该扩展点对象,则直接返回该实例。cachedAdaptiveInstance是一个Holder对象,主要用于缓存该扩展点的实现的具体实例,因为这里只会返回一个自适应扩展点的实现(有多个实现类标注了则按文件定义顺序取最后一个),实现对于每个``ExtensionLoader`来说,自适应扩展点是单例的。

  • 如果扩展点实现不存在,调用createAdaptiveExtension()创建一个具体的实现,并将该实例set到cachedAdaptiveInstance中缓存起来。

创建扩展点实现的具体流程是在createAdaptiveExtension方法中

 private Class<?> getAdaptiveExtensionClass() {
        getExtensionClasses();
        if (cachedAdaptiveClass != null) {
            return cachedAdaptiveClass;
        }
        return cachedAdaptiveClass = createAdaptiveExtensionClass();
    }

  该方法主要做了以下几件事

  • 调用getExtensionClasses(),顾名思义,该方法主要是获取扩展点的所有实现的.class对象。
  • 如果缓存的cachedAdaptiveClass 对象不为null,直接返回。(cachedAdaptiveClass是一个class对象,用于保存该扩展点的自适应扩展点的实现,即是该扩展点的实现类中存在有将@Adaptive标注在类上的默认自适应扩展点)
  • 如果为缓存有cachedAdaptiveClass对象,则调用createAdaptiveExtensionClass创建一个cachedAdaptiveClass,并复制给cachedAdaptiveClass

接下来首先进入getExtensionClasses()方法

private Map<String, Class<?>> getExtensionClasses() {
    Map<String, Class<?>> classes = cachedClasses.get();
    if (classes == null) {
        synchronized (cachedClasses) {
            classes = cachedClasses.get();
            if (classes == null) {
                classes = loadExtensionClasses();
                cachedClasses.set(classes);
            }
        }
    }
    return classes;
}

  该方法首先会从cachedClasses(cachedClasses也是一个holder,用于存储每个扩展点的所有扩展实现的map集合)获取该.class对象,存在则直接返回,否则调用loadExtensionClasses方法加载扩展点的classs,将加载到的classes存到cachedClasses中。
接下来进入loadExtensionClasses

private Map<String, Class<?>> loadExtensionClasses() {
        cacheDefaultExtensionName();

        Map<String, Class<?>> extensionClasses = new HashMap<>();

        for (LoadingStrategy strategy : strategies) {
          // 扫描每个加载策略目录中的扩展点实现
            loadDirectory(extensionClasses, strategy.directory(), type.getName(), strategy.preferExtensionClassLoader(), strategy.overridden(), strategy.excludedPackages());
          // 对alibaba的践行兼容
            loadDirectory(extensionClasses, strategy.directory(), type.getName().replace("org.apache", "com.alibaba"), strategy.preferExtensionClassLoader(), strategy.overridden(), strategy.excludedPackages());
        }

        return extensionClasses;
    }

  首先调用cacheDefaultExtensionName()方法。再给接口标注@SPI的时候,可以给个默认的value值,表示指定的默认的扩展点实现,如@SPI(“dubbo”)public interface Protocol 表示默认的扩展点为扩展点实现名为dubbo的实现。而cacheDefaultExtensionName方法就是通过注解获取到该扩展点的默认扩展点name,赋值给cachedDefaultName

  遍历strategies,获取到多个LoadingStrategy,通过stratery.directory()获取到需要扫描的目录,以下是Dubbo中默认的三种策略的实现

  • DubboInternalLoadingStrategy --> META-INF/dubbo/internal/
  • DubboLoadingStrategy -->META-INF/dubbo/
  • ServicesLoadingStrategy --> META-INF/services/

  在Dubbo中,创建ExtensionLoader对象时,会load到所有的LoadingStrategy,这里利用的是JDK原生的SPI的方式,将LoadingStrategy的所有扩展实现都加载进来,保存到strategies中。所以如果需要扩展Dubbo中的扫描的路径,按照JDK的原生方式进行扩展即可

  进入到loadDirectory方法

private void loadDirectory(Map<String, Class<?>> extensionClasses, String dir, String type,
                               boolean extensionLoaderClassLoaderFirst, boolean overridden, String... excludedPackages) {
        String fileName = dir + type;
        try {
            Enumeration<java.net.URL> urls = null;
            ClassLoader classLoader = findClassLoader();

            // try to load from ExtensionLoader's ClassLoader first
            if (extensionLoaderClassLoaderFirst) {
                ClassLoader extensionLoaderClassLoader = ExtensionLoader.class.getClassLoader();
                if (ClassLoader.getSystemClassLoader() != extensionLoaderClassLoader) {
                    urls = extensionLoaderClassLoader.getResources(fileName);
                }
            }

            if (urls == null || !urls.hasMoreElements()) {
                if (classLoader != null) {
                    urls = classLoader.getResources(fileName);
                } else {
                    urls = ClassLoader.getSystemResources(fileName);
                }
            }

            if (urls != null) {
                while (urls.hasMoreElements()) {
                    java.net.URL resourceURL = urls.nextElement();
                    loadResource(extensionClasses, classLoader, resourceURL, overridden, excludedPackages);
                }
            }
        } catch (Throwable t) {
            logger.error("Exception occurred when loading extension class (interface: " +
                    type + ", description file: " + fileName + ").", t);
        }
    }

  该方法首先扫描传入路径下的所有的以type全类名命名的文件,获取到资源,将获取到的文件转换成Resource,传入到loadResource()方法中

private void loadResource(Map<String, Class<?>> extensionClasses, ClassLoader classLoader,
                              java.net.URL resourceURL, boolean overridden, String... excludedPackages) {
        try {
            try (BufferedReader reader = new BufferedReader(new InputStreamReader(resourceURL.openStream(), StandardCharsets.UTF_8))) {
                String line;
              // 按行读取文件中的每行数据
                while ((line = reader.readLine()) != null) {
                    final int ci = line.indexOf('#');
                    if (ci >= 0) {
                        line = line.substring(0, ci);
                    }
                    line = line.trim();
                    if (line.length() > 0) {
                        try {
                            String name = null;
                          // 通过等号分割,等号前的为key(扩展点name),等号后的为类的全类名
                            int i = line.indexOf('=');
                            if (i > 0) {
                                name = line.substring(0, i).trim();
                                line = line.substring(i + 1).trim();
                            }
                            if (line.length() > 0 && !isExcluded(line, excludedPackages)) {
                                loadClass(extensionClasses, resourceURL, Class.forName(line, true, classLoader), name, overridden);
                            }
                        } catch (Throwable t) {
                            IllegalStateException e = new IllegalStateException("Failed to load extension class (interface: " + type + ", class line: " + line + ") in " + resourceURL + ", cause: " + t.getMessage(), t);
                            exceptions.put(line, e);
                        }
                    }
                }
            }
        } catch (Throwable t) {
            logger.error("Exception occurred when loading extension class (interface: " +
                    type + ", class file: " + resourceURL + ") in " + resourceURL, t);
        }
    }

  首先按行读取文件,对读取到的每一行数据通过=号进行分割,=号前为扩展点的名字,=号后的为扩展点的具体扩展的实现类的全类名

  通过Class.forName将实现类加载到内存中。传入到loadClass()方法

private void loadClass(Map<String, Class<?>> extensionClasses, java.net.URL resourceURL, Class<?> clazz, String name,
                           boolean overridden) throws NoSuchMethodException {
        if (!type.isAssignableFrom(clazz)) {
            throw new IllegalStateException("Error occurred when loading extension class (interface: " +
                    type + ", class line: " + clazz.getName() + "), class "
                    + clazz.getName() + " is not subtype of interface.");
        }
        if (clazz.isAnnotationPresent(Adaptive.class)) {
          	// 会覆盖掉之前保存的`cachedAdaptiveClass`
            cacheAdaptiveClass(clazz, overridden);
        } else if (isWrapperClass(clazz)) {
            cacheWrapperClass(clazz);
        } else {
            clazz.getConstructor();
            if (StringUtils.isEmpty(name)) {
              // 如果扩展文件中的name为空,则调用findAnnotationName方法获取扩展点名字,具体命名方式这里就不详细看了
                name = findAnnotationName(clazz);
                if (name.length() == 0) {
                    throw new IllegalStateException("No such extension name for the class " + clazz.getName() + " in the config " + resourceURL);
                }
            }

            String[] names = NAME_SEPARATOR.split(name);
            if (ArrayUtils.isNotEmpty(names)) {
                cacheActivateClass(clazz, names[0]);
                for (String n : names) {
                    cacheName(clazz, n);
                    saveInExtensionClass(extensionClasses, clazz, n, overridden);
                }
            }
        }
    }

  该方法主要主要做了以下几件事