Dubbo Server Startup Source Code Analysis (Based on Dubbo 3)
Posted by Leo Han
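This article studies the source code of Dubbo 3.0.2.
In Spring Boot based development, DubboAutoConfiguration in Dubbo's spring-boot-starter is a key auto-configuration class. It registers the following key classes in the Spring container:
- ServiceAnnotationPostProcessor
- DubboBootstrapApplicationListener
ServiceAnnotationPostProcessor is mainly responsible for loading the classes annotated with @DubboService, while DubboBootstrapApplicationListener listens for Spring events: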
public void onApplicationEvent(ApplicationEvent event) {
    if (isOriginalEventSource(event)) {
        if (event instanceof DubboAnnotationInitedEvent) {
            applicationContext.getBean(DubboConfigBeanInitializer.BEAN_NAME, DubboConfigBeanInitializer.class);
            DubboBootstrap.getInstance().initialize();
        } else if (event instanceof ApplicationContextEvent) {
            this.onApplicationContextEvent((ApplicationContextEvent) event);
        }
    }
}

private void onApplicationContextEvent(ApplicationContextEvent event) {
    if (DubboBootstrapStartStopListenerSpringAdapter.applicationContext == null) {
        DubboBootstrapStartStopListenerSpringAdapter.applicationContext = event.getApplicationContext();
    }
    if (event instanceof ContextRefreshedEvent) {
        onContextRefreshedEvent((ContextRefreshedEvent) event);
    } else if (event instanceof ContextClosedEvent) {
        onContextClosedEvent((ContextClosedEvent) event);
    }
}

private void onContextRefreshedEvent(ContextRefreshedEvent event) {
    if (dubboBootstrap.getTakeoverMode() == BootstrapTakeoverMode.SPRING) {
        dubboBootstrap.start();
    }
}

private void onContextClosedEvent(ContextClosedEvent event) {
    if (dubboBootstrap.getTakeoverMode() == BootstrapTakeoverMode.SPRING) {
        // will call dubboBootstrap.stop() through shutdown callback.
        DubboShutdownHook.getDubboShutdownHook().run();
    }
}
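Once the Spring container is ready it publishes a ContextRefreshedEvent, and the listener then calls DubboBootstrap to publish the services. That is the overall flow; next we look at the details.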
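The dispatch logic of the listener can be reproduced without Spring. The following is only a minimal sketch: the event classes, `TakeoverMode`, and `FakeBootstrap` are hypothetical stand-ins invented for illustration, not Spring or Dubbo types, and the stop path is simplified (the real listener goes through DubboShutdownHook).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for Spring's context events; not the real Spring types.
class ContextEvent {}
class RefreshedEvent extends ContextEvent {}
class ClosedEvent extends ContextEvent {}

// Mirrors the idea of BootstrapTakeoverMode: who owns the start/stop lifecycle.
enum TakeoverMode { SPRING, AUTO }

// Hypothetical bootstrap that only records which lifecycle methods were called.
class FakeBootstrap {
    final List<String> calls = new ArrayList<>();
    final TakeoverMode mode;

    FakeBootstrap(TakeoverMode mode) {
        this.mode = mode;
    }

    void start() { calls.add("start"); }
    void stop() { calls.add("stop"); }
}

class LifecycleListenerSketch {
    private final FakeBootstrap bootstrap;

    LifecycleListenerSketch(FakeBootstrap bootstrap) {
        this.bootstrap = bootstrap;
    }

    // Start on "refreshed" and stop on "closed", but only when Spring owns the lifecycle.
    void onEvent(ContextEvent event) {
        if (bootstrap.mode != TakeoverMode.SPRING) {
            return;
        }
        if (event instanceof RefreshedEvent) {
            bootstrap.start();
        } else if (event instanceof ClosedEvent) {
            // Simplification: the real listener triggers stop through a shutdown hook.
            bootstrap.stop();
        }
    }

    public static void main(String[] args) {
        FakeBootstrap bootstrap = new FakeBootstrap(TakeoverMode.SPRING);
        LifecycleListenerSketch listener = new LifecycleListenerSketch(bootstrap);
        listener.onEvent(new RefreshedEvent());
        listener.onEvent(new ClosedEvent());
        System.out.println(bootstrap.calls);
    }
}
```

With a bootstrap in `AUTO` mode the same events are ignored, which matches the takeover-mode checks in the listener code.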
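First, ServiceAnnotationPostProcessor. Its main job is to register the classes carrying any of the annotations listed below in the Spring container, and to generate a corresponding ServiceBean for each of them, which is registered in the Spring container as well. The ServiceBean holds a ref that points to the actual class annotated with DubboService. Service export later goes through the ServiceBean, while the service methods themselves are executed on the concrete service object that the ServiceBean's ref points to.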
private final static List<Class<? extends Annotation>> serviceAnnotationTypes = asList(
    DubboService.class,
    Service.class,
    com.alibaba.dubbo.config.annotation.Service.class
);
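The relationship between a ServiceBean and its ref can be illustrated with a small holder class. This is a sketch under stated assumptions: `GreetingService`, `GreetingServiceImpl`, and `ServiceHolder` are hypothetical names, and plain reflection is used only to keep the example short; it is not how Dubbo dispatches calls internally.

```java
import java.lang.reflect.Method;

// Hypothetical service interface and implementation, for illustration only.
interface GreetingService {
    String greet(String name);
}

class GreetingServiceImpl implements GreetingService {
    @Override
    public String greet(String name) {
        return "Hello, " + name;
    }
}

// Simplified stand-in for ServiceBean: it keeps the interface type and a ref to the implementation.
class ServiceHolder<T> {
    private final Class<T> interfaceClass;
    private final T ref;

    ServiceHolder(Class<T> interfaceClass, T ref) {
        this.interfaceClass = interfaceClass;
        this.ref = ref;
    }

    // Looks the method up on the interface and runs it on the held implementation.
    Object invoke(String methodName, Class<?>[] parameterTypes, Object[] args) {
        try {
            Method method = interfaceClass.getMethod(methodName, parameterTypes);
            return method.invoke(ref, args);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Failed to invoke " + methodName, e);
        }
    }

    public static void main(String[] args) {
        ServiceHolder<GreetingService> holder =
                new ServiceHolder<>(GreetingService.class, new GreetingServiceImpl());
        System.out.println(holder.invoke("greet", new Class<?>[] {String.class}, new Object[] {"Dubbo"}));
    }
}
```

The point of the sketch is the split of responsibilities: the holder is what gets published, and the ref is what actually runs the business method.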
ServiceBean, in turn, wraps the service interface:
public class ServiceBean<T> extends ServiceConfig<T> implements InitializingBean, DisposableBean,
        ApplicationContextAware, BeanNameAware, ApplicationEventPublisherAware {
    public void afterPropertiesSet() throws Exception {
        if (StringUtils.isEmpty(getPath())) {
            if (StringUtils.isNotEmpty(getInterface())) {
                setPath(getInterface());
            }
        }
        //register service bean and set bootstrap
        DubboBootstrap.getInstance().service(this);
    }
}
Here the current ServiceBean is registered with DubboBootstrap. Later, when the ContextRefreshedEvent described above arrives, dubboBootstrap.start() is called to publish the services:
public DubboBootstrap start() {
    if (started.compareAndSet(false, true)) {
        startup.set(false);
        shutdown.set(false);
        awaited.set(false);
        initialize();
        if (logger.isInfoEnabled()) {
            logger.info(NAME + " is starting...");
        }
        // 1. export Dubbo Services
        exportServices();
        // If register consumer instance or has exported services
        if (isRegisterConsumerInstance() || hasExportedServices()) {
            // 2. export MetadataService
            exportMetadataService();
            // 3. Register the local ServiceInstance if required
            registerServiceInstance();
        }
        referServices();
        // wait async export / refer finish if needed
        awaitFinish();
        if (isExportBackground() || isReferBackground()) {
            new Thread(() -> {
                while (!asyncExportFinish || !asyncReferFinish) {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        logger.error(NAME + " waiting async export / refer occurred and error.", e);
                    }
                }
                startup.set(true);
                if (logger.isInfoEnabled()) {
                    logger.info(NAME + " is ready.");
                }
                onStart();
            }).start();
        } else {
            startup.set(true);
            if (logger.isInfoEnabled()) {
                logger.info(NAME + " is ready.");
            }
            onStart();
        }
        if (logger.isInfoEnabled()) {
            logger.info(NAME + " has started.");
        }
    }
    return this;
}

private void exportServices() {
    for (ServiceConfigBase sc : configManager.getServices()) {
        // TODO, compatible with ServiceConfig.export()
        ServiceConfig<?> serviceConfig = (ServiceConfig<?>) sc;
        serviceConfig.setBootstrap(this);
        if (!serviceConfig.isRefreshed()) {
            serviceConfig.refresh();
        }
        if (sc.shouldExportAsync()) {
            ExecutorService executor = executorRepository.getServiceExportExecutor();
            CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
                try {
                    if (!sc.isExported()) {
                        sc.export();
                        exportedServices.add(sc);
                    }
                } catch (Throwable t) {
                    logger.error("export async catch error : " + t.getMessage(), t);
                }
            }, executor);
            asyncExportingFutures.add(future);
        } else {
            if (!sc.isExported()) {
                sc.export();
                exportedServices.add(sc);
            }
        }
    }
}
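The synchronous/asynchronous branching in exportServices can be sketched in isolation. The example below is a minimal sketch, not Dubbo code: `ServiceDef`, `exportAll`, and the fixed pool of two threads are assumptions made for illustration, and "exporting" is reduced to recording the service name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ExportServicesSketch {
    // Hypothetical minimal service description; not a Dubbo type.
    static final class ServiceDef {
        final String name;
        final boolean exportAsync;

        ServiceDef(String name, boolean exportAsync) {
            this.name = name;
            this.exportAsync = exportAsync;
        }
    }

    // Exports every service and returns the exported names once all async work has finished.
    static List<String> exportAll(List<ServiceDef> services) {
        List<String> exported = new CopyOnWriteArrayList<>(); // written from several threads
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            for (ServiceDef service : services) {
                if (service.exportAsync) {
                    // Asynchronous branch: hand the export to the executor and keep the future.
                    futures.add(CompletableFuture.runAsync(() -> exported.add(service.name), executor));
                } else {
                    // Synchronous branch: export on the calling thread.
                    exported.add(service.name);
                }
            }
            // Wait until every asynchronous export has completed.
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        } finally {
            executor.shutdown();
        }
        return new ArrayList<>(exported);
    }

    public static void main(String[] args) {
        List<ServiceDef> services = Arrays.asList(
                new ServiceDef("orderService", false),
                new ServiceDef("userService", true));
        System.out.println(exportAll(services).size() + " services exported");
    }
}
```

Collecting the futures and joining on them afterwards corresponds to the role that asyncExportingFutures and awaitFinish() play in the code above.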
As the code shows, exportServices ultimately calls ServiceConfig.export for each service:
// ServiceConfig.java
public synchronized void export() {
    if (this.shouldExport() && !this.exported) {
        this.init();
        // check bootstrap state
        if (!bootstrap.isInitialized()) {
            throw new IllegalStateException("DubboBootstrap is not initialized");
        }
        if (!this.isRefreshed()) {
            this.refresh();
        }
        if (!shouldExport()) {
            return;
        }
        if (shouldDelay()) {
            DELAY_EXPORT_EXECUTOR.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS);
        } else {
            doExport();
        }
        if (this.bootstrap.getTakeoverMode() == BootstrapTakeoverMode.AUTO) {
            this.bootstrap.start();
        }
    }
}
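The shouldDelay() branch in export() schedules doExport on a ScheduledExecutorService instead of running it right away. A minimal sketch of that pattern follows; `DelayedExportSketch`, `exportAndWait`, and the rule that a null or non-positive delay means "export immediately" are assumptions made for this example, not taken from the Dubbo source.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class DelayedExportSketch {
    // Assumption for this sketch: null or a non-positive value means "no delay".
    private final Integer delayMillis;
    private final CountDownLatch exported = new CountDownLatch(1);

    DelayedExportSketch(Integer delayMillis) {
        this.delayMillis = delayMillis;
    }

    boolean shouldDelay() {
        return delayMillis != null && delayMillis > 0;
    }

    // Either schedules the export for later or runs it on the calling thread.
    void export(ScheduledExecutorService scheduler) {
        if (shouldDelay()) {
            scheduler.schedule(this::doExport, delayMillis, TimeUnit.MILLISECONDS);
        } else {
            doExport();
        }
    }

    private void doExport() {
        exported.countDown(); // stands in for the real export work
    }

    boolean isExported() {
        return exported.getCount() == 0;
    }

    // Demo helper: exports with its own scheduler and waits up to timeoutMillis for completion.
    static boolean exportAndWait(Integer delayMillis, long timeoutMillis) {
        DelayedExportSketch service = new DelayedExportSketch(delayMillis);
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            service.export(scheduler);
            return service.exported.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            scheduler.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("immediate export finished: " + exportAndWait(null, 1000));
        System.out.println("delayed export finished: " + exportAndWait(50, 2000));
    }
}
```

One consequence of this pattern is that export() returns before a delayed service is actually exported, so callers that need the result have to wait for it separately.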
ServiceConfig.export goes through the following main steps:
- init
- refresh
- doExport
First, init:
public void init() {
    if (this.initialized.compareAndSet(false, true)) {
        if (this.bootstrap == null) {
            this.bootstrap = DubboBootstrap.getInstance();
            this.bootstrap.initialize();
        }
        this.bootstrap.service(this);
        ExtensionLoader<ServiceListener> extensionLoader = ExtensionLoader.getExtensionLoader(ServiceListener.class);
        this.serviceListeners.addAll(extensionLoader.getSupportedExtensionInstances());
        this.checkAndUpdateSubConfigs();
    }
    initServiceMetadata(provider);
    serviceMetadata.setServiceType(getInterfaceClass());
    serviceMetadata.setTarget(getRef());
    serviceMetadata.generateServiceKey();
}
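During init, the ServiceListener extensions are loaded through ExtensionLoader; they act as listeners. Beyond that, init mainly loads and converts the related configuration.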
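The structure of init, with one-time work guarded by compareAndSet and the metadata setup running on every call, can be sketched as follows. `InitOnceSketch`, the listener name, and the counter are hypothetical and exist only to make the two kinds of work observable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

class InitOnceSketch {
    private final AtomicBoolean initialized = new AtomicBoolean(false);
    private final List<String> listeners = new ArrayList<>();
    private int metadataUpdates = 0;

    void init() {
        if (initialized.compareAndSet(false, true)) {
            // One-time work: only the first caller gets past the compare-and-set.
            listeners.add("loggingListener"); // hypothetical listener name
        }
        // Work outside the guard runs on every call, like the service metadata setup in init.
        metadataUpdates++;
    }

    int listenerCount() {
        return listeners.size();
    }

    int metadataUpdates() {
        return metadataUpdates;
    }

    public static void main(String[] args) {
        InitOnceSketch sketch = new InitOnceSketch();
        sketch.init();
        sketch.init();
        System.out.println(sketch.listenerCount() + " listener(s), "
                + sketch.metadataUpdates() + " metadata update(s)");
    }
}
```

Calling init twice therefore registers the listener once but refreshes the metadata twice.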
Next is refresh:
public void refresh() {
    refreshed.set(true);
    try {
        // check and init before do refresh
        preProcessRefresh();
        Environment environment = ApplicationModel.getEnvironment();
        List<Map<String, String>> configurationMaps = environment.getConfigurationMaps();
        // Search props starts with PREFIX in order
        String preferredPrefix = null;
        for (String prefix : getPrefixes()) {
            if (ConfigurationUtils.hasSubProperties(configurationMaps, prefix)) {
                preferredPrefix = prefix;
                break;
            }
        }
        if (preferredPrefix == null) {
            preferredPrefix = getPrefixes().get(0);
        }
        // Extract sub props (which key was starts with preferredPrefix)
        Collection<Map<String, String>> instanceConfigMaps = environment.getConfigurationMaps(this, preferredPrefix);
        Map<String, String> subProperties = ConfigurationUtils.getSubProperties(instanceConfigMaps, preferredPrefix);
        InmemoryConfiguration subPropsConfiguration = new InmemoryConfiguration(subProperties);
        if (logger.isDebugEnabled()) {
            String idOrName = "";
            if (StringUtils.hasText(this.getId())) {
                idOrName = "[id=" + this.getId() + "]";
            } else {
                String name = ReflectUtils.getProperty(this, "getName");
                if (StringUtils.hasText(name)) {
                    idOrName = "[name=" + name + "]";
                }
            }
        }
        Method[] methods = getClass().getMethods();
        for (Method method : methods) {
            if (MethodUtils.isSetter(method)) {
                String propertyName = extractPropertyName(method.getName());
                String kebabPropertyName = StringUtils.convertToSplitName(propertyName, "-");
                try {
                    String value = StringUtils.trim(subPropsConfiguration.getString(kebabPropertyName));
                    if (StringUtils.hasText(value) && ClassUtils.isTypeMatch(method.getParameterTypes()[0], value) &&
                            !isIgnoredAttribute(getClass(), propertyName)) {
                        value = environment.resolvePlaceholders(value);
                        method.invoke(this, ClassUtils.convertPrimitive(method.getParameterTypes()[0], value));
                    }
                } catch (Exception e) {
                }
            } else if (isParametersSetter(method)) {
                String propertyName = extractPropertyName(method.getName());
                String value = StringUtils.trim(subPropsConfiguration.getString(propertyName));
                Map<String, String> parameterMap = null;
                if (StringUtils.hasText(value)) {
                    parameterMap = StringUtils.parseParameters(value);
                } else {
                    parameterMap = ConfigurationUtils.getSubProperties(subProperties, PARAMETERS);
                }
                invokeSetParameters(convert(parameterMap, ""));
            }
        }
        processExtraRefresh(preferredPrefix, subPropsConfiguration);
    } catch (Exception e) {
        throw new IllegalStateException("Failed to override field value of config bean: "+this, e);
    }
    postProcessRefresh();
}
refresh mainly reads the configuration and assigns the values to the corresponding properties of the current ServiceConfig.
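The core idea of refresh, walking the public setters and filling each one from a kebab-case property key, can be shown with plain reflection. This is a simplified sketch: `RefreshSketch`, `DemoConfig`, and `toKebabCase` are hypothetical, only String and Integer parameters are handled, and prefix selection and placeholder resolution are left out.

```java
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

class RefreshSketch {
    // Hypothetical config bean with two settable properties.
    public static class DemoConfig {
        private String version;
        private Integer maxRetries;

        public void setVersion(String version) { this.version = version; }
        public void setMaxRetries(Integer maxRetries) { this.maxRetries = maxRetries; }
        public String getVersion() { return version; }
        public Integer getMaxRetries() { return maxRetries; }
    }

    // Converts a camelCase property name to kebab-case, e.g. "maxRetries" to "max-retries".
    static String toKebabCase(String camel) {
        StringBuilder sb = new StringBuilder();
        for (char c : camel.toCharArray()) {
            if (Character.isUpperCase(c)) {
                if (sb.length() > 0) {
                    sb.append('-');
                }
                sb.append(Character.toLowerCase(c));
            } else {
                sb.append(c);
            }
        }
        return sb.toString();
    }

    // For every public single-argument setter, looks up the kebab-case key and applies the value.
    static void refresh(Object config, Map<String, String> props) {
        for (Method method : config.getClass().getMethods()) {
            String methodName = method.getName();
            boolean isSetter = methodName.startsWith("set") && methodName.length() > 3
                    && method.getParameterCount() == 1;
            if (!isSetter) {
                continue;
            }
            String property = Character.toLowerCase(methodName.charAt(3)) + methodName.substring(4);
            String value = props.get(toKebabCase(property));
            if (value == null || value.trim().isEmpty()) {
                continue; // nothing configured for this property
            }
            Class<?> type = method.getParameterTypes()[0];
            try {
                if (type == String.class) {
                    method.invoke(config, value.trim());
                } else if (type == Integer.class || type == int.class) {
                    method.invoke(config, Integer.valueOf(value.trim()));
                }
                // Other parameter types are ignored in this sketch.
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Failed to set property " + property, e);
            }
        }
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("version", "1.0.0");
        props.put("max-retries", "3");
        DemoConfig config = new DemoConfig();
        refresh(config, props);
        System.out.println(config.getVersion() + " / " + config.getMaxRetries());
    }
}
```

Unlike the sketch, the refresh method shown above swallows exceptions from individual setters, so a single bad value does not abort the whole refresh.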
The final exposure and publishing of the service happens in doExport.