Dubbo之LoadBalance源码分析

Posted Java后端笔记

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Dubbo之LoadBalance源码分析相关的知识,希望对你有一定的参考价值。

前言

LoadBalance,就是负载均衡,那么何为负载均衡,就是让服务提供者相对平摊请求,不要出现请求总落在一个提供者的情况

接口定义

 
   
   
 
  1. @SPI(RandomLoadBalance.NAME)

  2. public interface LoadBalance {

  3.    /**

  4.     * select one invoker in list.

  5.     *

  6.     * @param invokers   invokers.

  7.     * @param url        refer url

  8.     * @param invocation invocation.

  9.     * @return selected invoker.

  10.     */

  11.    @Adaptive("loadbalance")

  12.    <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;

  13. }

select方法作用是从invokers选出下一个被调用的invoker,具体有哪些策略,如下

然后这个LoadBalance主要使用在Cluster模块中。比如failover选择下一个invoker。

下面开始源码讲解

源码

AbstractLoadBalance

上述4中策略的实现,都会继承AbstractLoadBalance这个模板类,在这个模板类中封装了getWeight方法,获取invoker的权重,特别的是,这个权重和预热时间有关,只有提供者在线时长到达了预热时间,调用这个方法获取invoker权重的时候,才能获得100%的权重。在子类中获取invoker权重都是调用这个方法

看下带有预热逻辑的权重方法

 
   
   
 
  1. //计算预热权重

  2.    protected int getWeight(Invoker<?> invoker, Invocation invocation) {

  3.        int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT);

  4.        if (weight > 0) {

  5.            long timestamp = invoker.getUrl().getParameter(Constants.REMOTE_TIMESTAMP_KEY, 0L);

  6.            if (timestamp > 0L) {

  7.                //提供者在线时长

  8.                int uptime = (int) (System.currentTimeMillis() - timestamp);

  9.                //预热时间默认10分钟

  10.                int warmup = invoker.getUrl().getParameter(Constants.WARMUP_KEY, Constants.DEFAULT_WARMUP);

  11.                if (uptime > 0 && uptime < warmup) {

  12.                    weight = calculateWarmupWeight(uptime, warmup, weight);

  13.                }

  14.            }

  15.        }

  16.        return weight;

  17.    }

  18. //用于计算预热权重

  19.    static int calculateWarmupWeight(int uptime, int warmup, int weight) {

  20.        int ww = (int) ((float) uptime / ((float) warmup / (float) weight));

  21.        return ww < 1 ? 1 : (ww > weight ? weight : ww);

  22.    }

AbstractLoadBalance实现了select方法,增加了对invoker数量的判断,如果只有一个直接返回,invokers超过1个才需要使用负载均衡选择逻辑,具体负载均衡逻辑由子类实现doSelect方法

 
   
   
 
  1. @Override

  2.    public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {

  3.        if (invokers == null || invokers.isEmpty())

  4.            return null;

  5.        //如果只有一个提供者直接返回,预热失效

  6.        if (invokers.size() == 1)

  7.            return invokers.get(0);

  8.        return doSelect(invokers, url, invocation);

  9.    }

  10.    //让子类实现doSelect

  11.    protected abstract <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation);

为什么要预热,jvm运行时会对字节码进行优化,刚启动的字节码肯定不是最优的。或者是提供者本身有其他缓存需要初始化之类的。所以预热是有必要的。不要一启动就和其他提供者承受同样流量,可能效率会变慢。当然,如果只有一个提供者的情况下,预热就失效了。

RandomLoadBalance

随机算法,如果每个invokers权重一样,那么就是普通的随机算法,如果不同就是加权随机

 
   
   
 
  1. protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {

  2.        int length = invokers.size(); // Number of invokers

  3.        int totalWeight = 0; // The sum of weights

  4.        boolean sameWeight = true; // Every invoker has the same weight?

  5.        for (int i = 0; i < length; i++) {

  6.            int weight = getWeight(invokers.get(i), invocation);

  7.            totalWeight += weight; // Sum

  8.            if (sameWeight && i > 0

  9.                    && weight != getWeight(invokers.get(i - 1), invocation)) {

  10.                sameWeight = false;

  11.            }

  12.        }

  13.        //如果提供者权重不一样,加权随机

  14.        if (totalWeight > 0 && !sameWeight) {

  15.            // If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight.

  16.            int offset = random.nextInt(totalWeight);

  17.            // Return a invoker based on the random value.

  18.            for (int i = 0; i < length; i++) {

  19.                offset -= getWeight(invokers.get(i), invocation);

  20.                if (offset < 0) {

  21.                    return invokers.get(i);

  22.                }

  23.            }

  24.        }

  25.        //如果提供者权重都一样,普通随机

  26.        // If all invokers have the same weight value or totalWeight=0, return evenly.

  27.        return invokers.get(random.nextInt(length));

  28.    }

RoundRobinLoadBalance

轮训算法。如果每个invoker权重一样,就是普通的轮训算法。如果不同,是加权的轮训算法。

 
   
   
 
  1. protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {

  2.        String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();

  3.        int length = invokers.size(); // Number of invokers

  4.        int maxWeight = 0; // The maximum weight

  5.        int minWeight = Integer.MAX_VALUE; // The minimum weight

  6.        final LinkedHashMap<Invoker<T>, IntegerWrapper> invokerToWeightMap = new LinkedHashMap<Invoker<T>, IntegerWrapper>();

  7.        int weightSum = 0;

  8.        for (int i = 0; i < length; i++) {

  9.            int weight = getWeight(invokers.get(i), invocation);

  10.            maxWeight = Math.max(maxWeight, weight); // Choose the maximum weight

  11.            minWeight = Math.min(minWeight, weight); // Choose the minimum weight

  12.            if (weight > 0) {

  13.                invokerToWeightMap.put(invokers.get(i), new IntegerWrapper(weight));

  14.                weightSum += weight;

  15.            }

  16.        }

  17.        AtomicPositiveInteger sequence = sequences.get(key);

  18.        if (sequence == null) {

  19.            sequences.putIfAbsent(key, new AtomicPositiveInteger());

  20.            sequence = sequences.get(key);

  21.        }

  22.        int currentSequence = sequence.getAndIncrement();

  23.        //如果每个提供者权重不一样,采用加权轮训

  24.        if (maxWeight > 0 && minWeight < maxWeight) {

  25.            int mod = currentSequence % weightSum;

  26.            for (int i = 0; i < maxWeight; i++) {

  27.                for (Map.Entry<Invoker<T>, IntegerWrapper> each : invokerToWeightMap.entrySet()) {

  28.                    final Invoker<T> k = each.getKey();

  29.                    final IntegerWrapper v = each.getValue();

  30.                    if (mod == 0 && v.getValue() > 0) {

  31.                        return k;

  32.                    }

  33.                    if (v.getValue() > 0) {

  34.                        v.decrement();

  35.                        mod--;

  36.                    }

  37.                }

  38.            }

  39.        }

  40.        //每个服务提供者权重一样,就是普通轮训

  41.        // Round robin

  42.        return invokers.get(currentSequence % length);

  43.    }

LeastActiveLoadBalance

最少活跃调用数。如果最小活跃调用数的invokers大于1,如果这些invokers权重相同,采用随机算法选出invoker。如不同,采用加权随机算法。

 
   
   
 
  1. @Override

  2.    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {

  3.        int length = invokers.size(); // Number of invokers

  4.        int leastActive = -1; // The least active value of all invokers

  5.        int leastCount = 0; // The number of invokers having the same least active value (leastActive)

  6.        int[] leastIndexs = new int[length]; // The index of invokers having the same least active value (leastActive)

  7.        int totalWeight = 0; // The sum of weights

  8.        int firstWeight = 0; // Initial value, used for comparision

  9.        boolean sameWeight = true; // Every invoker has the same weight value?

  10.        //获取leasractive的数组

  11.        for (int i = 0; i < length; i++) {

  12.            Invoker<T> invoker = invokers.get(i);

  13.            int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive(); // Active number

  14.            int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT); // Weight

  15.            if (leastActive == -1 || active < leastActive) { // Restart, when find a invoker having smaller least active value.

  16.                leastActive = active; // Record the current least active value

  17.                leastCount = 1; // Reset leastCount, count again based on current leastCount

  18.                leastIndexs[0] = i; // Reset

  19.                totalWeight = weight; // Reset

  20.                firstWeight = weight; // Record the weight the first invoker

  21.                sameWeight = true; // Reset, every invoker has the same weight value?

  22.            } else if (active == leastActive) { // If current invoker's active value equals with leaseActive, then accumulating.

  23.                leastIndexs[leastCount++] = i; // Record index number of this invoker

  24.                totalWeight += weight; // Add this invoker's weight to totalWeight.

  25.                // If every invoker has the same weight?

  26.                if (sameWeight && i > 0

  27.                        && weight != firstWeight) {

  28.                    sameWeight = false;

  29.                }

  30.            }

  31.        }

  32.        // assert(leastCount > 0)

  33.        if (leastCount == 1) {

  34.            // If we got exactly one invoker having the least active value, return this invoker directly.

  35.            return invokers.get(leastIndexs[0]);

  36.        }

  37.        //在leastactive数组里面加权随机选择一个

  38.        if (!sameWeight && totalWeight > 0) {

  39.            // If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight.

  40.            int offsetWeight = random.nextInt(totalWeight);

  41.            // Return a invoker based on the random value.

  42.            for (int i = 0; i < leastCount; i++) {

  43.                int leastIndex = leastIndexs[i];

  44.                offsetWeight -= getWeight(invokers.get(leastIndex), invocation);

  45.                if (offsetWeight <= 0)

  46.                    return invokers.get(leastIndex);

  47.            }

  48.        }

  49.        //在leastative数组内随机选择一个

  50.        // If all invokers have the same weight value or totalWeight=0, return evenly.

  51.        return invokers.get(leastIndexs[random.nextInt(leastCount)]);

  52.    }

活跃调用次数会通过ActiveLimitFilter记录在RpcStatus中

ConsistentHashLoadBalance

一致性hash算法。通过调用的参数进行一致性hash,和权重无关。

 
   
   
 
  1. protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {

  2.        String methodName = RpcUtils.getMethodName(invocation);

  3.        String key = invokers.get(0).getUrl().getServiceKey() + "." + methodName;

  4.        int identityHashCode = System.identityHashCode(invokers);

  5.        ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key);

  6.        if (selector == null || selector.identityHashCode != identityHashCode) {

  7.            //生成新的虚拟节点,只有新增或删除的那一段会出现问题

  8.            selectors.put(key, new ConsistentHashSelector<T>(invokers, methodName, identityHashCode));

  9.            selector = (ConsistentHashSelector<T>) selectors.get(key);

  10.        }

  11.        return selector.select(invocation);

  12.    }

一致性hash的主要逻辑都在ConsistentHashSelector中,在它的构造函数中会生成虚拟节点。默认每个invoker 160个。hash环的总节点数为2的32次方-1个

 
   
   
 
  1. ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) {

  2.            this.virtualInvokers = new TreeMap<Long, Invoker<T>>();

  3.            this.identityHashCode = identityHashCode;

  4.            URL url = invokers.get(0).getUrl();

  5.            //每个invoker生成的虚拟节点数

  6.            this.replicaNumber = url.getMethodParameter(methodName, "hash.nodes", 160);

  7.            String[] index = Constants.COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, "hash.arguments", "0"));

  8.            argumentIndex = new int[index.length];

  9.            for (int i = 0; i < index.length; i++) {

  10.                argumentIndex[i] = Integer.parseInt(index[i]);

  11.            }

  12.            for (Invoker<T> invoker : invokers) {

  13.                //生成虚拟节点

  14.                String address = invoker.getUrl().getAddress();

  15.                for (int i = 0; i < replicaNumber / 4; i++) {

  16.                    byte[] digest = md5(address + i);

  17.                    for (int h = 0; h < 4; h++) {

  18.                        long m = hash(digest, h);

  19.                        virtualInvokers.put(m, invoker);

  20.                    }

  21.                }

  22.            }

  23.        }

然后通过对方法参数的hash去取得对应的invoker

 
   
   
 
  1. public Invoker<T> select(Invocation invocation) {

  2.            String key = toKey(invocation.getArguments());

  3.            byte[] digest = md5(key);

  4.            return selectForKey(hash(digest, 0));

  5.        }

  6.        private String toKey(Object[] args) {

  7.            StringBuilder buf = new StringBuilder();

  8.            for (int i : argumentIndex) {

  9.                if (i >= 0 && i < args.length) {

  10.                    buf.append(args[i]);

  11.                }

  12.            }

  13.            return buf.toString();

  14.        }

  15.        private Invoker<T> selectForKey(long hash) {

  16.            //取大于hash的下一个节点

  17.            Map.Entry<Long, Invoker<T>> entry = virtualInvokers.tailMap(hash, true).firstEntry();

  18.            if (entry == null) {

  19.                //hash大于最后一个节点,取第一个节点

  20.                entry = virtualInvokers.firstEntry();

  21.            }

  22.            return entry.getValue();

  23.        }

tailMap方法用于取得virtualInvokers中key的hash大于参数hash的子Map,由于virtualInvokers是TreeMap,并且key为long类型,所以子Map的第一个Entry就对应hash环中的相匹配的invoker。

关于一致性hash可以看下面这篇文章(https://www.cnblogs.com/lpfuture/p/5796398.html)

node对应我们的invoker的hash 键对应我们参数的hash 通过一致性hash,能够保证大部分情况下,参数一致的请求落到同一个提供者。如果提供者发生上下线,只会影响一小部分的请求。

总结

LoadBalance中好多算法,加权随机,加权轮训以及一致性hash真是有意思。大家好好体会这个源码,看懂了,真是很有意思。

最后


以上是关于Dubbo之LoadBalance源码分析的主要内容,如果未能解决你的问题,请参考以下文章

Dubbo 源码分析 - 集群容错之 Directory

Dubbo 源码分析 - 自适应拓展原理

Dubbo分析之Cluster层

Dubbo之服务导出源码分析

Dubbo源码分析之XML的Bean解析

源码分析 Sentinel 之 Dubbo 适配原理