Dubbo consumer startup flow, processing logic, and method invocation (based on Dubbo3)
Posted Leo Han
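We have previously analyzed the startup flow and processing logic of the Dubbo provider (server) side. Next we look at the consumer (client) side. This article focuses on the startup flow under the Spring framework.
First, when we use the DubboComponentScan annotation, DubboComponentScanRegistrar is imported through @Import: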
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(DubboComponentScanRegistrar.class)
public @interface DubboComponentScan {
......
}
DubboComponentScanRegistrar in turn registers the following classes as infrastructure beans:
static void registerCommonBeans(BeanDefinitionRegistry registry) {
registerInfrastructureBean(registry, ServicePackagesHolder.BEAN_NAME, ServicePackagesHolder.class);
registerInfrastructureBean(registry, ReferenceBeanManager.BEAN_NAME, ReferenceBeanManager.class);
// Since 2.5.7 Register @Reference Annotation Bean Processor as an infrastructure Bean
registerInfrastructureBean(registry, ReferenceAnnotationBeanPostProcessor.BEAN_NAME,
ReferenceAnnotationBeanPostProcessor.class);
// TODO Whether DubboConfigAliasPostProcessor can be removed ?
// Since 2.7.4 [Feature] https://github.com/apache/dubbo/issues/5093
registerInfrastructureBean(registry, DubboConfigAliasPostProcessor.BEAN_NAME,
DubboConfigAliasPostProcessor.class);
// Since 2.7.4 Register DubboBootstrapApplicationListener as an infrastructure Bean
registerInfrastructureBean(registry, DubboBootstrapApplicationListener.BEAN_NAME,
DubboBootstrapApplicationListener.class);
// Since 2.7.6 Register DubboConfigDefaultPropertyValueBeanPostProcessor as an infrastructure Bean
registerInfrastructureBean(registry, DubboConfigDefaultPropertyValueBeanPostProcessor.BEAN_NAME,
DubboConfigDefaultPropertyValueBeanPostProcessor.class);
// Dubbo config initializer
registerInfrastructureBean(registry, DubboConfigBeanInitializer.BEAN_NAME, DubboConfigBeanInitializer.class);
// register infra bean if not exists later
registerInfrastructureBean(registry, DubboInfraBeanRegisterPostProcessor.BEAN_NAME, DubboInfraBeanRegisterPostProcessor.class);
}
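The registration calls above can be pictured as "register under this name unless something is already there". The following is a minimal sketch of that idea only, assuming that registerInfrastructureBean skips names that are already registered; SimpleRegistry and registerIfAbsent are hypothetical names and are not part of Spring or Dubbo.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for a bean registry; this is NOT Spring's BeanDefinitionRegistry.
class SimpleRegistry {
    private final Map<String, Class<?>> definitions = new LinkedHashMap<>();

    // Registers the type under the given name only if the name is still free.
    // Returns true when a new entry was added, false when the name was taken.
    boolean registerIfAbsent(String beanName, Class<?> beanType) {
        return definitions.putIfAbsent(beanName, beanType) == null;
    }

    Class<?> get(String beanName) {
        return definitions.get(beanName);
    }

    int size() {
        return definitions.size();
    }
}
```

With this shape, calling the registration method several times (for example from more than one scan annotation) leaves exactly one definition per bean name.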
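As on the provider side, the class to pay attention to on the consumer side is ReferenceAnnotationBeanPostProcessor. It checks whether a bean has injection points (fields or methods) marked with any of the following annotations: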
public ReferenceAnnotationBeanPostProcessor() {
super(DubboReference.class, Reference.class, com.alibaba.dubbo.config.annotation.Reference.class);
}
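If it finds one, it creates a ReferenceBean and registers it in the Spring container. All subsequent consumer-side calls go through ReferenceBean, so let's look at this class: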
public class ReferenceBean<T> implements FactoryBean,
ApplicationContextAware, BeanClassLoaderAware, BeanNameAware, InitializingBean, DisposableBean {
.......
}
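The annotation detection described above can be sketched with plain reflection. This is only an illustration under stated assumptions: RemoteRef stands in for @DubboReference, and GreetingService, OrderController and ReferenceScanner are hypothetical names. The real post-processor does considerably more (it also handles methods and builds bean definitions).

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;

// Hypothetical marker annotation standing in for @DubboReference.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@interface RemoteRef {
}

// Hypothetical remote service interface.
interface GreetingService {
    String greet(String name);
}

// Hypothetical consumer bean with one remote reference and one ordinary field.
class OrderController {
    @RemoteRef
    GreetingService greetingService;

    String localOnly;
}

class ReferenceScanner {
    // Returns the names of all declared fields that carry @RemoteRef.
    static List<String> findReferenceFields(Class<?> beanClass) {
        List<String> names = new ArrayList<>();
        for (Field field : beanClass.getDeclaredFields()) {
            if (field.isAnnotationPresent(RemoteRef.class)) {
                names.add(field.getName());
            }
        }
        return names;
    }
}
```

Each field found this way is a candidate for which a reference bean would be created and later injected.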
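We will skip the rest of the logic here and focus on how the client is started and how calls are executed.
ReferenceBean implements the FactoryBean interface, so Spring obtains the object instance through the FactoryBean's getObject method. In ReferenceBean the corresponding code is: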
public Object getObject() {
if (lazyProxy == null) {
createLazyProxy();
}
return lazyProxy;
}
private void createLazyProxy() {
//set proxy interfaces
//see also: org.apache.dubbo.rpc.proxy.AbstractProxyFactory.getProxy(org.apache.dubbo.rpc.Invoker<T>, boolean)
ProxyFactory proxyFactory = new ProxyFactory();
proxyFactory.setTargetSource(new DubboReferenceLazyInitTargetSource());
proxyFactory.addInterface(interfaceClass);
Class<?>[] internalInterfaces = AbstractProxyFactory.getInternalInterfaces();
for (Class<?> anInterface : internalInterfaces) {
proxyFactory.addInterface(anInterface);
}
if (!StringUtils.isEquals(interfaceClass.getName(), interfaceName)) {
try {
Class<?> serviceInterface = ClassUtils.forName(interfaceName, beanClassLoader);
proxyFactory.addInterface(serviceInterface);
} catch (ClassNotFoundException e) {
}
}
this.lazyProxy = proxyFactory.getProxy(this.beanClassLoader);
}
private class DubboReferenceLazyInitTargetSource extends AbstractLazyCreationTargetSource {
@Override
protected Object createObject() throws Exception {
return getCallProxy();
}
@Override
public synchronized Class<?> getTargetClass() {
return getInterfaceClass();
}
}
private Object getCallProxy() throws Exception {
if (referenceConfig == null) {
throw new IllegalStateException("ReferenceBean is not ready yet, please make sure to call reference interface method after dubbo is started.");
}
//get reference proxy
return referenceConfig.get();
}
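In the end the interface proxy is obtained through referenceConfig.get(), which closely mirrors the provider side. Note that getObject only returns a lazy proxy: referenceConfig.get() is not called until the target behind that proxy is first needed.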
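The lazy-initialization idea can be sketched with a JDK dynamic proxy instead of Spring's ProxyFactory and AbstractLazyCreationTargetSource. This is a minimal sketch, not the Dubbo implementation: EchoService, LazyProxies and lazyProxy are hypothetical names, and the Supplier plays the role that referenceConfig.get() plays in the code above.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.function.Supplier;

// Hypothetical service interface used only for this sketch.
interface EchoService {
    String echo(String message);
}

class LazyProxies {
    // Returns a proxy that builds the real target on the first method call.
    @SuppressWarnings("unchecked")
    static <T> T lazyProxy(Class<T> iface, Supplier<T> targetFactory) {
        InvocationHandler handler = new InvocationHandler() {
            private T target; // created lazily, at most once

            @Override
            public synchronized Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                if (target == null) {
                    target = targetFactory.get();
                }
                try {
                    return method.invoke(target, args);
                } catch (InvocationTargetException e) {
                    throw e.getCause(); // surface the target's own exception
                }
            }
        };
        return (T) Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[] {iface}, handler);
    }
}
```

Handing out the proxy is cheap and can happen during container startup. The expensive part (in Dubbo's case, creating the invoker and the connections) is deferred until a method is actually called.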
// ReferenceConfig.java
public synchronized T get() {
if (destroyed) {
throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!");
}
if (ref == null) {
init();
}
return ref;
}
The get method of ReferenceConfig calls init to perform initialization and obtain the proxy. The key line that creates the proxy is:
ref = createProxy(map);
The main logic is as follows; here we pick just one of the cases to illustrate:
invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));
return (T) PROXY_FACTORY.getProxy(invoker, ProtocolUtils.isGeneric(generic));
This should look familiar: as on the server side, everything centers on an invoker. Let's take the default dubbo protocol as an example:
public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
optimizeSerialization(url);
// create rpc invoker.
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
invokers.add(invoker);
return invoker;
}
When the consumer-side invoker is created, it is given an array of ExchangeClient instances obtained from getClients:
private ExchangeClient[] getClients(URL url) {
// whether to share connection
boolean useShareConnect = false;
int connections = url.getParameter(CONNECTIONS_KEY, 0);
List<ReferenceCountExchangeClient> shareClients = null;
// if not configured, connection is shared, otherwise, one connection for one service
if (connections == 0) {
useShareConnect = true;
/*
* The xml configuration should have a higher priority than properties.
*/
String shareConnectionsStr = url.getParameter(SHARE_CONNECTIONS_KEY, (String) null);
connections = Integer.parseInt(StringUtils.isBlank(shareConnectionsStr) ? ConfigUtils.getProperty(SHARE_CONNECTIONS_KEY,
DEFAULT_SHARE_CONNECTIONS) : shareConnectionsStr);
shareClients = getSharedClient(url, connections);
}
ExchangeClient[] clients = new ExchangeClient[connections];
for (int i = 0; i < clients.length; i++) {
if (useShareConnect) {
clients[i] = shareClients.get(i);
} else {
clients[i] = initClient(url);
}
}
return clients;
}
private ExchangeClient initClient(URL url) {
// client type setting.
String str = url.getParameter(CLIENT_KEY, url.getParameter(SERVER_KEY, DEFAULT_REMOTING_CLIENT));
url = url.addParameter(CODEC_KEY, DubboCodec.NAME);
// enable heartbeat by default
url = url.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT));
// BIO is not allowed since it has severe performance issue.
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
throw new RpcException("Unsupported client type: " + str + "," +
" supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
}
ExchangeClient client;
try {
// connection should be lazy
if (url.getParameter(LAZY_CONNECT_KEY, false)) {
client = new LazyConnectExchangeClient(url, requestHandler);
} else {
client = Exchangers.connect(url, requestHandler);
}
} catch (RemotingException e) {
throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
}
return client;
}
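The shared-versus-dedicated decision in getClients can be reduced to a small sketch. It assumes, as in the code above, that a connections value of 0 means "not configured". ClientPool and Client are hypothetical stand-ins, and reference counting of shared clients is left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ClientPool {
    // Hypothetical stand-in for ExchangeClient; it only remembers its address.
    static final class Client {
        final String address;

        Client(String address) {
            this.address = address;
        }
    }

    private final Map<String, List<Client>> sharedByAddress = new HashMap<>();
    private final int defaultShareConnections;

    ClientPool(int defaultShareConnections) {
        this.defaultShareConnections = defaultShareConnections;
    }

    // connections == 0 means "not configured": reuse the shared clients for this address.
    // Any other value creates that many dedicated clients for the caller.
    synchronized Client[] getClients(String address, int connections) {
        boolean useShared = connections == 0;
        int count = useShared ? defaultShareConnections : connections;
        List<Client> shared = useShared
                ? sharedByAddress.computeIfAbsent(address, key -> newClients(key, count))
                : null;
        Client[] clients = new Client[count];
        for (int i = 0; i < count; i++) {
            clients[i] = useShared ? shared.get(i) : new Client(address);
        }
        return clients;
    }

    private static List<Client> newClients(String address, int count) {
        List<Client> created = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            created.add(new Client(address));
        }
        return created;
    }
}
```

Two services that point at the same provider address and leave connections unset end up with the same client objects, while a service that sets connections explicitly gets its own.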
This eventually calls into HeaderExchanger:
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}
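The nested constructor calls in connect form a decorator chain: each handler wraps the next one and does its own work before delegating. A minimal sketch of that wrapping pattern follows. MessageHandler, DecodingHandler and CountingHandler are hypothetical names chosen for illustration, and their behavior is an assumption, not a description of what DecodeHandler or HeaderExchangeHandler actually do.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical handler contract for this sketch.
interface MessageHandler {
    void received(Object message);
}

// Outer decorator: turns raw bytes into text before passing the message on.
class DecodingHandler implements MessageHandler {
    private final MessageHandler next;

    DecodingHandler(MessageHandler next) {
        this.next = next;
    }

    @Override
    public void received(Object message) {
        Object decoded = message instanceof byte[]
                ? new String((byte[]) message, StandardCharsets.UTF_8)
                : message;
        next.received(decoded);
    }
}

// Inner decorator: counts messages, then forwards them to the business handler.
class CountingHandler implements MessageHandler {
    private final MessageHandler next;
    private int count;

    CountingHandler(MessageHandler next) {
        this.next = next;
    }

    @Override
    public void received(Object message) {
        count++;
        next.received(message);
    }

    int count() {
        return count;
    }
}
```

The order of wrapping matters: the outermost handler sees the message first, so in this sketch the innermost business handler only ever receives decoded text.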
Transporters then establishes the connection to the server:
public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
ChannelHandler handler;
if (handlers == null || handlers.length == 0) {
handler = new ChannelHandlerAdapter();
} else if (handlers.length == 1) {
handler = handlers[0];
} else {
handler = new ChannelHandlerDispatcher(handlers);
}
return getTransporter().connect(url, handler);
}
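The three-way branch on the handlers argument (none, exactly one, several) can be sketched on its own. EventHandler and HandlerSelector are hypothetical names. The no-op lambda and the fan-out lambda play the roles that ChannelHandlerAdapter and ChannelHandlerDispatcher play in the code above, without claiming to match their behavior in detail.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical handler contract for this sketch.
interface EventHandler {
    void received(Object message);
}

class HandlerSelector {
    // Collapses a varargs list of handlers into a single handler.
    static EventHandler select(EventHandler... handlers) {
        if (handlers == null || handlers.length == 0) {
            return message -> { }; // nothing registered: ignore events
        }
        if (handlers.length == 1) {
            return handlers[0]; // no wrapper needed
        }
        // Defensive copy so later changes to the caller's array have no effect.
        List<EventHandler> copy = new ArrayList<>(Arrays.asList(handlers));
        return message -> {
            for (EventHandler handler : copy) {
                handler.received(message);
            }
        };
    }
}
```

The caller always gets back exactly one handler, which keeps the transporter interface simple: it only needs to accept a single handler.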
By default, getTransporter here resolves to NettyTransporter:
public Client connect(URL url, ChannelHandler handler) throws RemotingException {
return new NettyClient(url, handler);
}
As you can see, the default result is a NettyClient.
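The name-based selection of a transporter, together with the "Unsupported client type" check seen earlier in initClient, can be sketched as a lookup table with a default name. TransportRegistry is a hypothetical class and is far simpler than Dubbo's ExtensionLoader. The default name "netty" is taken as an assumption from the text above.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

class TransportRegistry {
    // Assumed default extension name for this sketch.
    static final String DEFAULT_NAME = "netty";

    // Maps an extension name to a connector; a connector turns a URL into a client description.
    private final Map<String, Function<String, String>> connectors = new LinkedHashMap<>();

    void register(String name, Function<String, String> connector) {
        connectors.put(name, connector);
    }

    // Resolves the connector for the given name, falling back to the default when none is given.
    Function<String, String> resolve(String name) {
        String key = (name == null || name.isEmpty()) ? DEFAULT_NAME : name;
        Function<String, String> connector = connectors.get(key);
        if (connector == null) {
            throw new IllegalArgumentException("Unsupported client type: " + key
                    + ", supported client type is " + String.join(" ", connectors.keySet()));
        }
        return connector;
    }
}
```

A caller that does not configure a client type gets the default connector, and a misspelled or unavailable type fails fast with the list of supported names.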
At this point a DubboInvoker has been created. A proxy object for the target interface is then generated around it:
(T) PROXY_FACTORY.getProxy(invoker, ProtocolUtils.isGeneric(generic));
public <T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException {
// when compiling with native image, ensure that the order of the interfaces remains unchanged
LinkedHashSet<Class<?>> interfaces = new LinkedHashSet<>();
String config = invoker.getUrl().getParameter(INTERFACES);
if (config != null && config.length() > 0) {
String[] types = COMMA_SPLIT_PATTERN.split(config);
for (String type : types) {
// TODO can we load successfully for a different classloader?.
interfaces.add(ReflectUtils.forName(type));
}
}
if (generic) {
if (GenericService.class.equals(invoker.getInterface()) || !GenericService.class.isAssignableFrom(invoker.getInterface())) {
interfaces.add(com.alibaba.dubbo.rpc.service.GenericService.class);
}
try {
// find the real interface from url
String realInterface = invoker.getUrl().getParameter(Constants.INTERFACE);
interfaces.add(ReflectUtils.forName(realInterface));
} catch (Throwable e) {
// ignore
}
}
interfaces.add(invoker.getInterface());
interfaces.addAll(Arrays.asList(INTERNAL_INTERFACES));
return getProxy(invoker, interfaces.toArray(new Class<?>[0]));
}
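The use of LinkedHashSet in getProxy gives two properties at once: duplicates are dropped and the insertion order of the interfaces is preserved. A minimal sketch of that collection step follows. InterfaceCollector and its parameters are hypothetical, and class-loading failures are simply turned into unchecked exceptions here.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;

class InterfaceCollector {
    // Builds the ordered, duplicate-free list of interfaces a proxy should implement:
    // first the comma-separated configured names, then the primary interface,
    // then any internal interfaces.
    static Class<?>[] collect(String configured, Class<?> primary, Class<?>... internal) {
        LinkedHashSet<Class<?>> interfaces = new LinkedHashSet<>();
        if (configured != null && !configured.isEmpty()) {
            for (String name : configured.split("\\s*,\\s*")) {
                try {
                    interfaces.add(Class.forName(name));
                } catch (ClassNotFoundException e) {
                    throw new IllegalArgumentException("Cannot load interface " + name, e);
                }
            }
        }
        interfaces.add(primary); // no effect if already present
        interfaces.addAll(Arrays.asList(internal));
        return interfaces.toArray(new Class<?>[0]);
    }
}
```

For example, if the configured list already names the primary interface, it appears once in the result, at the position where it was first added.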
Ultimately getProxy has two implementations, one based on the JDK and one based on Javassist, exactly as on the server side. Here is the Javassist version:
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}
@Override
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
// TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
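How a proxy turns an interface call into an invoker call can be sketched with the JDK's own java.lang.reflect.Proxy (the snippet above uses Dubbo's bytecode-based Proxy class instead). SimpleInvoker, PriceService, ForwardingHandler and InvokerProxies are hypothetical names. The handling of Object methods here is a simplification and an assumption, not the behavior of InvokerInvocationHandler.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

// Hypothetical stand-in for Dubbo's Invoker: receives a method name and its arguments.
interface SimpleInvoker {
    Object invoke(String methodName, Object[] args);
}

// Hypothetical business interface.
interface PriceService {
    int priceOf(String sku);
}

class ForwardingHandler implements InvocationHandler {
    private final SimpleInvoker invoker;

    ForwardingHandler(SimpleInvoker invoker) {
        this.invoker = invoker;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) {
        // Methods inherited from Object are answered locally, never sent to the invoker.
        if (method.getDeclaringClass() == Object.class) {
            if ("toString".equals(method.getName())) {
                return "proxy for " + invoker;
            }
            if ("hashCode".equals(method.getName())) {
                return System.identityHashCode(proxy);
            }
            return proxy == args[0]; // equals
        }
        // Everything else becomes an invocation on the invoker.
        return invoker.invoke(method.getName(), args == null ? new Object[0] : args);
    }
}

class InvokerProxies {
    @SuppressWarnings("unchecked")
    static <T> T create(Class<T> iface, SimpleInvoker invoker) {
        return (T) Proxy.newProxyInstance(
                iface.getClassLoader(), new Class<?>[] {iface}, new ForwardingHandler(invoker));
    }
}
```

From the caller's point of view the proxy is an ordinary PriceService. The network call, if any, is hidden entirely behind the invoker.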
This follows the familiar Java dynamic-proxy pattern, and Dubbo supplies its own InvocationHandler implementation here:
public class InvokerInvocationHandler implements InvocationHandler {
private static final Logger logger = LoggerFactory.getLogger(InvokerInvocationHandler.