dubbo——consumers

Posted FFStayF

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了dubbo——consumers相关的知识,希望对你有一定的参考价值。

一、服务<dubbo:reference />

/* org.apache.dubbo.config.bootstrap.DubboBootstrap#start */
    public DubboBootstrap start() {
        if (started.compareAndSet(false, true)) {
            //初始化
            initialize();
            if (logger.isInfoEnabled()) {
                logger.info(NAME + " is starting...");
            }
            // 暴露dubbo服务(<dubbo:service />注册到注册表)
            exportServices();

            // Not only provider register
            if (!isOnlyRegisterProvider() || hasExportedServices()) {
                // 2. export MetadataService
                exportMetadataService();
                //3. Register the local ServiceInstance if required
                registerServiceInstance();
            }

            //引用dubbo服务(<dubbo:reference /> 创建代理)
            referServices();

            if (logger.isInfoEnabled()) {
                logger.info(NAME + " has started.");
            }
        }
        return this;
    }

DubboBootStrap.referServices():本地创建远程引用代理对象

/* org.apache.dubbo.config.bootstrap.DubboBootstrap#referServices */
    private void referServices() {
        if (cache == null) {
            cache = ReferenceConfigCache.getCache();
        }

        configManager.getReferences().forEach(rc -> {
            // 与alibaba版本的ReferenceConfig兼容,Apache是ReferenceBean类型
            ReferenceConfig referenceConfig = (ReferenceConfig) rc;
            referenceConfig.setBootstrap(this);

            //<dubbo:reference init="true"/>立即加载,默认为false
            // 懒加载,延迟到依赖注入时ReferenceBean.getObject()
            if (rc.shouldInit()) {
                //异步
                if (referAsync) {
                    CompletableFuture<Object> future = ScheduledCompletableFuture.submit(
                            executorRepository.getServiceExporterExecutor(),
                            () -> cache.get(rc)
                    );
                    asyncReferringFutures.add(future);
                } else {
                    //同步
                    cache.get(rc);
                }
            }
        });
    }

<dubbo:reference init="true" />才会立即加载,默认false,懒加载:需要使用时才会加载,实现方式:

ReferenceBean是FactoryBean类型,依赖注入时,调用ReferenceBean.getObject()生成一个代理类。

Reference.getObject():获取Service的引用

/* org.apache.dubbo.config.spring.ReferenceBean#getObject */
    public Object getObject() {
        return get();
    }
/* org.apache.dubbo.config.ReferenceConfig#get */
    public synchronized T get() {
        if (destroyed) {
            throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!");
        }
        if (ref == null) {
            //初始化
            init();
        }
        return ref;
    }

init():初始化

 

    public synchronized void init() {
        if (initialized) {
            return;
        }

        if (bootstrap == null) {
            bootstrap = DubboBootstrap.getInstance();
            bootstrap.init();
        }
        //检查并更新子设置
        checkAndUpdateSubConfigs();

        //检查若配置了local或stub,localClass或stubClass必须是interfaceClass的子类
        checkStubAndLocal(interfaceClass);
        //检查熔断接口
        ConfigValidationUtils.checkMock(interfaceClass, this);

        // map初始化一些属性,用途:设置到serviceMetadata(元数据)中;生成consumerUrl
        // map.put("init","false")
        // map.put("side","consumer")
        // map.put("application","dubbo-consumer")
        // map.put("register.ip","192.168.56.1")
        // map.put("release","2.7.5")
        // map.put("methods","insertUser,updateUser")
        // map.put("sticky","false")
        // map.put("dubbo","2.0.2")
        // map.put("pid","8748")
        // map.put("interface","org.study.service.UserService")
        // map.put("timestamp","1584282015078")
        Map<String, String> map = new HashMap<String, String>();
        map.put(SIDE_KEY, CONSUMER_SIDE);

        ReferenceConfigBase.appendRuntimeParameters(map);
        if (!ProtocolUtils.isGeneric(generic)) {
            String revision = Version.getVersion(interfaceClass, version);
            if (revision != null && revision.length() > 0) {
                map.put(REVISION_KEY, revision);
            }

            String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
            if (methods.length == 0) {
                logger.warn("No method found in service interface " + interfaceClass.getName());
                map.put(METHODS_KEY, ANY_VALUE);
            } else {
                map.put(METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), COMMA_SEPARATOR));
            }
        }
        map.put(INTERFACE_KEY, interfaceName);
        AbstractConfig.appendParameters(map, getMetrics());
        AbstractConfig.appendParameters(map, getApplication());
        AbstractConfig.appendParameters(map, getModule());
        // remove ‘default.‘ prefix for configs from ConsumerConfig
        // appendParameters(map, consumer, Constants.DEFAULT_KEY);
        AbstractConfig.appendParameters(map, consumer);
        AbstractConfig.appendParameters(map, this);
        MetadataReportConfig metadataReportConfig = getMetadataReportConfig();
        if (metadataReportConfig != null && metadataReportConfig.isValid()) {
            map.putIfAbsent(METADATA_KEY, REMOTE_METADATA_STORAGE_TYPE);
        }
        Map<String, AsyncMethodInfo> attributes = null;
        if (CollectionUtils.isNotEmpty(getMethods())) {
            attributes = new HashMap<>();
            for (MethodConfig methodConfig : getMethods()) {
                AbstractConfig.appendParameters(map, methodConfig, methodConfig.getName());
                String retryKey = methodConfig.getName() + ".retry";
                if (map.containsKey(retryKey)) {
                    String retryValue = map.remove(retryKey);
                    if ("false".equals(retryValue)) {
                        map.put(methodConfig.getName() + ".retries", "0");
                    }
                }
                AsyncMethodInfo asyncMethodInfo = AbstractConfig.convertMethodConfig2AsyncInfo(methodConfig);
                if (asyncMethodInfo != null) {
//                    consumerModel.getMethodModel(methodConfig.getName()).addAttribute(ASYNC_KEY, asyncMethodInfo);
                    attributes.put(methodConfig.getName(), asyncMethodInfo);
                }
            }
        }

        String hostToRegistry = ConfigUtils.getSystemProperty(DUBBO_IP_TO_REGISTRY);
        if (StringUtils.isEmpty(hostToRegistry)) {
            hostToRegistry = NetUtils.getLocalHost();
        } else if (isInvalidLocalHost(hostToRegistry)) {
            throw new IllegalArgumentException("Specified invalid registry ip from property:" + DUBBO_IP_TO_REGISTRY + ", value:" + hostToRegistry);
        }
        map.put(REGISTER_IP_KEY, hostToRegistry);

        //map数据放入到元数据中
        serviceMetadata.getAttachments().putAll(map);

        //创建引用代理,注册到注册表(建立zookeeper的znode)
        ref = createProxy(map);

        //设置到元数据中
        serviceMetadata.setTarget(ref);
        serviceMetadata.addAttribute(PROXY_CLASS_REF, ref);
        ConsumerModel consumerModel = repository.lookupReferredService(serviceMetadata.getServiceKey());
        consumerModel.setProxyObject(ref);
        consumerModel.init(attributes);
        //修改哨兵位,已初始化
        initialized = true;
        //后置持利器执行等
        // dispatch a ReferenceConfigInitializedEvent since 2.7.4
        dispatch(new ReferenceConfigInitializedEvent(this, invoker));
    }

ref = createProxy(map):

以上是关于dubbo——consumers的主要内容,如果未能解决你的问题,请参考以下文章

dubbo的provider和consumer到底怎么区分

dubbo——consumers

Provider和Consumer的搭建

dubbo consumer timeout是啥意思

dubbo配置的覆盖关系

如何判断这个方法的调用来自dubbo consumer