Dubbo Server-Side Startup Source Code Analysis (Based on Dubbo 3)

Posted by Leo Han

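This article studies the source code of Dubbo 3.0.2.
In Spring Boot based development, DubboAutoConfiguration in Dubbo's spring-boot-starter is a key auto-configuration class. It registers the following key classes with the Spring container: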

  • ServiceAnnotationPostProcessor
  • DubboBootstrapApplicationListener

ServiceAnnotationPostProcessor is mainly responsible for loading classes annotated with @DubboService, while DubboBootstrapApplicationListener listens for Spring events:

public void onApplicationEvent(ApplicationEvent event) {
        if (isOriginalEventSource(event)) {
            if (event instanceof DubboAnnotationInitedEvent) {
                applicationContext.getBean(DubboConfigBeanInitializer.BEAN_NAME, DubboConfigBeanInitializer.class);
                DubboBootstrap.getInstance().initialize();
            } else if (event instanceof ApplicationContextEvent) {
                this.onApplicationContextEvent((ApplicationContextEvent) event);
            }
        }
    }
    private void onApplicationContextEvent(ApplicationContextEvent event) {
        if (DubboBootstrapStartStopListenerSpringAdapter.applicationContext == null) {
            DubboBootstrapStartStopListenerSpringAdapter.applicationContext = event.getApplicationContext();
        }
        if (event instanceof ContextRefreshedEvent) {
            onContextRefreshedEvent((ContextRefreshedEvent) event);
        } else if (event instanceof ContextClosedEvent) {
            onContextClosedEvent((ContextClosedEvent) event);
        }
    }

    private void onContextRefreshedEvent(ContextRefreshedEvent event) {
        if (dubboBootstrap.getTakeoverMode() == BootstrapTakeoverMode.SPRING) {
            dubboBootstrap.start();
        }
    }

    private void onContextClosedEvent(ContextClosedEvent event) {
        if (dubboBootstrap.getTakeoverMode() == BootstrapTakeoverMode.SPRING) {
            // will call dubboBootstrap.stop() through shutdown callback.
            DubboShutdownHook.getDubboShutdownHook().run();
        }
    }

Once the Spring container is ready, it publishes a ContextRefreshedEvent. At that point the listener calls DubboBootstrap to publish the services. That is the overall flow; the details follow.
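To make the event-driven start and stop easier to follow, here is a minimal sketch that does not depend on Spring or Dubbo. All type names (AppEvent, RefreshedEvent, ClosedEvent, TakeoverMode, SketchBootstrap, SketchLifecycleListener) are hypothetical stand-ins, and only the SPRING and AUTO modes mentioned in the excerpts are modeled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-ins for Spring's context events; not the real Spring classes.
interface AppEvent {}
final class RefreshedEvent implements AppEvent {}
final class ClosedEvent implements AppEvent {}

// Simplified stand-in for BootstrapTakeoverMode; only two values are modeled.
enum TakeoverMode { SPRING, AUTO }

// Hypothetical bootstrap that records which lifecycle calls took effect.
final class SketchBootstrap {
    private final AtomicBoolean started = new AtomicBoolean(false);
    final List<String> calls = new ArrayList<>();
    final TakeoverMode mode;

    SketchBootstrap(TakeoverMode mode) {
        this.mode = mode;
    }

    void start() {
        // compareAndSet makes a repeated start() a no-op.
        if (started.compareAndSet(false, true)) {
            calls.add("start");
        }
    }

    void stop() {
        if (started.compareAndSet(true, false)) {
            calls.add("stop");
        }
    }
}

final class SketchLifecycleListener {
    private final SketchBootstrap bootstrap;

    SketchLifecycleListener(SketchBootstrap bootstrap) {
        this.bootstrap = bootstrap;
    }

    void onEvent(AppEvent event) {
        // Only act when the container owns the bootstrap lifecycle.
        if (bootstrap.mode != TakeoverMode.SPRING) {
            return;
        }
        if (event instanceof RefreshedEvent) {
            bootstrap.start();
        } else if (event instanceof ClosedEvent) {
            bootstrap.stop();
        }
    }
}
```

In this sketch, a second RefreshedEvent does not start the bootstrap twice, and a bootstrap in AUTO mode ignores container events entirely.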

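First, ServiceAnnotationPostProcessor. Its main job is to register classes carrying any of the annotations listed below into the Spring container and, for each one, to generate a corresponding ServiceBean that is also registered in the container. The ServiceBean holds a ref pointing to the actual class annotated with @DubboService. Service exposure later goes through the ServiceBean, and actual method invocations are executed on the object that ref points to.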

 private final static List<Class<? extends Annotation>> serviceAnnotationTypes = asList(
            DubboService.class,
            Service.class,
            com.alibaba.dubbo.config.annotation.Service.class
    );
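The relationship between a ServiceBean and the implementation it wraps can be sketched as follows. This is a simplified illustration rather than Dubbo's code: GreetingService, GreetingServiceImpl, SketchServiceBean and SketchServiceRegistry are hypothetical names, and the registry only stands in for the role the bootstrap plays when it collects service configs.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical service interface and implementation, for illustration only.
interface GreetingService {
    String greet(String name);
}

final class GreetingServiceImpl implements GreetingService {
    @Override
    public String greet(String name) {
        return "hello " + name;
    }
}

// Simplified stand-in for ServiceBean: the interface type plus a reference to the implementation.
final class SketchServiceBean<T> {
    private final Class<T> interfaceClass;
    private final T ref;

    SketchServiceBean(Class<T> interfaceClass, T ref) {
        this.interfaceClass = interfaceClass;
        this.ref = ref;
    }

    Class<T> getInterfaceClass() {
        return interfaceClass;
    }

    // Invocations are ultimately executed against this object.
    T getRef() {
        return ref;
    }
}

// Simplified stand-in for the place where the bootstrap keeps registered services.
final class SketchServiceRegistry {
    private final Map<String, SketchServiceBean<?>> services = new LinkedHashMap<>();

    void service(SketchServiceBean<?> bean) {
        services.put(bean.getInterfaceClass().getName(), bean);
    }

    SketchServiceBean<?> lookup(String interfaceName) {
        return services.get(interfaceName);
    }
}
```

Looking up the bean by interface name and calling a method on its ref reaches the implementation class, which is the delegation described above.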

ServiceBean, in turn, wraps the service interface:

public class ServiceBean<T> extends ServiceConfig<T> implements InitializingBean, DisposableBean,
        ApplicationContextAware, BeanNameAware, ApplicationEventPublisherAware {
   public void afterPropertiesSet() throws Exception {
        if (StringUtils.isEmpty(getPath())) {
            if (StringUtils.isNotEmpty(getInterface())) {
                setPath(getInterface());
            }
        }
        //register service bean and set bootstrap
        DubboBootstrap.getInstance().service(this);
    }        
    
}

This registers the current ServiceBean with DubboBootstrap. Later, when the ContextRefreshedEvent fires, the listener shown earlier calls

 dubboBootstrap.start();

to publish the services:

public DubboBootstrap start() {
        if (started.compareAndSet(false, true)) {
            startup.set(false);
            shutdown.set(false);
            awaited.set(false);

            initialize();
            if (logger.isInfoEnabled()) {
                logger.info(NAME + " is starting...");
            }
            // 1. export Dubbo Services
            exportServices();

            // If register consumer instance or has exported services
            if (isRegisterConsumerInstance() || hasExportedServices()) {
                // 2. export MetadataService
                exportMetadataService();
                // 3. Register the local ServiceInstance if required
                registerServiceInstance();
            }

            referServices();

            // wait async export / refer finish if needed
            awaitFinish();

            if (isExportBackground() || isReferBackground()) {
                new Thread(() -> {
                    while (!asyncExportFinish || !asyncReferFinish) {
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            logger.error(NAME + " waiting async export / refer occurred and error.", e);
                        }
                    }

                    startup.set(true);
                    if (logger.isInfoEnabled()) {
                        logger.info(NAME + " is ready.");
                    }
                    onStart();
                }).start();
            } else {
                startup.set(true);
                if (logger.isInfoEnabled()) {
                    logger.info(NAME + " is ready.");
                }
                onStart();
            }

            if (logger.isInfoEnabled()) {
                logger.info(NAME + " has started.");
            }
        }
        return this;
    }
    private void exportServices() {
        for (ServiceConfigBase sc : configManager.getServices()) {
            // TODO, compatible with ServiceConfig.export()
            ServiceConfig<?> serviceConfig = (ServiceConfig<?>) sc;
            serviceConfig.setBootstrap(this);
            if (!serviceConfig.isRefreshed()) {
                serviceConfig.refresh();
            }

            if (sc.shouldExportAsync()) {
                ExecutorService executor = executorRepository.getServiceExportExecutor();
                CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
                    try {
                        if (!sc.isExported()) {
                            sc.export();
                            exportedServices.add(sc);
                        }
                    } catch (Throwable t) {
                        logger.error("export async catch error : " + t.getMessage(), t);
                    }
                }, executor);

                asyncExportingFutures.add(future);
            } else {
                if (!sc.isExported()) {
                    sc.export();
                    exportedServices.add(sc);
                }
            }
        }
    }
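The split between synchronous and asynchronous export in exportServices can be illustrated with plain java.util.concurrent types. This is a sketch under stated assumptions: SketchExportable and SketchExporter are hypothetical, the pool size of 2 is arbitrary, and the final join only plays the role that awaitFinish() has in the excerpt.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical exportable unit; the real ServiceConfig carries far more state.
final class SketchExportable {
    final String name;
    final boolean async;
    private volatile boolean exported;

    SketchExportable(String name, boolean async) {
        this.name = name;
        this.async = async;
    }

    boolean isExported() {
        return exported;
    }

    void export() {
        exported = true;
    }
}

final class SketchExporter {
    // Thread-safe because async exports add to it from pool threads.
    final List<String> exportedNames = new CopyOnWriteArrayList<>();

    void exportAll(List<SketchExportable> services) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        List<CompletableFuture<Void>> pending = new ArrayList<>();
        try {
            for (SketchExportable s : services) {
                if (s.async) {
                    // Async services are exported on the pool and tracked as futures.
                    pending.add(CompletableFuture.runAsync(() -> exportOnce(s), executor));
                } else {
                    exportOnce(s);
                }
            }
            // Block until every asynchronous export has completed.
            CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
        } finally {
            executor.shutdown();
        }
    }

    private void exportOnce(SketchExportable s) {
        // Skip services that are already exported, as the excerpt does.
        if (!s.isExported()) {
            s.export();
            exportedNames.add(s.name);
        }
    }
}
```

Calling exportAll twice on the same list exports each service only once, because of the isExported() guard.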

As shown, exportServices ultimately calls ServiceConfig.export:

// ServiceConfig.java
public synchronized void export() {
        if (this.shouldExport() && !this.exported) {
            this.init();

            // check bootstrap state
            if (!bootstrap.isInitialized()) {
                throw new IllegalStateException("DubboBootstrap is not initialized");
            }

            if (!this.isRefreshed()) {
                this.refresh();
            }

            if (!shouldExport()) {
                return;
            }

            if (shouldDelay()) {
                DELAY_EXPORT_EXECUTOR.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS);
            } else {
                doExport();
            }

            if (this.bootstrap.getTakeoverMode() == BootstrapTakeoverMode.AUTO) {
                this.bootstrap.start();
            }
        }
    }
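The shouldDelay() branch in export() hands doExport to a scheduler instead of running it inline. The following sketch shows that pattern in isolation. SketchDelayedExport is a hypothetical class, and the rule that a null or non-positive delay means "export immediately" is an assumption made for this example, not something the excerpt states.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class SketchDelayedExport {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final CountDownLatch done = new CountDownLatch(1);
    // Assumption for this sketch: null or non-positive means "export immediately".
    private final Integer delayMillis;

    SketchDelayedExport(Integer delayMillis) {
        this.delayMillis = delayMillis;
    }

    private boolean shouldDelay() {
        return delayMillis != null && delayMillis > 0;
    }

    synchronized void export() {
        if (shouldDelay()) {
            // Hand the real work to the scheduler; export() itself returns at once.
            scheduler.schedule(this::doExport, delayMillis, TimeUnit.MILLISECONDS);
        } else {
            doExport();
        }
    }

    private void doExport() {
        done.countDown();
    }

    boolean isExported() {
        return done.getCount() == 0;
    }

    // Waits up to timeoutMillis for the export; returns false on timeout or interruption.
    boolean awaitExported(long timeoutMillis) {
        try {
            return done.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    void shutdown() {
        scheduler.shutdown();
    }
}
```

One consequence of this design is that a delayed export() returns before the service is actually exposed, so callers that need the exported state have to wait for it separately.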

ServiceConfig.export goes through the following main steps:

  • init
  • refresh
  • doExport

First, init:

public void init() {
        if (this.initialized.compareAndSet(false, true)) {
            if (this.bootstrap == null) {
                this.bootstrap = DubboBootstrap.getInstance();
                this.bootstrap.initialize();
            }
            this.bootstrap.service(this);
            ExtensionLoader<ServiceListener> extensionLoader = ExtensionLoader.getExtensionLoader(ServiceListener.class);
            this.serviceListeners.addAll(extensionLoader.getSupportedExtensionInstances());

            this.checkAndUpdateSubConfigs();
        }

        initServiceMetadata(provider);
        serviceMetadata.setServiceType(getInterfaceClass());
        serviceMetadata.setTarget(getRef());
        serviceMetadata.generateServiceKey();
    }

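During init, the ServiceListener extensions are loaded first; they act as listeners. Beyond that, init mainly loads and converts the relevant configuration.
Next comes refresh: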

public void refresh() {
        refreshed.set(true);
        try {
            // check and init before do refresh
            preProcessRefresh();

            Environment environment = ApplicationModel.getEnvironment();
            List<Map<String, String>> configurationMaps = environment.getConfigurationMaps();

            // Search props starts with PREFIX in order
            String preferredPrefix = null;
            for (String prefix : getPrefixes()) {
                if (ConfigurationUtils.hasSubProperties(configurationMaps, prefix)) {
                    preferredPrefix = prefix;
                    break;
                }
            }
            if (preferredPrefix == null) {
                preferredPrefix = getPrefixes().get(0);
            }
            // Extract sub props (which key was starts with preferredPrefix)
            Collection<Map<String, String>> instanceConfigMaps = environment.getConfigurationMaps(this, preferredPrefix);
            Map<String, String> subProperties = ConfigurationUtils.getSubProperties(instanceConfigMaps, preferredPrefix);
            InmemoryConfiguration subPropsConfiguration = new InmemoryConfiguration(subProperties);

            if (logger.isDebugEnabled()) {
                String idOrName = "";
                if (StringUtils.hasText(this.getId())) {
                    idOrName = "[id=" + this.getId() + "]";
                } else {
                    String name = ReflectUtils.getProperty(this, "getName");
                    if (StringUtils.hasText(name)) {
                        idOrName = "[name=" + name + "]";
                    }
                }
            }
            Method[] methods = getClass().getMethods();
            for (Method method : methods) {
                if (MethodUtils.isSetter(method)) {
                    String propertyName = extractPropertyName(method.getName());
                
                    String kebabPropertyName = StringUtils.convertToSplitName(propertyName, "-");

                    try {
                        String value = StringUtils.trim(subPropsConfiguration.getString(kebabPropertyName));

                        if (StringUtils.hasText(value) && ClassUtils.isTypeMatch(method.getParameterTypes()[0], value) &&
                                !isIgnoredAttribute(getClass(), propertyName)) {
                            value = environment.resolvePlaceholders(value);
                            method.invoke(this, ClassUtils.convertPrimitive(method.getParameterTypes()[0], value));
                        }
                    } catch (Exception e) {
                        // Failures for a single property are ignored in this excerpt (no logging shown).
                    }
                } else if (isParametersSetter(method)) {
                    String propertyName = extractPropertyName(method.getName());
                    String value = StringUtils.trim(subPropsConfiguration.getString(propertyName));
                    Map<String, String> parameterMap = null;
                    if (StringUtils.hasText(value)) {
                        parameterMap = StringUtils.parseParameters(value);
                    } else {
                        parameterMap = ConfigurationUtils.getSubProperties(subProperties, PARAMETERS);
                    }
                    invokeSetParameters(convert(parameterMap, ""));
                }
            }

            processExtraRefresh(preferredPrefix, subPropsConfiguration);

        } catch (Exception e) {
            throw new IllegalStateException("Failed to override field value of config bean: "+this, e);
        }
        postProcessRefresh();
    }

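refresh reads the configuration and, through the setter methods, assigns the matching values to the corresponding properties of the current ServiceConfig.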
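The core of refresh, mapping kebab-case configuration keys onto setters by reflection, can be reduced to a small example. This is a sketch under stated assumptions, not Dubbo's implementation: SketchConfig and SketchBinder are hypothetical, only String-typed setters are handled, and placeholder resolution, ignored attributes and the parameters setter are left out.

```java
import java.lang.reflect.Method;
import java.util.Map;

// Hypothetical config bean; the property names are illustrative only.
final class SketchConfig {
    private String group;
    private String loadBalance;

    public void setGroup(String group) { this.group = group; }
    public void setLoadBalance(String loadBalance) { this.loadBalance = loadBalance; }
    public String getGroup() { return group; }
    public String getLoadBalance() { return loadBalance; }
}

final class SketchBinder {
    // "setLoadBalance" -> "loadBalance"
    static String propertyName(String setterName) {
        return Character.toLowerCase(setterName.charAt(3)) + setterName.substring(4);
    }

    // "loadBalance" -> "load-balance"
    static String toKebab(String camel) {
        StringBuilder sb = new StringBuilder();
        for (char c : camel.toCharArray()) {
            if (Character.isUpperCase(c)) {
                sb.append('-').append(Character.toLowerCase(c));
            } else {
                sb.append(c);
            }
        }
        return sb.toString();
    }

    // Binds non-blank values from props onto the public String setters of target.
    static void bind(Object target, Map<String, String> props) {
        try {
            for (Method m : target.getClass().getMethods()) {
                boolean stringSetter = m.getName().startsWith("set")
                        && m.getName().length() > 3
                        && m.getParameterCount() == 1
                        && m.getParameterTypes()[0] == String.class;
                if (!stringSetter) {
                    continue;
                }
                String value = props.get(toKebab(propertyName(m.getName())));
                if (value != null && !value.trim().isEmpty()) {
                    m.invoke(target, value.trim());
                }
            }
        } catch (ReflectiveOperationException e) {
            // Wrapped in IllegalStateException, similar in spirit to the excerpt's outer catch.
            throw new IllegalStateException("Failed to override field value of config bean: " + target, e);
        }
    }
}
```

With a map such as {"group": "g1", "load-balance": "roundrobin"}, bind calls setGroup and setLoadBalance; keys without a matching setter are simply never looked up.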

The actual exposure and publication of the service ultimately happens in doExport.
