结合源码分析 bubble 使用注意事项

Posted 一只阿木木

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了结合源码分析 bubble 使用注意事项相关的知识,希望对你有一定的参考价值。

使用dubbo时候要尽量了解源码,不然会很容易入坑。

 

一、服务消费端ReferenceConfig需要自行缓存

ReferenceConfig实例是个很重的实例,每个ReferenceConfig实例里面都维护了与服务注册中心的一个长链,并且维护了与所有服务提供者的的长链。假设有一个服务注册中心和N个服务提供者,那么每个ReferenceConfig实例里面维护了N+1个长链,如果频繁的生成ReferenceConfig实例,可能会造成性能问题,甚至产生内存或者连接泄露的风险。特别是使用dubbo api编程时候容易忽略这个问题。

为了解决这个问题,之前都是自行缓存,但是自从dubbo2.4.0版本后,dubbo 提供了简单的工具类 ReferenceConfigCache 用于缓存ReferenceConfig 实例。使用如下:

/创建服务消费实例
ReferenceConfig<XxxService> reference = new ReferenceConfig<XxxService>();
reference.setInterface(XxxService.class);
reference.setVersion("1.0.0");
......
//获取dubbo提供的缓存
ReferenceConfigCache cache = ReferenceConfigCache.getCache();
// cache.get方法中会缓存 reference对象,并且调用reference.get方法启动ReferenceConfig,并返回经过代理后的服务接口的对象
XxxService xxxService = cache.get(reference);

// 使用xxxService对象
xxxService.sayHello();

 

需要注意的是 Cache内持有ReferenceConfig对象的引用,不要在外部再调用ReferenceConfig的destroy方法了,这会导致Cache内的ReferenceConfig失效!

如果要销毁 Cache 中的 ReferenceConfig ,将销毁 ReferenceConfig 并释放对应的资源,具体使用下面方法来销毁

 

ReferenceConfigCache cache = ReferenceConfigCache.getCache();
cache.destroy(reference);

 

另外以服务 Group、接口、版本为缓存的 Key,ReferenceConfig实例为对应的value。如果你需要使用自定义的key,可以在创建cache时候调用ReferenceConfigCache cache = ReferenceConfigCache.getCache(keyGenerator );方法传递自定义的keyGenerator。

 

二、 并发控制

2.1 服务消费方并发控制 在服务消费方法进行并发控制需要设置actives参数,如下:

<dubbo:reference id="userService" interface="com.test.UserServiceBo"
        group="dubbo" version="1.0.0" timeout="3000" actives="10"/>

 

设置com.test.UserServiceBo接口中所有方法,每个方法最多同时并发请求10个请求。

也可以使用下面方法设置接口中的单个方法的并发请求个数,如下:

 

<dubbo:reference id="userService" interface="com.test.UserServiceBo"
        group="dubbo" version="1.0.0" timeout="3000">
                <dubbo:method name="sayHello" actives="10" />
</dubbo:reference>

 

如上设置sayHello方法的并发请求数量最大为10,如果客户端请求该方法并发超过了10则客户端会被阻塞,等客户端并发请求数量少于10的时候,该请求才会被发送到服务提供方服务器。在dubbo中客户端并发控制是使用ActiveLimitFilter过滤器来控制的,代码如下:

 

public class ActiveLimitFilter implements Filter {

    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        URL url = invoker.getUrl();
        String methodName = invocation.getMethodName();
        //获取设置的acvites的值,默认为0
        int max = invoker.getUrl().getMethodParameter(methodName, Constants.ACTIVES_KEY, 0);
        //获取当前方法目前并发请求数量
        RpcStatus count = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName());
        if (max > 0) {//说明设置了actives变量
            long timeout = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, 0);
            long start = System.currentTimeMillis();
            long remain = timeout;
            int active = count.getActive();
            //如果该方法并发请求数量大于设置值,则挂起当前线程。
            if (active >= max) {
                synchronized (count) {
                    while ((active = count.getActive()) >= max) {
                        try {
                            count.wait(remain);
                        } catch (InterruptedException e) {
                        }
                        //如果等待时间超时,则抛出异常
                        long elapsed = System.currentTimeMillis() - start;
                        remain = timeout - elapsed;
                        if (remain <= 0) {
                            throw new RpcException("Waiting concurrent invoke timeout in client-side for service:  "
                                    + invoker.getInterface().getName() + ", method: "
                                    + invocation.getMethodName() + ", elapsed: " + elapsed
                                    + ", timeout: " + timeout + ". concurrent invokes: " + active
                                    + ". max concurrent invoke limit: " + max);
                        }
                    }
                }
            }
        }
        //没有限流时候,正常调用
        try {
            long begin = System.currentTimeMillis();
            RpcStatus.beginCount(url, methodName);
            try {
                Result result = invoker.invoke(invocation);
                RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, true);
                return result;
            } catch (RuntimeException t) {
                RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, false);
                throw t;
            }
        } finally {
            if (max > 0) {
                synchronized (count) {
                    count.notify();
                }
            }
        }
    }

}

 

可知客户端并发控制,是如果当并发量达到指定值后,当前客户端请求线程会被挂起,如果在等待超时期间并发请求量少了,那么阻塞的线程会被激活,然后发送请求到服务提供方,如果等待超时了,则直接抛出异常,这时候服务根本都没有发送到服务提供方服务器。

三、 改进的广播策略

前面我们讲解集群容错时候谈到有一个广播策略,该策略主要用于对所有服务提供者进行广播消息,那么有个问题需要思考,广播是是说你在客户端调用接口一次,内部就是轮询调用所有服务提供者的机器的服务,那么你调用一次该接口,返回值是什么那?比如内部轮询了10台机器,每个机器应该都有一个返回值,那么你调用的这一次返回值是10个返回值的组成?其实不是,返回的是轮询调用的最后一个机器结果,那么如果我们想把所有的机器返回的结果聚合起来如何做的?

 

 

以上是关于结合源码分析 bubble 使用注意事项的主要内容,如果未能解决你的问题,请参考以下文章

Android 逆向整体加固脱壳 ( DEX 优化流程分析 | DexPrepare.cpp 中 dvmOptimizeDexFile() 方法分析 | /bin/dexopt 源码分析 )(代码片段

LCX端口转发源码分析

pagehelper分页中pageSize等于total的问题结合源码分析

Android 事件分发事件分发源码分析 ( Activity 中各层级的事件传递 | Activity -> PhoneWindow -> DecorView -> ViewGroup )(代码片段

通过官方API结合源码,如何分析程序流程

erlang下lists模块sort(排序)方法源码解析