JDK 源码分析 -- LongAdder

Posted wenniuwuren

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了JDK 源码分析 -- LongAdder相关的知识,希望对你有一定的参考价值。

 

一、简介

当多个线程更新用于诸如收集统计信息(因为LongAdder 多线程时候会有误差)而不是用于细粒度同步控制之类的公共和时,此类通常比AtomicLong更可取。在低更新争用下,这两个类具有相似的特征。但是在竞争激烈的情况下,此类的预期吞吐量会大大提高,但要消耗更多的空间。

LongAdders可以与java.util.concurrent.ConcurrentHashMap一起使用,以维护可扩展的频率图(一种直方图或多集合形式)。例如,要将计数添加到ConcurrentHashMap <String,LongAdder>freqs,如果还不存在,则进行初始化,可以使用freqs.computeIfAbsent(k-> new LongAdder()).increment();
 

类层次结构

 

LongAdder 本身没有全局变量,其值的变更实际上是由父类 Striped64 管理的。
Striped64 通过两个全局变量来管理 value,分别是 base 和 cells ,cells 是一个数组,其元素是 Striped64 的内部类 Cell 的实现,Cell 很简单,只记录一个value。
当 LongAdder 不存在并发访问的时候,会直接通过 cas 的方式更新 base 的值,存在并发访问时,会定位到某一个 cell,修改 cell的 value。
最终,value = base + sum(cells) 其实就是累加。

 

二、源码分析

写个测试方法

LongAdder longAdder = new LongAdder();
        longAdder.add(2L);
        System.out.println(longAdder.intValue());
        longAdder.decrement();
        longAdder.reset();
        longAdder.sum();
        longAdder.increment();
        longAdder.sumThenReset();

从 add 开始debug 看源码:

public void add(long x) 
        Cell[] as; long b, v; int m; Cell a;
         //(1)初始状态下,cells==null。 (2)累加,cas保证累加结果 
         //  初始状态下,条件(1)一定是false,此时会通过casBase方法,以CAS的方式更新base值,且只有当cas失败时,才会走到if中
         // 存在竞争的时候,cas会失败。当不存在竞争的时候,LongAdder是通过累加base值实现value的更新的。
        if ((as = cells) != null || !casBase(b = base, b + x)) 
            boolean uncontended = true;
            if (as == null || (m = as.length - 1) < 0 ||
                (a = as[getProbe() & m]) == null ||
                !(uncontended = a.cas(v = a.value, v + x)))
                longAccumulate(x, null, uncontended);
        

当存在多线程竞争时,上述代码逻辑:

这些条件下都会走到 longAccumulate 方法,位于父类 Striped64 中:

final void longAccumulate(long x, LongBinaryOperator fn,
                              boolean wasUncontended) 
        int h;
        if ((h = getProbe()) == 0) 
            ThreadLocalRandom.current(); // force initialization
            h = getProbe();
            wasUncontended = true;
        
        boolean collide = false;                // True if last slot nonempty
        for (;;) 
            Cell[] as; Cell a; int n; long v;
            if ((as = cells) != null && (n = as.length) > 0) 
            // 从cells中定位一个Cell,如果是null,就new一个Cell,并将x作为初始值
                if ((a = as[(n - 1) & h]) == null) 
                    if (cellsBusy == 0)        // Try to attach new Cell
                        Cell r = new Cell(x);   // Optimistically create
                        if (cellsBusy == 0 && casCellsBusy()) 
                            boolean created = false;
                            try                // Recheck under lock
                                Cell[] rs; int m, j;
                                if ((rs = cells) != null &&
                                    (m = rs.length) > 0 &&
                                    rs[j = (m - 1) & h] == null) 
                                    rs[j] = r;
                                    created = true;
                                
                             finally 
                                cellsBusy = 0;
                            
                            if (created)
                                break;
                            continue;           // Slot is now non-empty
                        
                    
                    collide = false;
                
                else if (!wasUncontended)       // CAS already known to fail
                    wasUncontended = true;      // Continue after rehash
                // 如果定位到的Cell!=null,通过cas的方式更新这个cell维护的value x
                else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                             fn.applyAsLong(v, x))))
                    break;
                else if (n >= NCPU || cells != as)
                    collide = false;            // At max size or stale
                else if (!collide)
                    collide = true;
                else if (cellsBusy == 0 && casCellsBusy())  // cells数组扩容,迁移数据
                    try 
                        if (cells == as)       // Expand table unless stale
                            Cell[] rs = new Cell[n << 1];
                            for (int i = 0; i < n; ++i)
                                rs[i] = as[i];
                            cells = rs;
                        
                     finally 
                        cellsBusy = 0;
                    
                    collide = false;
                    continue;                   // Retry with expanded table
                
                h = advanceProbe(h);
            
            else if (cellsBusy == 0 && cells == as && casCellsBusy())  // 初始化 cells 数组,存入相应线程的value x
                boolean init = false;
                try                            // Initialize table
                    if (cells == as) 
                        Cell[] rs = new Cell[2];
                        rs[h & 1] = new Cell(x);
                        cells = rs;
                        init = true;
                    
                 finally 
                    cellsBusy = 0;
                
                if (init)
                    break;
            
            else if (casBase(v = base, ((fn == null) ? v + x :
                                        fn.applyAsLong(v, x))))
                break;                          // Fall back on using base
        
    

继续看测试用例的 intValue ,其实内部都是调 sum 方法:

返回当前总和。 返回的值不是原子快照。 在没有并发更新的情况下调用会返回准确的结果,但是在计算总和时发生的并发更新可能不会合并。 即全局变量 base、cells 在计算和的过程中其实其他线程还在更新,所以结果不准。

public int intValue() 
        return (int)sum();
    

// (1)单线程环境,直接返回 base 就是总和,(2)多线程环境需要把 cells数组中所有value 累加
public long sum() 
        Cell[] as = cells; Cell a;
        long sum = base;
        if (as != null) 
            for (int i = 0; i < as.length; ++i) 
                if ((a = as[i]) != null)
                    sum += a.value;
            
        
        return sum;
    

最后看下 reset 方法:

public void reset() 
        Cell[] as = cells; Cell a;
        base = 0L;
        if (as != null) 
            for (int i = 0; i < as.length; ++i) 
                if ((a = as[i]) != null)
                    a.value = 0L;
            
        
    

同样没有并发控制,将base和cells数组挨个设置成0。在其他线程没有更新的时候才能使用这个方法,一般不用。 一般重启应用重新计数就好了。

 

三、为什么比 AtomicLong 速度快

AtomicLong 是通过死循环 cas 更新,在并发度越高,cas 的失败率也会越高,循环次数剧增,造成CPU使用率升高。
而 LongAdder 在多线程环境会通过Cell[]的方式更新值(理想情况一个线程一个位置更新,则没有竞争)。 在没有多线程环境, LongAdder 还是使用cas更新和 AtomicLong 保持一致。

 

四、能否用 LongAdder 完全替换 AtomicLong

源码分析中对 sum 的解释可以看出来,只有数据统计(限流降级使用,监控使用)这些地方可以不用太精确的场景用 LongAdder 统计才行, 其他需要精确累计的还是需要 AtomicLong。

 

五、总结


这是典型的用空间换时间的方法,大家在技术选型的时候考虑好应用场景去针对性使用,举个例子,阿里巴巴开源的 Sentinel 中就用了 LongAdder 来计数,限流降级的统计可以不用那么精准,所以我们在使用限流降级的时候会发现比如设置限流值 100,实际情况QPS会超过100.

以上是关于JDK 源码分析 -- LongAdder的主要内容,如果未能解决你的问题,请参考以下文章

Java8原子弹类之LongAdder源码分析

javaLongAdder源码分析原理分析

jdk源码解析--LongAdder类

Java Review - 并发编程_JDK 8新增的原子操作类LongAdder & LongAccumulator

03_LongAdder 源码分析

死磕 java并发包之LongAdder源码分析