源码走读:Dubbo带权重的随机负载均衡算法与warmup

Posted Jeff.S

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了源码走读:Dubbo带权重的随机负载均衡算法与warmup相关的知识,希望对你有一定的参考价值。

在分布式架构中,当下游服务端刚启动时可能并不能承载上游瞬间大流量过来,通过warmup的机制,客户端可以根据下游服务端启动时间进行缓慢预热配比放量。而dubbo就通过注册启动时间戳的方式告知调用方自己的启动时间,客户端据此进行预热配比放量,避免对服务端造成重启!

这篇文章是承接上一篇文章:Dubbo如何实现基于http的jsonrpc调用。上篇中介绍了关于Dubbo中如何对jsonrpc进行http调用。最后我们提到了Dubbo默认的集群容错模式是failover。这里看下 边这个图中即将执行org.apache.dubbo.rpc.cluster.support.FailoverClusterInvoker#doInvoke方法。
在这里插入图片描述
进入 FailoverClusterInvoker#doInvoke方法中会执行到下边这行,用于选择目标节点的invoker对象。我们这一篇文章就重点分析下这个invoker是怎么来的?

//FailoverClusterInvoker.java
 Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);

我们只选取关键代码:

//FailoverClusterInvoker.java
 protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation,
                                List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
		.....
		 Invoker<T> invoker = doSelect(loadbalance, invocation, invokers, selected);
		 ...
		 return invoker;
}

而在doSlelect方法中又调用了loadbalance#select方法:

//AbstractClusterInvoker#doSelect
 private Invoker<T> doSelect(LoadBalance loadbalance, Invocation invocation,
                                List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
      ......
	Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation);
	....
	return invoker;
}

由dubbo-cluster中的AbstractLoadBalance.java类提供了这个select方法:

//AbstractLoadBalance#select
    @Override
    public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        if (CollectionUtils.isEmpty(invokers)) {
            return null;
        }
        if (invokers.size() == 1) {
            return invokers.get(0);
        }
        return doSelect(invokers, url, invocation);
    }
    
    protected abstract <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation);

可以看到里边有一个模板方法doSelect,即这里使用了模板模式。具体实现由子类提供,那么AbstractLoadBalance的子类有哪些呢?有下边5种:

org.apache.dubbo.rpc.cluster.loadbalance.ConsistentHashLoadBalance
org.apache.dubbo.rpc.cluster.loadbalance.LeastActiveLoadBalance
org.apache.dubbo.rpc.cluster.loadbalance.RandomLoadBalance
org.apache.dubbo.rpc.cluster.loadbalance.RoundRobinLoadBalance
org.apache.dubbo.rpc.cluster.loadbalance.ShortestResponseLoadBalance

这里我们找一个最简单的看一下比如这里的随机负载均衡算法:

//RandomLoadBalance#doSelect
    /**
     * Select one invoker between a list using a random criteria
     * @param invokers List of possible invokers
     * @param url URL
     * @param invocation Invocation
     * @param <T>
     * @return The selected invoker
     */
    @Override
    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        // Number of invokers
        int length = invokers.size();
        // Every invoker has the same weight?
        boolean sameWeight = true;
        // the maxWeight of every invokers, the minWeight = 0 or the maxWeight of the last invoker
        int[] weights = new int[length];
        // The sum of weights
        int totalWeight = 0;
        for (int i = 0; i < length; i++) {
        	//获取下游服务权重
            int weight = getWeight(invokers.get(i), invocation);
            // Sum
            totalWeight += weight;
            // save for later use
            weights[i] = totalWeight;
            if (sameWeight && totalWeight != weight * (i + 1)) {
                sameWeight = false;
            }
        }
        if (totalWeight > 0 && !sameWeight) {
            // If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight.
            int offset = ThreadLocalRandom.current().nextInt(totalWeight);
            // Return a invoker based on the random value.
            for (int i = 0; i < length; i++) {
                if (offset < weights[i]) {
                    return invokers.get(i);
                }
            }
        }
        // If all invokers have the same weight value or totalWeight=0, return evenly.
        return invokers.get(ThreadLocalRandom.current().nextInt(length));
    }

这是带权重的随机算法,使用int数组weights来存储不同下游节点的权重值,长度自然就是下游节点的个数。最后在从invokers列表选择invoker的时候,通过计算总权重的随机offset位移值来获取invoker。

 	int offset = ThreadLocalRandom.current().nextInt(totalWeight);

看注释可以知道,这是为了防止invokers列表中所有invoker的权重weight值之和小于等于0的情况,这种情况下没必要进行权重随机了,直接根据invokers列表长度随机就行了。

 		// If all invokers have the same weight value or totalWeight=0, return evenly.
        return invokers.get(ThreadLocalRandom.current().nextInt(length));

总体上,这个RandomLoadBalance#doSelect方法逻辑很简单。但其中循环获取每个invoker的权重这个方法需要引起重视。因为这个方法在下游节点配置了启动时间戳TIMESTAMP_KEY的情况下会进行warmup配比放量。也就是当下游服务端刚启动时可能并不能承载上游瞬间大流量过来,通过warmup的机制,客户端可以根据下游服务端启动时间进行缓慢预热配比放量。

	//AbstractLoadBalance.java提供
	int weight = getWeight(invokers.get(i), invocation);
	String TIMESTAMP_KEY = "timestamp";
//AbstractLoadBalance.java提供
/**
     * 获取考虑预热时间的情况下的调用权重值,如果下游服务的uptime启动时间在warmup预热时间内,那么下游服务的权重将会被减少!
     *
     * @param invoker    the invoker
     * @param invocation the invocation of this invoker
     * @return weight
     */
    int getWeight(Invoker<?> invoker, Invocation invocation) {
        int weight;
        URL url = invoker.getUrl();
        // Multiple registry scenario, load balance among multiple registries.
        if (REGISTRY_SERVICE_REFERENCE_PATH.equals(url.getServiceInterface())) {
            weight = url.getParameter(REGISTRY_KEY + "." + WEIGHT_KEY, DEFAULT_WEIGHT);
        } else {
            weight = url.getMethodParameter(invocation.getMethodName(), WEIGHT_KEY, DEFAULT_WEIGHT);
            if (weight > 0) {
                long timestamp = invoker.getUrl().getParameter(TIMESTAMP_KEY, 0L);
                if (timestamp > 0L) {
                    long uptime = System.currentTimeMillis() - timestamp;
                    if (uptime < 0) {
                        return 1;
                    }
                    //下游服务默认权重值100
                    int warmup = invoker.getUrl().getParameter(WARMUP_KEY, DEFAULT_WARMUP);
                    //下游启动时间大于0且小于预热时间阈值则减少下游服务权重
                    if (uptime > 0 && uptime < warmup) {
                        weight = calculateWarmupWeight((int)uptime, warmup, weight);
                    }
                }
            }
        }
        return Math.max(weight, 0);
    }

 /**
     * Calculate the weight according to the uptime proportion of warmup time
     * the new weight will be within 1(inclusive) to weight(inclusive)
     *
     * @param uptime the uptime in milliseconds
     * @param warmup the warmup time in milliseconds
     * @param weight the weight of an invoker
     * @return weight which takes warmup into account
     */
    static int calculateWarmupWeight(int uptime, int warmup, int weight) {
        int ww = (int) ( uptime / ((float) warmup / weight));
        return ww < 1 ? 1 : (Math.min(ww, weight));
    }

到此为止,我们分析了从FailoverClusterInvoker#doInvoke一路下来到RandomLoadBalance#doSelect方法。有什么疑问多多交流!

以上是关于源码走读:Dubbo带权重的随机负载均衡算法与warmup的主要内容,如果未能解决你的问题,请参考以下文章

源码走读:Dubbo带权重的随机负载均衡算法与warmup

源码走读:Dubbo带权重的随机负载均衡算法与warmup

源码走读:Dubbo带权重的随机负载均衡算法与warmup

Dubbo的负载均衡算法

负载均衡算法 - 基本实现

dubbo负载均衡策略及对应源码分析