Apache Dubbo 之 内核剖析

Posted chun_soft

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Apache Dubbo 之 内核剖析相关的知识,希望对你有一定的参考价值。

本文的源码是基于 Dubbo 2.7.2 版本进行的分析

1、Dubbo 的 SPI 机制

在 Dubbo 中,SPI 贯穿在整个 Dubbo 的核心。所以有必要对 spi 做一个详细了解。

SPI 机制在很多地方都有用到。

1.1 关于 Java SPI

了解 Dubbo 里面的 SPI 机制之前,我们先了解下 Java 提供的 SPI(service provider interface)机制,SPI 是 JDK 内置的一种服务提供发现机制。目前市面上有很多框架都是用它来做服务的扩展发现。简单来说,它是一种动态替换发现的机制。举个简单的例子,我们想在运行时动态给它添加实现,你只需要添加一个实现,然后把新的实现描述给 JDK 知道就行了。大家耳熟能详的如 JDBC、日志框架都有用到。

1.2 实现 SPI 需要遵循的标准

我们如何去实现一个标准的 SPI 发现机制呢?其实很简单,只需要满足以下提交就行了

  1. 需要在 classpath 下创建一个目录,该目录命名必须是:META-INF/service
  2. 在该目录下创建一个 properties 文件,该文件需要满足以下几个条件
    2.1 文件名必须是扩展的接口的全路径名称
    2.2 文件内部描述的是该扩展接口的所有实现类
    2.3 文件的编码格式是 UTF-8
  3. 通过 java.util.ServiceLoader 的加载机制来发现

1.3 SPI 的实际应用



这个文件里面写的就是 mysql 的驱动实现。通过 SPI 机制把 java.sql.Driver 和 mysql 的驱动做了集成。这样就达到了各个数据库厂商自己去实现数据库连接,jdk 本身不关心你怎么实现。

1.4 SPI 的缺点

  1. JDK 标准的 SPI 会一次性加载实例化扩展点的所有实现,什么意思呢?就是如果你在 META-INF/service 下的文件里面加了 N 个实现类,那么 JDK 启动的时候都会一次性全部加载。那么如果有的扩展点实现初始化很耗时或者如果有些实现类并没有用到,那么会很浪费资源;
  2. 如果扩展点加载失败,会导致调用方报错,而且这个错误很难定位到是这个原因。

1.5 Dubbo 优化后的 SPI 机制

1.5.1 基于 Dubbo SPI 的实现自己的扩展

Dubbo 的 SPI 扩展机制,有两个规则

  1. 需要在 resource 目录下配置 META-INF/dubbo 或者 META-INF/dubbo/internal 或者 META-INF/services,并基于 SPI 接口去创建一个文件。
  2. 文件名称和接口名称保持一致,文件内容和 SPI 有差异,内容是 KEY 对应 Value
    Dubbo 针对的扩展点非常多,可以针对协议、拦截、集群、路由、负载均衡、序列化、容器… 几乎里面用到的所有功能,都可以实现自己的扩展,我觉得这个是 dubbo 比较强大的一点。

比如我们可以针对协议做一个扩展

扩展协议拓展点

  1. 创建如下结构,添加 META-INF.dubbo 文件。类名和 Dubbo 提供的协议扩展点接口保持一致。
myProtocol=com.xx.MyProtocol
  1. 创建 MyProtocol 协议类 可以实现自己的协议,我们为了模拟协议产生了作用,修改一个端口
public class DubboProtocol extends Protocol 
    @Override
    public int getDefaultPort() 
        return "8888";
    

  1. 在调用处执行如下代码
Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class)
.getExtension("myProtocol");
System.out.print(protocol.getDefaultPort);
  1. 输出结果,可以看到运行结果,是执行的自定义的协议扩展点。

总结:总的来说,思路和 SPI 是差不多。都是基于约定的路径下制定配置文件。目的,通过配置的方式轻松实现功能的扩展。

我们的猜想是,一定有一个地方通过读取指定路径下的所有文件进行 load。然后讲对应的结果保存到一个 map 中,key 对应为名称,value 对应为实现类。那么这个实现,一定就在 ExtensionLoader 中了。接下来我们就可以基于这个猜想去看看代码的实现。

1.5.2 Dubbo 的扩展点原理实现

在看它的实现代码之前,大家要思考一个问题,所谓的扩展点,就是通过指定目录下配置一个对应接口的实现类,然后程序会进行查找和解析,找到对应的扩展点。那么这里就涉及到两个问题:

  1. 怎么解析?
  2. 被加载的类如何存储和使用?
  • (1)ExtensionLoader.getExtensionLoader.getExtension

我们从这段代码着手,去看看到底做了什么事情,能够通过这样一段代码实现扩展协议的查找和加载。

public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) 
    if (type == null) 
        throw new IllegalArgumentException("Extension type == null");
    
    if (!type.isInterface()) 
        throw new IllegalArgumentException("Extension type (" + type + ") is not an interface!");
    
    if (!withExtensionAnnotation(type)) 
        throw new IllegalArgumentException("Extension type (" + type +
                ") is not an extension, because it is NOT annotated with @" + SPI.class.getSimpleName() + "!");
    

    ExtensionLoader<T> loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
    if (loader == null) 
        EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<T>(type));
        loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
    
    return loader;

  • (2)实例化 ExtensionLoader
    如果当前的 type=ExtensionFactory type,那么 objectFactory=null, 否则会创建一个自适应扩展点给到 objectFacotry,目前来说具体做什么咱们先不关心
private ExtensionLoader(Class<?> type) 
    this.type = type;
    objectFactory = (type == ExtensionFactory.class ? null : ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension());

objectFactory 在这里赋值了,并且是返回一个 AdaptiveExtension(). 这个暂时不展开,后面再分析

  • (3)getExtension

这个方法就是根据一个名字来获得一个对应类的实例,所以我们来猜想一下,回想一下前面咱们配置的自定义协议,name 实际上就是 myprotocol,而返回的实现类应该就是 MyProtocol。

 public T getExtension(String name) 
    return getExtension(name, true);


public T getExtension(String name, boolean wrap) 
    if (StringUtils.isEmpty(name)) 
        throw new IllegalArgumentException("Extension name == null");
    
    
    // 如果 name=true,表示返回一个默认的扩展点
    if ("true".equals(name)) 
        return getDefaultExtension();
    
    
    // 缓存一下,如果实例已经加载过了,直接从缓存读取
    final Holder<Object> holder = getOrCreateHolder(name);
    Object instance = holder.get();
    if (instance == null) 
        synchronized (holder) 
            instance = holder.get();
            if (instance == null) 
                // 根据名称创建实例
                instance = createExtension(name, wrap);
                holder.set(instance);
            
        
    
    return (T) instance;

  • (4)createExtension
    仍然是根据名称创建扩展:getExtensionClasses() 加载指定路径下的所有文件
private T createExtension(String name, boolean wrap) 
    Class<?> clazz = getExtensionClasses().get(name);
    if (clazz == null) 
        throw findException(name);
    
    try 
        // 这里用一个 ConcurrentMap 来保存实例,做缓存
        T instance = (T) EXTENSION_INSTANCES.get(clazz);
        if (instance == null) 
            EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance());
            instance = (T) EXTENSION_INSTANCES.get(clazz);
        
        // 实例注入,可以猜到,这里应该是对这个实例中的成员属性来实现依赖注入的功能
        injectExtension(instance);

        if (wrap) 

            List<Class<?>> wrapperClassesList = new ArrayList<>();
            if (cachedWrapperClasses != null) 
                wrapperClassesList.addAll(cachedWrapperClasses);
                wrapperClassesList.sort(WrapperComparator.COMPARATOR);
                Collections.reverse(wrapperClassesList);
            

            if (CollectionUtils.isNotEmpty(wrapperClassesList)) 
                for (Class<?> wrapperClass : wrapperClassesList) 
                    Wrapper wrapper = wrapperClass.getAnnotation(Wrapper.class);
                    if (wrapper == null
                        || (ArrayUtils.contains(wrapper.matches(), name) && !ArrayUtils.contains(wrapper.mismatches(), name))) 
                        instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
                    
                
            
        

        initExtension(instance);
        return instance;
     catch (Throwable t) 
        throw new IllegalStateException("Extension instance (name: " + name + ", class: " +
            type + ") couldn't be instantiated: " + t.getMessage(), t);
    

  • (5)getExtensionClasses
    这个方法,会查找指定目录
    /META-INF/dubbo || /META-INF/services 下对应的 type->也就是本次演示案例的 Protocol 的 properties 文件,然后扫描这个文件下的所有配置信息。然后保存到一个 HashMap 中(classes),key=name(对应 protocol 文件中配置的 myprotocol), value=对应配置的类的实例
private Map<String, Class<?>> getExtensionClasses() 
    Map<String, Class<?>> classes = cachedClasses.get();
    if (classes == null) 
        synchronized (cachedClasses) 
            classes = cachedClasses.get();
            if (classes == null) 
                // 这里的代码就是加载的过程
                classes = loadExtensionClasses();
                cachedClasses.set(classes);
            
        
    
    return classes;

private Map<String, Class<?>> loadExtensionClasses() 
    cacheDefaultExtensionName();

    Map<String, Class<?>> extensionClasses = new HashMap<>();

    for (LoadingStrategy strategy : strategies) 
        loadDirectory(extensionClasses, strategy.directory(), type.getName(), strategy.preferExtensionClassLoader(), strategy.overridden(), strategy.excludedPackages());
        loadDirectory(extensionClasses, strategy.directory(), type.getName().replace("org.apache", "com.alibaba"), strategy.preferExtensionClassLoader(), strategy.overridden(), strategy.excludedPackages());
    

    return extensionClasses;

  • (6)injectExtension
private T injectExtension(T instance) 

    if (objectFactory == null) 
        return instance;
    

    try 
        for (Method method : instance.getClass().getMethods()) 
            if (!isSetter(method)) 
                continue;
            
            /**
             * Check @link DisableInject to see if we need auto injection for this property
             */
            if (method.getAnnotation(DisableInject.class) != null) 
                continue;
            
            Class<?> pt = method.getParameterTypes()[0];
            if (ReflectUtils.isPrimitives(pt)) 
                continue;
            

            try 
                String property = getSetterProperty(method);
                Object object = objectFactory.getExtension(pt, property);
                if (object != null) 
                    method.invoke(instance, object);
                
             catch (Exception e) 
                logger.error("Failed to inject via method " + method.getName()
                        + " of interface " + type.getName() + ": " + e.getMessage(), e);
            

        
     catch (Exception e) 
        logger.error(e.getMessage(), e);
    
    return instance;

分析到这里我们发现,所谓的扩展点,套路都一样,不管是 springfactorieyLoader,还是 Dubbo 的 spi。实际上,Dubbo 的功能会更加强大,比如自适应扩展点,比如依赖注入。

2、Adaptive 自适应扩展点

什么叫自适应扩展点呢?我们先演示一个例子,在下面这个例子中,我们传入一个 Compiler 接口,它会返回一个 AdaptiveCompiler。这个就叫自适应。

Compiler compiler = ExtensionLoader.getExtensionLoader(Compiler.class).getAdaptiveExtension(); 
System.out.println(compiler.getClass());

它是怎么实现的呢? 我们根据返回的 AdaptiveCompiler 这个类,看到这个类上面有一个注解@Adaptive。 这个就是一个自适应扩展点的标识。它可以修饰在类上,也可以修饰在方法上面。

这两者有什么区别呢? 简单来说,放在类上,说明当前类是一个确定的自适应扩展点的类。如果是放在方法级别,那么需要生成一个动态字节码,来进行转发。

比如拿 Protocol 这个接口来说,它里面定义了 export 和 refer 两个抽象方法,这两个方法分别带有@Adaptive 的标识,标识是一个自适应方法。

我们知道 Protocol 是一个通信协议的接口,具体有多种实现,那么这个时候选择哪一种呢? 取决于我们在使用 dubbo 的时候所配置的协议名称。而这里的方法层面的 Adaptive 就决定了当前这个方法会采用何种协议来发布服务。

2.1 getAdaptiveExtension

这个方法主要就是要根据传入的接口返回一个自适应的实现类

public T getAdaptiveExtension() 
    // cacheAdaptiveInstance,是一个缓存,在 dubbo 中大量用到这种内存缓存
    Object instance = cachedAdaptiveInstance.get();
    if (instance == null) 
        if (createAdaptiveInstanceError != null) 
            throw new IllegalStateException("Failed to create adaptive instance: " +
                    createAdaptiveInstanceError.toString(),
                    createAdaptiveInstanceError);
        

        synchronized (cachedAdaptiveInstance) 
            instance = cachedAdaptiveInstance.get();
            if (instance == null) 
                try 
                    // 很明显,这里是创建一个自适应扩展点的实现
                    instance = createAdaptiveExtension();
                    cachedAdaptiveInstance.set(instance);
                 catch (Throwable t) 
                    createAdaptiveInstanceError = t;
                    throw new IllegalStateException("Failed to create adaptive instance: " + t.toString(), t);
                
            
        
    

    return (T) instance;

2.2 createAdaptiveExtension

这个方法中做两个事情

  1. 获得一个自适应扩展点实例
  2. 实现依赖注入
private T createAdaptiveExtension()  
try 
    return injectExtension((T).getAdaptiveExtensionClass()
    .newInstance()); 
     catch (Exception e) 
        throw new IllegalStateException("Can't create adaptive extension " + type + ", cause: " + e.getMessage (), e);
        t);
     

getExtensionClasses()这个方法在前面讲过了,会加载当前传入的类型的所有扩展点,保存在一个 hashmap 中,这里有一个判断逻辑,如果 cachedApdaptiveClas!=null ,直接返回这个 cachedAdaptiveClass,这里大家可以猜一下,这个 cachedAdaptiveClass 是一个什么?

cachedAdaptiveClass,还记得前面讲过 Adaptive 可以放在两个位置,一个是类级别,一个是方法级别。那么这个 cachedAdaptiveClass 很显然,就是放在类级别的 Adaptive,表示告诉 dubbo spi loader,“我是一个自适应扩展点,你来加载我吧”

cachedAdaptiveClass 应该是在加载解析/META-INF/dubbo 下的扩展点的时候加载进来的。在加载完之后如果这个类有@Adaptive 标识,则会赋值赋值而给 cachedAdaptiveClass

如果 cachedAdaptiveClass 不存在,dubbo 会动态生成一个代理类 Protocol$Adaptive. 前面的名字 protocol 是根据当前 ExtensionLoader 所加载的扩展点来定义的。

private Class<?> getAdaptiveExtensionClass() 
    getExtensionClasses();
    if (cachedAdaptiveClass != null) 
        return cachedAdaptiveClass;
    
    return cachedAdaptiveClass = createAdaptiveExtensionClass();

2.3 createAdaptiveExtensionClass

动态生成字节码,然后进行动态加载。那么这个时候锁返回的 class,如果加载的是 Protocol.class,应该是 Protocol$Adaptive 这个 cachedDefaultName 实际上就是扩展点接口的 @SPI 注解对应的名字,如果此时加载的是 Protocol.class,那么 cachedDefaultName=dubbo

private Class<?> createAdaptiveExtensionClass() 
    String code = new AdaptiveClassCodeGenerator(type, cachedDefaultName).generate(); 
    ClassLoader classLoader = findClassLoader(); org.apache.dubbo.common.compiler.Compiler compiler = Apache Dubbo 之 内核剖析

Apache Dubbo 之 内核剖析

《一》Dubbo之总体架构剖析

深入理解 Dubbo SPI 扩展机制

深度剖析阿里巴巴对Apache Flink 的优化与改进

来撸一撸Dubbo之SPI机制源码,SPI到底解决了什么问题?