Dubbo点滴之暴露服务解析
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Dubbo点滴之暴露服务解析相关的知识,希望对你有一定的参考价值。
Dubbo点滴(1) SPI入门 :了解下底层DUBBO底层扩展保证
Dubbo点滴(2)之集群容错:简单分析了DUBBO对高可用的保证手段
Dubbo点滴(3)之服务配置ServiceConfig:了解DUBBO相关元数据与对象
本节内容主要了解下DUBBO服务端的暴露服务过程。
吐下槽,这个Exporter命名个人感觉不好。
Export,在DUBBO中既代表暴露过程,这儿是动作,对应export方法。
同时,Exporter接口,却代表着暴露的信息,比如URL。 总之,在研究过程中,对自己的理解代理干扰。
首先分享下DUBBO官网的暴露服务时序图
吐下槽,虽然该图来自官网,但有些信息和实际代码有些出入。
不管怎样,从整体上来看,可以帮助我们了解整体过程。个人也花了些时间,对该图细化下。两图可以互相参照着看。下图,是个人花时间整理的。
该图,比官网提供的图描述更细,当然也只是把主体调用过程进行了罗列。个人按习惯分为三部分。
section 1: 生成代理,构造Invoker对象;
section 2: Protocol调用链层,实现功能动态扩展
section 3: 负责socket端口绑定,借助底层netty,mina框架实现具体网络交互工作。
接下来,从section 2开发剖析。
2.Protocol协议
Protocol是将Invoker暴露的入口。
2.1 层次结构图
2.2 Protocol API
Protocol被@SPI注解,这说明该接口是动态适配的。
@SPI("dubbo") public interface Protocol { /** * 获取缺省端口,当用户没有配置端口时使用。 * * @return 缺省端口 */ int getDefaultPort(); /** * 暴露远程服务:<br> */ @Adaptive <T> Exporter<T> export(Invoker<T> invoker) throws RpcException; /** * 引用远程服务:<br> */ @Adaptive <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException; /** * 释放协议:<br> */ void destroy(); }
filter=com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper listener=com.alibaba.dubbo.rpc.protocol.ProtocolListenerWrapper registry=com.alibaba.dubbo.registry.integration.RegistryProtocol injvm=com.alibaba.dubbo.rpc.protocol.injvm.InjvmProtocol rmi=com.alibaba.dubbo.rpc.protocol.rmi.RmiProtocol dubbo=com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol hessian=com.alibaba.dubbo.rpc.protocol.hessian.HessianProtocol redis=com.alibaba.dubbo.rpc.protocol.redis.RedisProtocol # com.alibaba.dubbo.rpc.protocol.http.HttpProtocol?? thrift=com.alibaba.dubbo.rpc.protocol.thrift.ThriftProtocol # com.alibaba.dubbo.rpc.protocol.webservice.WebServiceProtocol memcached=memcom.alibaba.dubbo.rpc.protocol.memcached.MemcachedProtocol
简单把Protocol实现分为2大类。
filter,listener,register 采用代理模式,主要实现Protocol的扩展,调用具体protocal实现类完成。
其他,如dubbo,injvm,hession等,负责具体通讯工作。
3. 过程分析
3.1 测试代码(来自测试类DubboProtocolTest)
Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension(); ProxyFactory proxy = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension(); DemoService service = new DemoServiceImpl(); protocol.export(proxy.getInvoker(service, DemoService.class, URL.valueOf("dubbo://127.0.0.1:9010/" + DemoService.class.getName())));
3.2 Protocol选择
由ExtensionLoader负责完成。具体代码片段
Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
最后的protocol是个包装类。如下图
至于具体解析过程,参见《Dubbo原理解析-Dubbo内核实现之基于SPI思想Dubbo内核实现 》
3.3 ProtocolFilterWrapper VS ProtocolListenerWrapper
通过图,可以知道
Listener,Filter ,DUBBO提供了动态扩展。
ProtocolFilterWrapper 和 ProtocolListenerWrapper都是使用代理模式,实现功能扩展。但两者从使用场景上有所差别。
ProtocolFilterWrapper :主要通过构造调用链,实现功能扩展。提供了丰富的Filter接口。
ProtocolListenerWrapper:通过对服务的暴露和下线进行消息通知,DUBBO提供Listener接口不多。
下面简单分享下调用时机
ListenerExporterWrapper.java
public class ListenerExporterWrapper<T> implements Exporter<T> { private static final Logger logger = LoggerFactory.getLogger(ListenerExporterWrapper.class); private final Exporter<T> exporter; private final List<ExporterListener> listeners; public ListenerExporterWrapper(Exporter<T> exporter, List<ExporterListener> listeners){ if (exporter == null) { throw new IllegalArgumentException("exporter == null"); } this.exporter = exporter; this.listeners = listeners; if (listeners != null && listeners.size() > 0) { RuntimeException exception = null; for (ExporterListener listener : listeners) { if (listener != null) { try { listener.exported(this); } catch (RuntimeException t) { logger.error(t.getMessage(), t); exception = t; } } } if (exception != null) { throw exception; } } } public Invoker<T> getInvoker() { return exporter.getInvoker(); } public void unexport() { try { exporter.unexport(); } finally { if (listeners != null && listeners.size() > 0) { RuntimeException exception = null; for (ExporterListener listener : listeners) { if (listener != null) { try { listener.unexported(this); } catch (RuntimeException t) { logger.error(t.getMessage(), t); exception = t; } } } if (exception != null) { throw exception; } } } } }
Listener的调用,发生在export,unexport执行后进行
ProtocolFilterWrapper.java
public class ProtocolFilterWrapper implements Protocol { private final Protocol protocol; public ProtocolFilterWrapper(Protocol protocol){ if (protocol == null) { throw new IllegalArgumentException("protocol == null"); } this.protocol = protocol; } public int getDefaultPort() { return protocol.getDefaultPort(); } public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) { return protocol.export(invoker); } return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER)); } public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException { if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) { return protocol.refer(type, url); } return buildInvokerChain(protocol.refer(type, url), Constants.REFERENCE_FILTER_KEY, Constants.CONSUMER); } public void destroy() { protocol.destroy(); } private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) { Invoker<T> last = invoker; List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group); if (filters.size() > 0) { for (int i = filters.size() - 1; i >= 0; i --) { final Filter filter = filters.get(i); final Invoker<T> next = last; last = new Invoker<T>() { public Class<T> getInterface() { return invoker.getInterface(); } public URL getUrl() { return invoker.getUrl(); } public boolean isAvailable() { return invoker.isAvailable(); } public Result invoke(Invocation invocation) throws RpcException { return filter.invoke(next, invocation); } public void destroy() { invoker.destroy(); } @Override public String toString() { return invoker.toString(); } }; } } return last; } }
以CacheFilter.java为例
@Activate(group = {Constants.CONSUMER, Constants.PROVIDER}, value = Constants.CACHE_KEY) public class CacheFilter implements Filter { private CacheFactory cacheFactory; public void setCacheFactory(CacheFactory cacheFactory) { this.cacheFactory = cacheFactory; } public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException { if (cacheFactory != null && ConfigUtils.isNotEmpty(invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.CACHE_KEY))) { Cache cache = cacheFactory.getCache(invoker.getUrl().addParameter(Constants.METHOD_KEY, invocation.getMethodName())); if (cache != null) { String key = StringUtils.toArgumentString(invocation.getArguments()); if (cache != null && key != null) { Object value = cache.get(key); if (value != null) { return new RpcResult(value); } Result result = invoker.invoke(invocation); if (! result.hasException()) { cache.put(key, result.getValue()); } return result; } } } return invoker.invoke(invocation); } }
可以看出,Filter会在包裹着服务调用。
最后总结下两者区别
区别 | ProtocolFilterWrapper | ProtocolListenerWrapper |
内部实现类 | 多 | 少 |
调用时机 | 服务具体调用时 | 服务暴露,服务下线时 |
使用场景 | RPC调用时,动态扩展 | 消息通知 |
上下文 | 类似Around Advise | 类似After Advise |
组织形式 | 调用链 | 循环遍历 |
4. 服务暴露过程主体类图
DubboProtocol:具体协议实现类,服务的暴露,调用入口
Exchangers:ExchangeServer,ExchangeClient工厂类
HeaderExchanger:Exchanger意为“交换”的意思,表示数据交换接口,连接上下两侧。
HeaderExchangeServer:提供分布式服务的服务器Exchange接口
NettyTransporter:提供网络基本接口(bind,connection)
NettyServer:借助第三方netty,对外提供端口。
本文出自 “简单” 博客,请务必保留此出处http://dba10g.blog.51cto.com/764602/1885295
以上是关于Dubbo点滴之暴露服务解析的主要内容,如果未能解决你的问题,请参考以下文章