Dubbo消费端启动流程处理逻辑,方法调用实现(基于Dubbo3)

Posted Leo Han

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Dubbo消费端启动流程处理逻辑,方法调用实现(基于Dubbo3)相关的知识,希望对你有一定的参考价值。

之前我们已经分析Dubbo服务端相关启动流程和处理逻辑,接下来我们分析下客户端的处理流程,本文主要分析的是基于Spring框架下启动流程分析。
首先当我们使用DubboComponentScan这个注解的时候,会注入DubboComponentScanRegistrar:

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(DubboComponentScanRegistrar.class)
public @interface DubboComponentScan {
......
}

而在DubboComponentScanRegistrar则会注入如下几个类:

static void registerCommonBeans(BeanDefinitionRegistry registry) {

        registerInfrastructureBean(registry, ServicePackagesHolder.BEAN_NAME, ServicePackagesHolder.class);

        registerInfrastructureBean(registry, ReferenceBeanManager.BEAN_NAME, ReferenceBeanManager.class);

        // Since 2.5.7 Register @Reference Annotation Bean Processor as an infrastructure Bean
        registerInfrastructureBean(registry, ReferenceAnnotationBeanPostProcessor.BEAN_NAME,
                ReferenceAnnotationBeanPostProcessor.class);

        // TODO Whether DubboConfigAliasPostProcessor can be removed ?
        // Since 2.7.4 [Feature] https://github.com/apache/dubbo/issues/5093
        registerInfrastructureBean(registry, DubboConfigAliasPostProcessor.BEAN_NAME,
                DubboConfigAliasPostProcessor.class);

        // Since 2.7.4 Register DubboBootstrapApplicationListener as an infrastructure Bean
        registerInfrastructureBean(registry, DubboBootstrapApplicationListener.BEAN_NAME,
                DubboBootstrapApplicationListener.class);

        // Since 2.7.6 Register DubboConfigDefaultPropertyValueBeanPostProcessor as an infrastructure Bean
        registerInfrastructureBean(registry, DubboConfigDefaultPropertyValueBeanPostProcessor.BEAN_NAME,
                DubboConfigDefaultPropertyValueBeanPostProcessor.class);

        // Dubbo config initializer
        registerInfrastructureBean(registry, DubboConfigBeanInitializer.BEAN_NAME, DubboConfigBeanInitializer.class);

        // register infra bean if not exists later
        registerInfrastructureBean(registry, DubboInfraBeanRegisterPostProcessor.BEAN_NAME, DubboInfraBeanRegisterPostProcessor.class);
    }

而和服务端类似,这里消费端需要注意的就是ReferenceAnnotationBeanPostProcessor,这个类会监测当前类中有没有被如下注解修饰的类:

 public ReferenceAnnotationBeanPostProcessor() {
        super(DubboReference.class, Reference.class, com.alibaba.dubbo.config.annotation.Reference.class);
    }

如果有,则会生成一个ReferenceBean注入到spring容器中去,后续消费端的调用都是基于ReferenceBean,我们看下这个类:

public class ReferenceBean<T> implements FactoryBean,
        ApplicationContextAware, BeanClassLoaderAware, BeanNameAware, InitializingBean, DisposableBean {
.......
}

其他一些逻辑就不分析了,我们在这里分析下怎么启动客户端并执行的。
ReferenceBean实现了FactoryBean接口,这样Spring容器构造其对象的实例的话则会通过FactoryBean来实现,在ReferenceBean对应:

public Object getObject() {
        if (lazyProxy == null) {
            createLazyProxy();
        }
        return lazyProxy;
    }
private void createLazyProxy() {

        //set proxy interfaces
        //see also: org.apache.dubbo.rpc.proxy.AbstractProxyFactory.getProxy(org.apache.dubbo.rpc.Invoker<T>, boolean)
        ProxyFactory proxyFactory = new ProxyFactory();
        proxyFactory.setTargetSource(new DubboReferenceLazyInitTargetSource());
        proxyFactory.addInterface(interfaceClass);
        Class<?>[] internalInterfaces = AbstractProxyFactory.getInternalInterfaces();
        for (Class<?> anInterface : internalInterfaces) {
            proxyFactory.addInterface(anInterface);
        }
        if (!StringUtils.isEquals(interfaceClass.getName(), interfaceName)) {
            try {
                Class<?> serviceInterface = ClassUtils.forName(interfaceName, beanClassLoader);
                proxyFactory.addInterface(serviceInterface);
            } catch (ClassNotFoundException e) {
            }
        }

        this.lazyProxy = proxyFactory.getProxy(this.beanClassLoader);
    }
private class DubboReferenceLazyInitTargetSource extends AbstractLazyCreationTargetSource {

        @Override
        protected Object createObject() throws Exception {
            return getCallProxy();
        }

        @Override
        public synchronized Class<?> getTargetClass() {
            return getInterfaceClass();
        }
    }
private Object getCallProxy() throws Exception {
        if (referenceConfig == null) {
            throw new IllegalStateException("ReferenceBean is not ready yet, please make sure to call reference interface method after dubbo is started.");
        }
        //get reference proxy
        return referenceConfig.get();
    }

最终通过referenceConfig.get()来获取接口的代理,是不是跟服务端很类似。

// ReferenceConfig.java
public synchronized T get() {
        if (destroyed) {
            throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!");
        }

        if (ref == null) {
            init();
        }

        return ref;
    }

ReferenceConfig的get方法中,会调用init进行初始化,获取代理,而其中生成代理的关键代码如下:

ref = createProxy(map);

主要逻辑如下,这里我们选择了一种 情况说明:

invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));
return (T) PROXY_FACTORY.getProxy(invoker, ProtocolUtils.isGeneric(generic));

看到这里是不是有点熟悉了,和server端一样,也是invoker,我们以默认的dubbo协议来看看:

public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
        optimizeSerialization(url);

        // create rpc invoker.
        DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
        invokers.add(invoker);

        return invoker;
    }

而这里在生成消费端的invoker的时候,传入了一个ExchangeClient:

private ExchangeClient[] getClients(URL url) {
        // whether to share connection

        boolean useShareConnect = false;

        int connections = url.getParameter(CONNECTIONS_KEY, 0);
        List<ReferenceCountExchangeClient> shareClients = null;
        // if not configured, connection is shared, otherwise, one connection for one service
        if (connections == 0) {
            useShareConnect = true;

            /*
             * The xml configuration should have a higher priority than properties.
             */
            String shareConnectionsStr = url.getParameter(SHARE_CONNECTIONS_KEY, (String) null);
            connections = Integer.parseInt(StringUtils.isBlank(shareConnectionsStr) ? ConfigUtils.getProperty(SHARE_CONNECTIONS_KEY,
                    DEFAULT_SHARE_CONNECTIONS) : shareConnectionsStr);
            shareClients = getSharedClient(url, connections);
        }

        ExchangeClient[] clients = new ExchangeClient[connections];
        for (int i = 0; i < clients.length; i++) {
            if (useShareConnect) {
                clients[i] = shareClients.get(i);

            } else {
                clients[i] = initClient(url);
            }
        }

        return clients;
    }
private ExchangeClient initClient(URL url) {

        // client type setting.
        String str = url.getParameter(CLIENT_KEY, url.getParameter(SERVER_KEY, DEFAULT_REMOTING_CLIENT));

        url = url.addParameter(CODEC_KEY, DubboCodec.NAME);
        // enable heartbeat by default
        url = url.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT));

        // BIO is not allowed since it has severe performance issue.
        if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
            throw new RpcException("Unsupported client type: " + str + "," +
                    " supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
        }

        ExchangeClient client;
        try {
            // connection should be lazy
            if (url.getParameter(LAZY_CONNECT_KEY, false)) {
                client = new LazyConnectExchangeClient(url, requestHandler);

            } else {
                client = Exchangers.connect(url, requestHandler);
            }

        } catch (RemotingException e) {
            throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
        }

        return client;
    }

这里最后调用到HeaderExchanger中:

  public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
    }

Transporters中进行服务端连接:

public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        ChannelHandler handler;
        if (handlers == null || handlers.length == 0) {
            handler = new ChannelHandlerAdapter();
        } else if (handlers.length == 1) {
            handler = handlers[0];
        } else {
            handler = new ChannelHandlerDispatcher(handlers);
        }
        return getTransporter().connect(url, handler);
    }

而这里getTransporter默认获取到的是NettyTransporter

public Client connect(URL url, ChannelHandler handler) throws RemotingException {
        return new NettyClient(url, handler);
    }

可以看到,这里返回的默认是一个NettyClient

到这里,可以看到,这里生成了一个DubboInvoker,然后通过代理生成一个目标接口代理对象:

(T) PROXY_FACTORY.getProxy(invoker, ProtocolUtils.isGeneric(generic));
public <T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException {
        // when compiling with native image, ensure that the order of the interfaces remains unchanged
        LinkedHashSet<Class<?>> interfaces = new LinkedHashSet<>();

        String config = invoker.getUrl().getParameter(INTERFACES);
        if (config != null && config.length() > 0) {
            String[] types = COMMA_SPLIT_PATTERN.split(config);
            for (String type : types) {
                // TODO can we load successfully for a different classloader?.
                interfaces.add(ReflectUtils.forName(type));
            }
        }

        if (generic) {
            if (GenericService.class.equals(invoker.getInterface()) || !GenericService.class.isAssignableFrom(invoker.getInterface())) {
                interfaces.add(com.alibaba.dubbo.rpc.service.GenericService.class);
            }

            try {
                // find the real interface from url
                String realInterface = invoker.getUrl().getParameter(Constants.INTERFACE);
                interfaces.add(ReflectUtils.forName(realInterface));
            } catch (Throwable e) {
                // ignore
            }
        }

        interfaces.add(invoker.getInterface());
        interfaces.addAll(Arrays.asList(INTERNAL_INTERFACES));

        return getProxy(invoker, interfaces.toArray(new Class<?>[0]));
    }

最终getProxy有两个实现,通过jdk和Javassist生成两种,与服务端完全一致,我们看下Javassist方式的:

public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
        return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
    }

    @Override
    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        // TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName,
                                      Class<?>[] parameterTypes,
                                      Object[] arguments) throws Throwable {
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };
    }

这里就是标准的java的动态代理了,而Dubbo在这里实现了自己的InvocationHandler:

public class InvokerInvocationHandler implements InvocationHandler {
    private static final Logger logger = LoggerFactory.getLogger(InvokerInvocationHandler.Dubbo消费端同步调用异步调用(基于Dubbo3)

Dubbo服务端消费端网络连接数控制

Dubbo3高级特性「系统级别检查」服务端和消费端启动时检查

阿里面试官:你读过Dubbo的源码,给我说说服务之间是怎么进行消费的?

Dubbo3中服务端线程模型,线程处理(基于Dubbo3)

dubbo服务端启动源码分析(基于Dubbo 3)