Dubbo之服务暴露源码分析

Posted Java后端笔记

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Dubbo之服务暴露源码分析相关的知识,希望对你有一定的参考价值。

时序图

在讲解源码前,先看下官方文档提供的时序图,后面的讲解基本是这个路线,但是会更细节化

大致逻辑

首先服务的实现bean在我们的spring容器中,我们会创建一个Invoker通过代理调用ref中的方法,同时Invoker会在protocol的export方法中会转换为Exporter,并且保存在protocol对象的exporterMap中,然后进行暴露。

重要概念

Protocol

Protocol 是服务域,它是 Invoker 暴露和引用的主功能入口,它负责 Invoker 的生命周期管理。 接口定义如下

 
   
   
 
  1. @SPI("dubbo")

  2. public interface Protocol {

  3.    /**

  4.     * 获取缺省端口,当用户没有配置端口时使用。

  5.     *

  6.     * @return 缺省端口

  7.     */

  8.    int getDefaultPort();

  9.    /**

  10.     * 暴露远程服务:<br>

  11.     * 2. export()必须是幂等的,也就是暴露同一个URL的Invoker两次,和暴露一次没有区别。<br>

  12.     * 3. export()传入的Invoker由框架实现并传入,协议不需要关心。<br>

  13.     *

  14.     * @param <T> 服务的类型

  15.     * @param invoker 服务的执行体

  16.     * @return exporter 暴露服务的引用,用于取消暴露

  17.     * @throws RpcException 当暴露服务出错时抛出,比如端口已占用

  18.     */

  19.    @Adaptive

  20.    <T> Exporter<T> export(Invoker<T> invoker) throws RpcException;

  21.    /**

  22.     * 引用远程服务:<br>

  23.     * 1. 当用户调用refer()所返回的Invoker对象的invoke()方法时,协议需相应执行同URL远端export()传入的Invoker对象的invoke()方法。<br>

  24.     * 2. refer()返回的Invoker由协议实现,协议通常需要在此Invoker中发送远程请求。<br>

  25.     * 3. 当url中有设置check=false时,连接失败不能抛出异常,并内部自动恢复。<br>

  26.     *

  27.     * @param <T> 服务的类型

  28.     * @param type 服务的类型

  29.     * @return invoker 服务的本地代理

  30.     * @throws RpcException 当连接服务提供方失败时抛出

  31.     */

  32.    @Adaptive

  33.    <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;

  34.    /**

  35.     * 释放协议:<br>

  36.     * 1. 取消该协议所有已经暴露和引用的服务。<br>

  37.     * 2. 释放协议所占用的所有资源,比如连接和端口。<br>

  38.     * 3. 协议在释放后,依然能暴露和引用新的服务。<br>

  39.     */

  40.    void destroy();

  41. }

Invoker

Invoker 是实体域,它是 Dubbo 的核心模型,其它模型都向它靠扰,或转换成 它,它代表一个可执行体,可向它发起 invoke 调用,它有可能是一个本地的 实现,也可能是一个远程的实现,也可能一个集群实现。 接口定义如下

 
   
   
 
  1. public interface Invoker<T> extends Node {

  2.    Class<T> getInterface();

  3.    Result invoke(Invocation invocation) throws RpcException;

  4. }

Invocation

Invocation 是会话域,它持有调用过程中的变量,比如方法名,参数等。 接口定义如下

 
   
   
 
  1. public interface Invocation {

  2.    String getMethodName();

  3.    Class<?>[] getParameterTypes();

  4.    Object[] getArguments();

  5.    Map<String, String> getAttachments();

  6.    String getAttachment(String key);

  7.    String getAttachment(String key, String defaultValue);

  8.        Invoker<?> getInvoker();

  9. }

Exporter

Exporter用来封装不同协议暴露的Invoker,因为Invoker可以被多个Protocol暴露,因为每种Protocol都有各自的Exproter子类 接口定义如下

 
   
   
 
  1. public interface Exporter<T> {

  2.    Invoker<T> getInvoker();

  3.    void unexport();

  4. }

我的一些定义

本地暴露

本地暴露分为两种,通过远程协议还是本地协议暴露 本地协议的话,基于进程通信,所以不需要进行远程暴露,具体实现只有InjvmProtocol 而基于远程协议的暴露,需要开启服务监听,处理其他进程发来的rpc请求,同时可以选择进行远程暴露,具体实现有DubboProtocol,HessianProtocol等

远程暴露

远程暴露,就是将本地暴露的url发布到注册中心,这个暴露为了让服务引用者感知到服务的存在 远程暴露对应RegistryProtocol

远程暴露URL和本地暴露URL

进行远程暴露的时候,要先进行本地暴露,所以远程暴露URL里面有一个export参数会包含本地暴露URL 远程暴露URL主要是用来选择暴露的注册中心,注册本地暴露URL,以及增加事件监听

源码分析

解析配置

先看下我们平时是如何配置dubbo服务暴露的

 
   
   
 
  1. <dubbo:service interface="com.alibaba.dubbo.demo.bid.BidService" ref="bidService"  protocol="dubbo" />

上面的配置会通过自定义解析器DubboNamespaceHandler解析到ServiceBean对象

 
   
   
 
  1. registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));

然后在ServiceBean初始化完成后进行服务暴露可以看到ServiceBean实现了Initializing接口,可以在afterPropertiesSet看到服务暴露的逻辑

 
   
   
 
  1. if (! isDelay()) {

  2.            export();

  3.        }

这边的Delay并不是服务具体的暴露行为进行延迟,而是控制这个暴露行为在什么时候触发

 
   
   
 
  1. private boolean isDelay() {

  2.        Integer delay = getDelay();

  3.        ProviderConfig provider = getProvider();

  4.        if (delay == null && provider != null) {

  5.            delay = provider.getDelay();

  6.        }

  7.        return supportedApplicationListener && (delay == null || delay.intValue() == -1);

  8.    }

上述代码的意思是,如果支持Spring的事件监听,并且没有配置延迟暴露,推迟到容器refresh完成的时候触发服务暴露逻辑,如果配置了delay,那么直接在afterPropertiesSet内调用暴露方法 我理解为一个是容器级别的delay,一个是服务级别的delay

export方法

export方法在ServiceConfig中

 
   
   
 
  1. public synchronized void export() {

  2.        if (provider != null) {

  3.            if (export == null) {

  4.                export = provider.getExport();

  5.            }

  6.            if (delay == null) {

  7.                delay = provider.getDelay();

  8.            }

  9.        }

  10.        if (export != null && ! export.booleanValue()) {

  11.            return;

  12.        }

  13.        if (delay != null && delay > 0) {

  14.            Thread thread = new Thread(new Runnable() {

  15.                public void run() {

  16.                    try {

  17.                        Thread.sleep(delay);

  18.                    } catch (Throwable e) {

  19.                    }

  20.                    doExport();

  21.                }

  22.            });

  23.            thread.setDaemon(true);

  24.            thread.setName("DelayExportServiceThread");

  25.            thread.start();

  26.        } else {

  27.            doExport();

  28.        }

  29.    }

这边会根据是否配置了delay参数,进行延迟暴露,通过线程休眠来实现 doExport方法涉及很多参数的校验与设置,遇到具体功能点再做分析,具体暴露逻辑调用了方法doExportUrls

 
   
   
 
  1. private void doExportUrls() {

  2.        List<URL> registryURLs = loadRegistries(true);

  3.        for (ProtocolConfig protocolConfig : protocols) {

  4.            doExportUrlsFor1Protocol(protocolConfig, registryURLs);

  5.        }

  6.    }

doExportUrls方法首先会获取注册中心的URL,虽说可以配置很多个注册中心,但是我们就把它当成一个好了 然后根据service配置的不同协议,调用doExportUrlsFor1Protocol方法分别进行暴露 在doExportUrlsFor1Protocol的前半部分又是各种参数的提取,用来生成最终暴露的URL,我们关注核心的暴露逻辑

 
   
   
 
  1. //1

  2. URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map);

  3.        if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)

  4.                .hasExtension(url.getProtocol())) {

  5.            url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)

  6.                    .getExtension(url.getProtocol()).getConfigurator(url).configure(url);

  7.        }

  8. //2

  9.        String scope = url.getParameter(Constants.SCOPE_KEY);

  10.        //配置为none不暴露

  11.        if (! Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {

  12.            //配置不是remote的情况下做本地暴露 (配置为remote,则表示只暴露远程服务)

  13. //3

  14.            if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {

  15.                exportLocal(url);

  16.            }

  17.            //如果配置不是local则暴露为远程服务.(配置为local,则表示只暴露远程服务)

  18. //4

  19.            if (! Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope) ){

  20.                if (logger.isInfoEnabled()) {

  21.                    logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);

  22.                }

  23. //5

  24.                if (registryURLs != null && registryURLs.size() > 0

  25.                        && url.getParameter("register", true)) {

  26.                    for (URL registryURL : registryURLs) {

  27.                        url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));

  28.                        URL monitorUrl = loadMonitor(registryURL);

  29.                        if (monitorUrl != null) {

  30.                            url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());

  31.                        }

  32.                        if (logger.isInfoEnabled()) {

  33.                            logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);

  34.                        }

  35.                        Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));

  36.                        Exporter<?> exporter = protocol.export(invoker);

  37.                        exporters.add(exporter);

  38.                    }

  39.                } else {

  40.                    Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);

  41.                    Exporter<?> exporter = protocol.export(invoker);

  42.                    exporters.add(exporter);

  43.                }

  44.            }

  45.        }

  46.        this.urls.add(url);

注意上面代码1处,就是使用之前提取的参数生成本地暴露url的逻辑 而代码2处的scope也还是比较重要的,它控制了服务应该怎么暴露,我们项目中一般对service不进行scope配置,那么取到的值为null,代码3和代码4的条件都会满足,既会进行本地协议的本地暴露,也会进行远程暴露 而代码5,我们可以配置register="false",直接进行远程协议的本地暴露,不记录到注册中心上去,但是我们还是可以通过在消费者强制配置url来调用

 
   
   
 
  1.    <dubbo:service interface="com.alibaba.dubbo.demo.bid.BidService" ref="bidService"  protocol="dubbo" register="false"/>

服务暴露的逻辑其实是同一套

 
   
   
 
  1. Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);

  2. Exporter<?> exporter = protocol.export(invoker);

主要的区别点还是在于Url的不同,因为url带了不同的protocol以及其他配置,然后具体暴露时,使用之前讲的SPI来调用不同实现 比如在exportLocal方法里,其实会把url的protocol修改为injvm

 
   
   
 
  1. private void exportLocal(URL url) {

  2.        if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {

  3. //修改protocol为Injvm

  4.            URL local = URL.valueOf(url.toFullString())

  5.                    .setProtocol(Constants.LOCAL_PROTOCOL)

  6.                    .setHost(NetUtils.LOCALHOST)

  7.                    .setPort(0);

  8.            // modified by lishen

  9.            ServiceClassHolder.getInstance().pushServiceClass(getServiceClass(ref));

  10.            Exporter<?> exporter = protocol.export(

  11.                    proxyFactory.getInvoker(ref, (Class) interfaceClass, local));

  12.            exporters.add(exporter);

  13.            logger.info("Export dubbo service " + interfaceClass.getName() +" to local registry");

  14.        }

  15.    }

在存在注册中心,并且服务的Registry属性不为false的情况下会进行远程暴露,会在注册中心url的export参数带上原先的本地暴露url进行远程暴露,因此暴露使用的protocol也相应变为RegistryProtocol

 
   
   
 
  1. Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));

下面讲解具体暴露的逻辑

ref转换为Invoker

在进行暴露之前,我们需要将spring容器内的接口实现ref转换为invoker,通过proxyFactory.getInvoker(ref, (Class) interfaceClass, local)方法

proxyFactory是一个扩展点,有javaassist和jdk动态代理两种实现,默认实现为javaassist,并且提供一个包装类StubProxyFactoryWrapper用于提供降级服务(以后单独讲解)

 
   
   
 
  1. public class JavassistProxyFactory extends AbstractProxyFactory {

  2.    @SuppressWarnings("unchecked")

  3.    public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {

  4.        return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));

  5.    }

  6.    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {

  7.        // TODO Wrapper类不能正确处理带$的类名

  8.        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);

  9.        return new AbstractProxyInvoker<T>(proxy, type, url) {

  10.            @Override

  11.            protected Object doInvoke(T proxy, String methodName,

  12.                                      Class<?>[] parameterTypes,

  13.                                      Object[] arguments) throws Throwable {

  14.                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);

  15.            }

  16.        };

  17.    }

  18. }

JavassistProxyFactory 中的Wapper类是动态生成的,可以针对接口的每个方法生成直接调用的代码,避免了反射,因为做了缓存,多次调用的情况下,会加快效率,而jdk实现用的反射效率应该差多了

 
   
   
 
  1. //JdkProxyFactory的AbstractProxyInvoker实现

  2. return new AbstractProxyInvoker<T>(proxy, type, url) {

  3.            @Override

  4.            protected Object doInvoke(T proxy, String methodName,

  5.                                      Class<?>[] parameterTypes,

  6.                                      Object[] arguments) throws Throwable {

  7.                Method method = proxy.getClass().getMethod(methodName, parameterTypes);

  8.                return method.invoke(proxy, arguments);

  9.            }

  10.        };

通过proxyFactory我们会得到一个父类为AbstractProxyInvoker的匿名Invoker类,内部通过反射或者动态生成字节码来调用目标ref的方法

通过protocol暴露

在得到Invoker后,我们通过protocol去进行服务暴露,暴露成功后得到Exporter引用

 
   
   
 
  1. <T> Exporter<T> export(Invoker<T> invoker) throws RpcException;

通过Protocol的export方法,需要将invoker转换为exporter,为什么? 因为Invoker只负责对具体方法的调用,但是方法的调用可以暴露到多个Protocol,所以需要有具体的Exporter来对应,比如Dubbo暴露得到DubboExporter,injvm暴露得到InjvmExporter

下面讲解每种protocol的暴露

通过InjvmProtocol暴露

InjvmProtocol是本地暴露中唯一使用本地协议的,意思就是说这个服务的url不能发布到注册中心,只能本地消费,在dubbo引用服务的逻辑中,如果发现本地InjvmProtocol中有所需要的Exproter,会优先选择引用本地 看下暴露的代码

 
   
   
 
  1. public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {

  2.        return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap);

  3.    }

exporterMap为InjvmProtocol继承AbstractProtocol的一个参数,用来保存Exproter引用,同时exporterMap也会在InjvmExporter内被引用,主要用于卸载功能

 
   
   
 
  1. class InjvmExporter<T> extends AbstractExporter<T> {

  2.    private final String key;

  3.    private final Map<String, Exporter<?>> exporterMap;

  4.    InjvmExporter(Invoker<T> invoker, String key, Map<String, Exporter<?>> exporterMap){

  5.        super(invoker);

  6.        this.key = key;

  7.        this.exporterMap = exporterMap;

  8.        exporterMap.put(key, this);

  9.    }

  10.    public void unexport() {

  11.        super.unexport();

  12.        exporterMap.remove(key);

  13.    }

  14. }

关于这个exporterMap,由于每个Protocol实现都继承了AbstractProtocol,所以都会有exporterMap属性,并且每种Protocol在容器内只存在一个,我们可以在每个Protocol对象的exporterMap中拿到这个Protocol暴露的所有Exproter

通过RegistryProtocol暴露

这是远程暴露,在进行本地暴露的同时将本地暴露的url注册到注册中心同时也注册事件监听

 
   
   
 
  1. public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {

  2.        //export invoker

  3. //1

  4.        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);

  5.        //registry provider

  6.        final Registry registry = getRegistry(originInvoker);

  7.        final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);

  8.        registry.register(registedProviderUrl);

  9.        // 订阅override数据

  10.        // FIXME 提供者订阅时,会影响同一JVM即暴露服务,又引用同一服务的的场景,因为subscribed以服务名为缓存的key,导致订阅信息覆盖。

  11.        final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);

  12.        final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl);

  13.        overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);

  14. //注册监听事件,用于url被修改时回调,进行exporter重新暴露

  15.        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);

  16.        //保证每次export都返回一个新的exporter实例

  17.        return new Exporter<T>() {

  18.            public Invoker<T> getInvoker() {

  19.                return exporter.getInvoker();

  20.            }

  21.            public void unexport() {

  22.                try {

  23.                    exporter.unexport();

  24.                } catch (Throwable t) {

  25.                    logger.warn(t.getMessage(), t);

  26.                }

  27.                try {

  28.                    registry.unregister(registedProviderUrl);

  29.                } catch (Throwable t) {

  30.                    logger.warn(t.getMessage(), t);

  31.                }

  32.                try {

  33.                    overrideListeners.remove(overrideSubscribeUrl);

  34.                    registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener);

  35.                } catch (Throwable t) {

  36.                    logger.warn(t.getMessage(), t);

  37.                }

  38.            }

  39.        };

  40.    }

注意代码1处,有一个本地暴露,看下代码

 
   
   
 
  1. private <T> ExporterChangeableWrapper<T>  doLocalExport(final Invoker<T> originInvoker){

  2.        String key = getCacheKey(originInvoker);

  3.        ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);

  4.        if (exporter == null) {

  5.            synchronized (bounds) {

  6.                exporter = (ExporterChangeableWrapper<T>) bounds.get(key);

  7.                if (exporter == null) {

  8.                    final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));

  9.                    exporter = new ExporterChangeableWrapper<T>((Exporter<T>)protocol.export(invokerDelegete), originInvoker);

  10.                    bounds.put(key, exporter);

  11.                }

  12.            }

  13.        }

  14.        return (ExporterChangeableWrapper<T>) exporter;

  15.    }

getProviderUrl用于从远程暴露url中的export参数中获取本地暴露的url

 
   
   
 
  1. private URL getProviderUrl(final Invoker<?> origininvoker){

  2.        String export = origininvoker.getUrl().getParameterAndDecoded(Constants.EXPORT_KEY);

  3.        if (export == null || export.length() == 0) {

  4.            throw new IllegalArgumentException("The registry export url is null! registry: " + origininvoker.getUrl());

  5.        }

  6.        URL providerUrl = URL.valueOf(export);

  7.        return providerUrl;

  8.    }

export参数对应的url才是需要实际本地暴露的,而作为export方法的远程暴露url只是为了注册提供者url到注册中心以及增加事件监听

同时注意一下bounds参数的校验是为了防止同一个invoker重复暴露,而ExporterChangeableWrapper封装是为了zookeeper中url发生改变时能修改Exporter

在完成本地暴露之后,会通过远程暴露url获取注册中心对象,然后把本地暴露url注册上去,同时也会给zookeeper中本地暴露url对应路径注册监听器,用于监听zookeeper上面的暴露url发生变化的时候,重新export(比如我们的控制台可以对参数进行调整)

最后把export返回

通过DubboProtocol暴露

DubboProtocol是使用远程协议的本地暴露,所以可以将暴露url注册到注册中心 看下它的export方法

 
   
   
 
  1. public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {

  2.        URL url = invoker.getUrl();

  3.        // export service.

  4.        String key = serviceKey(url);

  5.        DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);

  6.        exporterMap.put(key, exporter);

  7.        //export an stub service for dispaching event

  8.        Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY,Constants.DEFAULT_STUB_EVENT);

  9.        Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);

  10.        if (isStubSupportEvent && !isCallbackservice){

  11.            String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);

  12.            if (stubServiceMethods == null || stubServiceMethods.length() == 0 ){

  13.                if (logger.isWarnEnabled()){

  14.                    logger.warn(new IllegalStateException("consumer [" +url.getParameter(Constants.INTERFACE_KEY) +

  15.                            "], has set stubproxy support event ,but no stub methods founded."));

  16.                }

  17.            } else {

  18.                stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);

  19.            }

  20.        }

  21.        openServer(url);

  22.        // modified by lishen

  23.        optimizeSerialization(url);

  24.        return exporter;

  25.    }

首先会把invoker转换为DubboExporter,放到exporterMap中 然后有一些stub的逻辑,这个以后单独再讲 接下来就是打开netty服务,用于监听服务引用者的请求,打开服务器逻辑在openServer中

 
   
   
 
  1. private void openServer(URL url) {

  2.        // find server.

  3.        String key = url.getAddress();

  4.        //client 也可以暴露一个只有server可以调用的服务。

  5.        boolean isServer = url.getParameter(Constants.IS_SERVER_KEY,true);

  6.        if (isServer) {

  7.            ExchangeServer server = serverMap.get(key);

  8.            if (server == null) {

  9.                serverMap.put(key, createServer(url));

  10.            } else {

  11.                //server支持reset,配合override功能使用

  12.                server.reset(url);

  13.            }

  14.        }

  15.    }

从String key = url.getAddress();以及ExchangeServer server = serverMap.get(key);可以看出来在一个应用中,netty服务器针对每种协议只会起一个,因为每种协议只能配置一个端口 而reset方法,会使用之后暴露url的参数,覆盖已经开启netty服务内的参数 那么我们的服务器是怎么处理接收的rpc请求并调用对应exporter调用呢,进入createServer方法我们可以看到会netty服务的开启时会绑定一个requestHandler

 
   
   
 
  1. private ExchangeServer createServer(URL url) {

  2.        //默认开启server关闭时发送readonly事件

  3.        url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());

  4.        //默认开启heartbeat

  5.        url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));

  6.        String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);

  7.        if (str != null && str.length() > 0 && ! ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))

  8.            throw new RpcException("Unsupported server type: " + str + ", url: " + url);

  9.        url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME);

  10.        ExchangeServer server;

  11.        try {

  12. //1 绑定requestHandler

  13.            server = Exchangers.bind(url, requestHandler);

  14.        } catch (RemotingException e) {

  15.            throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);

  16.        }

  17.        str = url.getParameter(Constants.CLIENT_KEY);

  18.        if (str != null && str.length() > 0) {

  19.            Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();

  20.            if (!supportedTypes.contains(str)) {

  21.                throw new RpcException("Unsupported client type: " + str);

  22.            }

  23.        }

  24.        return server;

  25.    }

这个requestHandler就是用来处理接收到的rpc调用请求的,看下它内部的逻辑

 
   
   
 
  1. private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {

  2.        public Object reply(ExchangeChannel channel, Object message) throws RemotingException {

  3.            if (message instanceof Invocation) {

  4.                Invocation inv = (Invocation) message;

  5.                //通过inv获取对应invoker

  6.                Invoker<?> invoker = getInvoker(channel, inv);

  7.                //如果是callback 需要处理高版本调用低版本的问题

  8.                if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))){

  9.                    String methodsStr = invoker.getUrl().getParameters().get("methods");

  10.                    boolean hasMethod = false;

  11.                    if (methodsStr == null || methodsStr.indexOf(",") == -1){

  12.                        hasMethod = inv.getMethodName().equals(methodsStr);

  13.                    } else {

  14.                        String[] methods = methodsStr.split(",");

  15.                        for (String method : methods){

  16.                            if (inv.getMethodName().equals(method)){

  17.                                hasMethod = true;

  18.                                break;

  19.                            }

  20.                        }

  21.                    }

  22.                    if (!hasMethod){

  23.                        logger.warn(new IllegalStateException("The methodName "+inv.getMethodName()+" not found in callback service interface ,invoke will be ignored. please update the api interface. url is:" + invoker.getUrl()) +" ,invocation is :"+inv );

  24.                        return null;

  25.                    }

  26.                }

  27.                RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());

  28. //用invoker执行调用,返回结果

  29.                return invoker.invoke(inv);

  30.            }

  31.            throw new RemotingException(channel, "Unsupported request: " + message == null ? null : (message.getClass().getName() + ": " + message) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());

  32.        }

  33.        @Override

  34.        public void received(Channel channel, Object message) throws RemotingException {

  35.            if (message instanceof Invocation) {

  36.                reply((ExchangeChannel) channel, message);

  37.            } else {

  38.                super.received(channel, message);

  39.            }

  40.        }

  41.        @Override

  42.        public void connected(Channel channel) throws RemotingException {

  43.            invoke(channel, Constants.ON_CONNECT_KEY);

  44.        }

  45.        @Override

  46.        public void disconnected(Channel channel) throws RemotingException {

  47.            if(logger.isInfoEnabled()){

  48.                logger.info("disconected from "+ channel.getRemoteAddress() + ",url:" + channel.getUrl());

  49.            }

  50.            invoke(channel, Constants.ON_DISCONNECT_KEY);

  51.        }

  52.        private void invoke(Channel channel, String methodKey) {

  53.            Invocation invocation = createInvocation(channel, channel.getUrl(), methodKey);

  54.            if (invocation != null) {

  55.                try {

  56.                    received(channel, invocation);

  57.                } catch (Throwable t) {

  58.                    logger.warn("Failed to invoke event method " + invocation.getMethodName() + "(), cause: " + t.getMessage(), t);

  59.                }

  60.            }

  61.        }

  62.        private Invocation createInvocation(Channel channel, URL url, String methodKey) {

  63.            String method = url.getParameter(methodKey);

  64.            if (method == null || method.length() == 0) {

  65.                return null;

  66.            }

  67.            RpcInvocation invocation = new RpcInvocation(method, new Class<?>[0], new Object[0]);

  68.            invocation.setAttachment(Constants.PATH_KEY, url.getPath());

  69.            invocation.setAttachment(Constants.GROUP_KEY, url.getParameter(Constants.GROUP_KEY));

  70.            invocation.setAttachment(Constants.INTERFACE_KEY, url.getParameter(Constants.INTERFACE_KEY));

  71.            invocation.setAttachment(Constants.VERSION_KEY, url.getParameter(Constants.VERSION_KEY));

  72.            if (url.getParameter(Constants.STUB_EVENT_KEY, false)){

  73.                invocation.setAttachment(Constants.STUB_EVENT_KEY, Boolean.TRUE.toString());

  74.            }

  75.            return invocation;

  76.        }

  77.    };

这边主要讲下replay方法,这个方法用来处理用户的rpc请求,请求的序列化dubbo封装的netty服务已经处理,所在在这个方法传入的message直接就是Invocation对象,在getInvoker中,通过invocation对象可以我们可以生成exporterMap的key,用来拿到对应的Exporter

 
   
   
 
  1. Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException{

  2.        boolean isCallBackServiceInvoke = false;

  3.        boolean isStubServiceInvoke = false;

  4.        int port = channel.getLocalAddress().getPort();

  5.        String path = inv.getAttachments().get(Constants.PATH_KEY);

  6.        //如果是客户端的回调服务.

  7.        isStubServiceInvoke = Boolean.TRUE.toString().equals(inv.getAttachments().get(Constants.STUB_EVENT_KEY));

  8.        if (isStubServiceInvoke){

  9.            port = channel.getRemoteAddress().getPort();

  10.        }

  11.        //callback

  12.        isCallBackServiceInvoke = isClientSide(channel) && !isStubServiceInvoke;

  13.        if(isCallBackServiceInvoke){

  14.            path = inv.getAttachments().get(Constants.PATH_KEY)+"."+inv.getAttachments().get(Constants.CALLBACK_SERVICE_KEY);

  15.            inv.getAttachments().put(IS_CALLBACK_SERVICE_INVOKE, Boolean.TRUE.toString());

  16.        }

  17.        String serviceKey = serviceKey(port, path, inv.getAttachments().get(Constants.VERSION_KEY), inv.getAttachments().get(Constants.GROUP_KEY));

  18. //2 通过invocation生成的key获取exporter

  19.        DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);

  20.        if (exporter == null)

  21.            throw new RemotingException(channel, "Not found exported service: " + serviceKey + " in " + exporterMap.keySet() + ", may be version or group mismatch " + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress() + ", message:" + inv);

  22. //转换为invoker

  23.        return exporter.getInvoker();

  24.    }

拿到Exporter之后,转换为Invoker,直接调用invoke方法返回Result,之后返回给调用者的序列化等逻辑dubbo封装的netty服务也帮我们处理了,我们不用关注 这章主要讲解的是服务暴露,关于netty服务的实现不多分析(我也还没怎么看过),理解这个requestHandler处理器即可,知道它会怎么处理rpc请求对应的Invocation即可,什么序列化,加密解密全都当作黑盒。

服务引用,下章见


以上是关于Dubbo之服务暴露源码分析的主要内容,如果未能解决你的问题,请参考以下文章

Dubbo原理何源码解析之服务暴露

Dubbo之服务暴露源码分析

Dubbo源码阅读系列服务暴露之远程暴露

3.dubbo源码分析之API配置二(服务提供者)

dubbo源码分析11——服务暴露2_doExport()方法分析

dubbo源码分析8——服务暴露概述