dubbo 源码 v2.7 分析:核心机制

Posted 程序员架构进阶

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了dubbo 源码 v2.7 分析:核心机制相关的知识,希望对你有一定的参考价值。

系列文章

dubbo 源码 v2.7 分析:结构、container 入口及线程模型

dubbo 源码 v2.7 分析:SPI 机制



一 回顾

上一篇我们介绍了SPI机制。本篇会先介绍dubbo中的核心机制,包括设计模式、bean加载、扩展点机制、动态代理和远程调用流程。

二 设计模式

2.1 装饰器&责任链模式

2.1.1 装饰器模式

   装饰器模式(Decorator Pattern)允许向一个现有的对象添加新的功能,同时又不改变其结构。这种类型的设计模式属于结构型模式,它是作为现有的类的一个包装。

这种模式创建了一个装饰类,用来包装原有的类,并在保持类方法签名完整性的前提下,提供了额外的功能。

2.1.2 责任链模式

责任链模式(Chain of Responsibility Pattern)为请求创建了一个接收者对象的链。这种模式给予请求的类型,对请求的发送者和接收者进行解耦。这种类型的设计模式属于行为型模式。

在这种模式中,通常每个接收者都包含对另一个接收者的引用。如果一个对象不能处理该请求,那么它会把相同的请求传给下一个接收者,依此类推。

2.1.3 Dubbo中的使用

   Dubbo的源码中,在启动和调用阶段都使用了装饰器。例如生产者(Provider)的调用链,具体的调用过程是在org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper类的方法buildInvokerChain()内完成的。相关代码:

private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) { Invoker<T> last = invoker; List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group); if (!filters.isEmpty()) { for (int i = filters.size() - 1; i >= 0; i--) { final Filter filter = filters.get(i); final Invoker<T> next = last; last = new Invoker<T>() { @Override public Class<T> getInterface() { return invoker.getInterface(); } @Override public URL getUrl() { return invoker.getUrl(); } @Override public boolean isAvailable() { return invoker.isAvailable(); } @Override public Result invoke(Invocation invocation) throws RpcException { Result asyncResult; try { asyncResult = filter.invoke(next, invocation); } catch (Exception e) { // onError callback if (filter instanceof ListenableFilter) { Filter.Listener listener = ((ListenableFilter) filter).listener(); if (listener != null) { listener.onError(e, invoker, invocation); } } throw e; } return asyncResult; } @Override public void destroy() { invoker.destroy(); } @Override public String toString() { return invoker.toString(); } }; } } return new CallbackRegistrationInvoker<>(last, filters); }

在这个方法的第3行(源码中的第55行),通过SPI机制获取Filter列表,从rpc.filter目录下可以看到Filter的实现类集合。每个Filter的实现类,都带有@Activate注解,将注解中含有 group=provider的Filter实现,按照order值的顺序排序,构成了一个调用链条。调用顺序为:

以EchoFilter为例,org.apache.dubbo.rpc.filter.EchoFilter的源码:

@Activate(group = CommonConstants.PROVIDER, order = -110000)public class EchoFilter implements Filter { @Override public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException { if (inv.getMethodName().equals($ECHO) && inv.getArguments() != null && inv.getArguments().length == 1) { return AsyncRpcResult.newDefaultAsyncResult(inv.getArguments()[0], inv); } return invoker.invoke(inv); }}

这里是装饰器和责任链模式的混合使用。例如,EchoFilter 的作用是判断

是否是回声测试请求,是的话直接返回内容,这是一种责任链的体现。而像 ClassLoaderFilter则只是在主功能上添加了功能,更改当前线程的 ClassLoader,这是典型的装饰器模式。

2.2 观察者模式

2.2.1 定义

当对象间存在一对多关系时,则使用观察者模式(Observer Pattern)。比如,当一个对象被修改时,则会自动通知依赖它的对象。观察者模式属于行为型模式。

2.2.2 Dubbo中的使用

Provider启动时,需要与注册中心进行交互,先注册自己的服务,再订阅自己的服务。而在订阅时,使用了观察者模式,开启一个Listener。注册中心会做定时扫描,默认每5秒检查一次是否有服务更新,如果有,就会向该服务的提供者发送一个notify消息。Provider接收到notify消息后,就会执行NotifyListener的notify方法,执行监听器方法。



2.3 代理模式——动态代理

2.3.1 定义

在代理模式(Proxy Pattern)中,一个类代表另一个类的功能。这种类型的设计模式属于结构型模式。

在代理模式中,我们创建具有现有对象的对象,以便向外界提供功能接口。

2.3.2 Dubbo中的使用

Dubbo扩展了JDK标准SPI类的Adaptive,也就是自适应扩展点,这是一个典型的动态代理实现。Dubbo

需要灵活地控制实现类,即在调用阶段动态地根据参数决定调用哪个实现类,所以采用先生成代理类的方法,能够做到灵活的调用。生成代理类的代码是 ExtensionLoader 的createAdaptiveExtensionClassCode 方法。代理类的主要逻辑是,获取 URL 参数中指定参数的值作为获取实现类的 key。

2.2.3 源码

Adaptive注解:

AdaptiveCompiler是使用Adaptive注解的示例:

@Adaptivepublic class AdaptiveCompiler implements Compiler { private static volatile String DEFAULT_COMPILER; public static void setDefaultCompiler(String compiler) { DEFAULT_COMPILER = compiler; } @Override public Class<?> compile(String code, ClassLoader classLoader) { Compiler compiler; ExtensionLoader<Compiler> loader = ExtensionLoader.getExtensionLoader(Compiler.class); String name = DEFAULT_COMPILER; // copy reference if (name != null && name.length() > 0) { compiler = loader.getExtension(name); } else { compiler = loader.getDefaultExtension(); } return compiler.compile(code, classLoader); }}



Adaptive注解是一个自适应 扩展点的标识。它可以修饰在类上,也可以修饰在方法上面。

修饰类和方法上的区别:放在类上,说明当前类是一个确定的自适应扩展点的类。如果是放在方法级别,那么需要生成一个动态字节码来进行转发。

例如Protocol 接口来说,定义了 export 和 refer 两个抽象方法,这两个方法分别带有@Adaptive 的标识,标识是 一个自适应方法。我们知道 Protocol 是一个通信协议的接口,具体有多种实现,那么这个时候选择哪一种呢?取决于我们在使用 dubbo 的时候所 配置的协议名称。而这里的方法层面的 Adaptive 就决定了当前这个方法会采用何种协议来发布服务。

org.apache.dubbo.rpc.Protocol源码:

/** * Protocol. (API/SPI, Singleton, ThreadSafe) */@SPI("dubbo")public interface Protocol { /** * Get default port when user doesn't config the port. * * @return default port */ int getDefaultPort(); /** * Export service for remote invocation: <br> * 1. Protocol should record request source address after receive a request: * RpcContext.getContext().setRemoteAddress();<br> * 2. export() must be idempotent, that is, there's no difference between invoking once and invoking twice when * export the same URL<br> * 3. Invoker instance is passed in by the framework, protocol needs not to care <br> * * @param <T> Service type * @param invoker Service invoker * @return exporter reference for exported service, useful for unexport the service later * @throws RpcException thrown when error occurs during export the service, for example: port is occupied */ @Adaptive <T> Exporter<T> export(Invoker<T> invoker) throws RpcException; /** * Refer a remote service: <br> * 1. When user calls `invoke()` method of `Invoker` object which's returned from `refer()` call, the protocol * needs to correspondingly execute `invoke()` method of `Invoker` object <br> * 2. It's protocol's responsibility to implement `Invoker` which's returned from `refer()`. Generally speaking, * protocol sends remote request in the `Invoker` implementation. <br> * 3. When there's check=false set in URL, the implementation must not throw exception but try to recover when * connection fails. * * @param <T> Service type * @param type Service class * @param url URL address for the remote service * @return invoker service's local proxy * @throws RpcException when there's any error while connecting to the service provider */ @Adaptive <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException; /** * Destroy protocol: <br> * 1. Cancel all services this protocol exports and refers <br> * 2. Release all occupied resources, for example: connection, port, etc. <br> * 3. Protocol can continue to export and refer new service even after it's destroyed. */ void destroy();}