java并发包源码怎么读

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了java并发包源码怎么读相关的知识,希望对你有一定的参考价值。

1. 各种同步控制工具的使用

1.1 ReentrantLock 

ReentrantLock感觉上是synchronized的增强版,synchronized的特点是使用简单,一切交给JVM去处理,但是功能上是比较薄弱的。在JDK1.5之前,ReentrantLock的性能要好于synchronized,由于对JVM进行了优化,现在的JDK版本中,两者性能是不相上下的。如果是简单的实现,不要刻意去使用ReentrantLock。

相比于synchronized,ReentrantLock在功能上更加丰富,它具有可重入、可中断、可限时、公平锁等特点。

首先我们通过一个例子来说明ReentrantLock最初步的用法:

package test;

import java.util.concurrent.locks.ReentrantLock;public class Test implements Runnable public static ReentrantLock lock = new ReentrantLock(); public static int i = 0;

@Override public void run() for (int j = 0; j < 10000000; j++)
lock.lock(); try

i++;
finally
lock.unlock();



public static void main(String[] args) throws InterruptedException
Test test = new Test();
Thread t1 = new Thread(test);
Thread t2 = new Thread(test);
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println(i);



有两个线程都对i进行++操作,为了保证线程安全,使用了 ReentrantLock,从用法上可以看出,与 synchronized相比,ReentrantLock就稍微复杂一点。因为必须在finally中进行解锁操作,如果不在 finally解锁,有可能代码出现异常锁没被释放,而synchronized是由JVM来释放锁。

那么ReentrantLock到底有哪些优秀的特点呢?

1.1.1 可重入

单线程可以重复进入,但要重复退出

lock.lock();
lock.lock();try
i++;


finally
lock.unlock();
lock.unlock();

由于ReentrantLock是重入锁,所以可以反复得到相同的一把锁,它有一个与锁相关的获取计数器,如果拥有锁的某个线程再次得到锁,那么获取计数器就加1,然后锁需要被释放两次才能获得真正释放(重入锁)。这模仿了 synchronized 的语义;如果线程进入由线程已经拥有的监控器保护的 synchronized 块,就允许线程继续进行,当线程退出第二个(或者后续) synchronized 块的时候,不释放锁,只有线程退出它进入的监控器保护的第一个synchronized 块时,才释放锁。

public class Child extends Father implements Runnable    final static Child child = new Child();//为了保证锁唯一
public static void main(String[] args)        for (int i = 0; i < 50; i++)            new Thread(child).start();


public synchronized void doSomething()
System.out.println("1child.doSomething()");
doAnotherThing(); // 调用自己类中其他的synchronized方法

private synchronized void doAnotherThing()        super.doSomething(); // 调用父类的synchronized方法
System.out.println("3child.doAnotherThing()");

@Override
public void run()
child.doSomething();

class Father    public synchronized void doSomething()
System.out.println("2father.doSomething()");


我们可以看到一个线程进入不同的 synchronized方法,是不会释放之前得到的锁的。所以输出还是顺序输出。所以synchronized也是重入锁

输出:

1child.doSomething()
2father.doSomething()
3child.doAnotherThing()
1child.doSomething()
2father.doSomething()
3child.doAnotherThing()
1child.doSomething()
2father.doSomething()
3child.doAnotherThing()
...

1.1.2.可中断

与synchronized不同的是,ReentrantLock对中断是有响应的。中断相关知识查看[高并发Java 二] 多线程基础

普通的lock.lock()是不能响应中断的,lock.lockInterruptibly()能够响应中断。

我们模拟出一个死锁现场,然后用中断来处理死锁

package test;import java.lang.management.ManagementFactory;import java.lang.management.ThreadInfo;import java.lang.management.ThreadMXBean;import java.util.concurrent.locks.ReentrantLock;public class Test implements Runnable public static ReentrantLock lock1 = new ReentrantLock(); public static ReentrantLock lock2 = new ReentrantLock(); int lock; public Test(int lock)
this.lock = lock;
@Override
public void run()
try
if (lock == 1)

lock1.lockInterruptibly(); try

Thread.sleep(500);
catch (Exception e)
// TODO: handle exception

lock2.lockInterruptibly();
else

lock2.lockInterruptibly(); try

Thread.sleep(500);
catch (Exception e)
// TODO: handle exception

lock1.lockInterruptibly();

catch (Exception e)
// TODO: handle exception
finally
if (lock1.isHeldByCurrentThread())

lock1.unlock();
if (lock2.isHeldByCurrentThread())

lock2.unlock();

System.out.println(Thread.currentThread().getId() + ":线程退出");

public static void main(String[] args) throws InterruptedException
Test t1 = new Test(1);
Test t2 = new Test(2);
Thread thread1 = new Thread(t1);
Thread thread2 = new Thread(t2);
thread1.start();
thread2.start();
Thread.sleep(1000); //DeadlockChecker.check();
static class DeadlockChecker
private final static ThreadMXBean mbean = ManagementFactory
.getThreadMXBean(); final static Runnable deadlockChecker = new Runnable()
@Override
public void run()
// TODO Auto-generated method stub
while (true)
long[] deadlockedThreadIds = mbean.findDeadlockedThreads(); if (deadlockedThreadIds != null)

ThreadInfo[] threadInfos = mbean.getThreadInfo(deadlockedThreadIds); for (Thread t : Thread.getAllStackTraces().keySet())
for (int i = 0; i < threadInfos.length; i++)
if(t.getId() == threadInfos[i].getThreadId())

t.interrupt();



try

Thread.sleep(5000);
catch (Exception e)
// TODO: handle exception




;
public static void check()

Thread t = new Thread(deadlockChecker);
t.setDaemon(true);
t.start();




上述代码有可能会发生死锁,线程1得到lock1,线程2得到lock2,然后彼此又想获得对方的锁。

我们用jstack查看运行上述代码后的情况

的确发现了一个死锁。

DeadlockChecker.check();方法用来检测死锁,然后把死锁的线程中断。中断后,线程正常退出。

1.1.3.可限时

超时不能获得锁,就返回false,不会永久等待构成死锁

使用lock.tryLock(long timeout, TimeUnit unit)来实现可限时锁,参数为时间和单位。

举个例子来说明下可限时:

package test;import java.util.concurrent.TimeUnit;import java.util.concurrent.locks.ReentrantLock;public class Test implements Runnable public static ReentrantLock lock = new ReentrantLock(); @Override
public void run()
try
if (lock.tryLock(5, TimeUnit.SECONDS))

Thread.sleep(6000);
else

System.out.println("get lock failed");

catch (Exception e)

finally
if (lock.isHeldByCurrentThread())

lock.unlock();



public static void main(String[] args)

Test t = new Test();
Thread t1 = new Thread(t);
Thread t2 = new Thread(t);
t1.start();
t2.start();



使用两个线程来争夺一把锁,当某个线程获得锁后,sleep6秒,每个线程都只尝试5秒去获得锁。

所以必定有一个线程无法获得锁。无法获得后就直接退出了。

输出:

get lock failed

1.1.4.公平锁

使用方式:

public ReentrantLock(boolean fair) public static ReentrantLock fairLock = new ReentrantLock(true);

一般意义上的锁是不公平的,不一定先来的线程能先得到锁,后来的线程就后得到锁。不公平的锁可能会产生饥饿现象。

公平锁的意思就是,这个锁能保证线程是先来的先得到锁。虽然公平锁不会产生饥饿现象,但是公平锁的性能会比非公平锁差很多。

1.2 Condition

Condition与ReentrantLock的关系就类似于synchronized与Object.wait()/signal()

await()方法会使当前线程等待,同时释放当前锁,当其他线程中使用signal()时或者signalAll()方法时,线 程会重新获得锁并继续执行。或者当线程被中断时,也能跳出等待。这和Object.wait()方法很相似。

awaitUninterruptibly()方法与await()方法基本相同,但是它并不会再等待过程中响应中断。 singal()方法用于唤醒一个在等待中的线程。相对的singalAll()方法会唤醒所有在等待中的线程。这和Obejct.notify()方法很类似。

这里就不再详细介绍了。举个例子来说明:

package test;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;public class Test implements Runnable public static ReentrantLock lock = new ReentrantLock(); public static Condition condition = lock.newCondition();

@Override public void run() try
lock.lock();
condition.await();
System.out.println("Thread is going on");
catch (Exception e)

e.printStackTrace();
finally
lock.unlock();


public static void main(String[] args) throws InterruptedException
Test t = new Test();
Thread thread = new Thread(t);
thread.start();
Thread.sleep(2000);
lock.lock();
condition.signal(); lock.unlock();



上述例子很简单,让一个线程await住,让主线程去唤醒它。condition.await()/signal只能在得到锁以后使用。

1.3.Semaphore

对于锁来说,它是互斥的排他的。意思就是,只要我获得了锁,没人能再获得了。

而对于Semaphore来说,它允许多个线程同时进入临界区。可以认为它是一个共享锁,但是共享的额度是有限制的,额度用完了,其他没有拿到额度的线程还是要阻塞在临界区外。当额度为1时,就相等于lock

下面举个例子:

package test;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Semaphore;public class Test implements Runnable final Semaphore semaphore = new Semaphore(5); @Override
public void run()
try

semaphore.acquire();
Thread.sleep(2000);
System.out.println(Thread.currentThread().getId() + " done");
catch (Exception e)

e.printStackTrace();
finally
semaphore.release();


public static void main(String[] args) throws InterruptedException
ExecutorService executorService = Executors.newFixedThreadPool(20); final Test t = new Test(); for (int i = 0; i < 20; i++)

executorService.submit(t);




有一个20个线程的线程池,每个线程都去 Semaphore的许可,Semaphore的许可只有5个,运行后可以看到,5个一批,一批一批地输出。

当然一个线程也可以一次申请多个许可

public void acquire(int permits) throws InterruptedException

1.4 ReadWriteLock

ReadWriteLock是区分功能的锁。读和写是两种不同的功能,读-读不互斥,读-写互斥,写-写互斥。

这样的设计是并发量提高了,又保证了数据安全。

使用方式:

private static ReentrantReadWriteLock readWriteLock=new ReentrantReadWriteLock();
private static Lock readLock = readWriteLock.readLock();
private static Lock writeLock = readWriteLock.writeLock();

详细例子可以查看 Java实现生产者消费者问题与读者写者问题,这里就不展开了。

1.5 CountDownLatch

倒数计时器
一种典型的场景就是火箭发射。在火箭发射前,为了保证万无一失,往往还要进行各项设备、仪器的检查。 只有等所有检查完毕后,引擎才能点火。这种场景就非常适合使用CountDownLatch。它可以使得点火线程
,等待所有检查线程全部完工后,再执行

使用方式:

static final CountDownLatch end = new CountDownLatch(10);
end.countDown();
end.await();

示意图:

一个简单的例子:

package test;import java.util.concurrent.CountDownLatch;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;public class Test implements Runnable static final CountDownLatch countDownLatch = new CountDownLatch(10); static final Test t = new Test(); @Override
public void run()
try

Thread.sleep(2000);
System.out.println("complete");
countDownLatch.countDown();
catch (Exception e)

e.printStackTrace();


public static void main(String[] args) throws InterruptedException
ExecutorService executorService = Executors.newFixedThreadPool(10); for (int i = 0; i < 10; i++)

executorService.execute(t);

countDownLatch.await();
System.out.println("end");
executorService.shutdown();



主线程必须等待10个线程全部执行完才会输出"end"。

1.6 CyclicBarrier

和CountDownLatch相似,也是等待某些线程都做完以后再执行。与CountDownLatch区别在于这个计数器可以反复使用。比如,假设我们将计数器设置为10。那么凑齐第一批1 0个线程后,计数器就会归零,然后接着凑齐下一批10个线程

使用方式:

public CyclicBarrier(int parties, Runnable barrierAction) barrierAction就是当计数器一次计数完成后,系统会执行的动作await()

示意图:

下面举个例子:

package test;import java.util.concurrent.CyclicBarrier;public class Test implements Runnable private String soldier; private final CyclicBarrier cyclic; public Test(String soldier, CyclicBarrier cyclic)
this.soldier = soldier; this.cyclic = cyclic;
@Override
public void run()
try
//等待所有士兵到齐
cyclic.await();
dowork(); //等待所有士兵完成工作
cyclic.await();
catch (Exception e)
// TODO Auto-generated catch block
e.printStackTrace();


private void dowork()
// TODO Auto-generated method stub
try

Thread.sleep(3000);
catch (Exception e)
// TODO: handle exception

System.out.println(soldier + ": done");
public static class BarrierRun implements Runnable
boolean flag; int n; public BarrierRun(boolean flag, int n)
super(); this.flag = flag; this.n = n;
@Override
public void run()
if (flag)

System.out.println(n + "个任务完成");
else

System.out.println(n + "个集合完成");
flag = true;




public static void main(String[] args)
final int n = 10;
Thread[] threads = new Thread[n]; boolean flag = false;
CyclicBarrier barrier = new CyclicBarrier(n, new BarrierRun(flag, n));
System.out.println("集合"); for (int i = 0; i < n; i++)

System.out.println(i + "报道");
threads[i] = new Thread(new Test("士兵" + i, barrier));
threads[i].start();




打印结果:

集合

士兵5: done士兵7: done士兵8: done士兵3: done士兵4: done士兵1: done士兵6: done士兵2: done士兵0: done士兵9: done10个任务完成

1.7 LockSupport

提供线程阻塞原语

和suspend类似

LockSupport.park();
LockSupport.unpark(t1);

与suspend相比 不容易引起线程冻结

LockSupport的思想呢,和 Semaphore有点相似,内部有一个许可,park的时候拿掉这个许可,unpark的时候申请这个许可。所以如果unpark在park之前,是不会发生线程冻结的。

下面的代码是[高并发Java 二] 多线程基础中suspend示例代码,在使用suspend时会发生死锁。

而使用 LockSupport则不会发生死锁。

另外

park()能够响应中断,但不抛出异常。中断响应的结果是,park()函数的返回,可以从Thread.interrupted()得到中断标志。

在JDK当中有大量地方使用到了park,当然LockSupport的实现也是使用unsafe.park()来实现的。

public static void park()        unsafe.park(false, 0L);

1.8 ReentrantLock 的实现

下面来介绍下ReentrantLock的实现,ReentrantLock的实现主要由3部分组成:

    CAS状态

    等待队列

    park()

    ReentrantLock的父类中会有一个state变量来表示同步的状态

    /**
        * The synchronization state.
        */
       private volatile int state;

    通过CAS操作来设置state来获取锁,如果设置成了1,则将锁的持有者给当前线程

    final void lock()            if (compareAndSetState(0, 1))
                   setExclusiveOwnerThread(Thread.currentThread());            else
                   acquire(1);
           

    如果拿锁不成功,则会做一个申请

    public final void acquire(int arg)        if (!tryAcquire(arg) &&
               acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
               selfInterrupt();
       

    首先,再去申请下试试看tryAcquire,因为此时可能另一个线程已经释放了锁。

    如果还是没有申请到锁,就addWaiter,意思是把自己加到等待队列中去

    其间还会有多次尝试去申请锁,如果还是申请不到,就会被挂起

    private final boolean parkAndCheckInterrupt()
           LockSupport.park(this);        return Thread.interrupted();
       

    同理,如果在unlock操作中,就是释放了锁,然后unpark,这里就不具体讲了。

    2. 并发容器及典型源码分析

    2.1 ConcurrentHashMap 

    我们知道HashMap不是一个线程安全的容器,最简单的方式使HashMap变成线程安全就是使用Collections.synchronizedMap,它是对HashMap的一个包装

    public static Map m=Collections.synchronizedMap(new HashMap());

    同理对于List,Set也提供了相似方法。

    但是这种方式只适合于并发量比较小的情况。

    我们来看下synchronizedMap的实现

    它会将HashMap包装在里面,然后将HashMap的每个操作都加上synchronized。

    由于每个方法都是获取同一把锁(mutex),这就意味着,put和remove等操作是互斥的,大大减少了并发量。

    下面来看下ConcurrentHashMap是如何实现的

    在 ConcurrentHashMap内部有一个Segment段,它将大的HashMap切分成若干个段(小的HashMap),然后让数据在每一段上Hash,这样多个线程在不同段上的Hash操作一定是线程安全的,所以只需要同步同一个段上的线程就可以了,这样实现了锁的分离,大大增加了并发量。

    在使用ConcurrentHashMap.size时会比较麻烦,因为它要统计每个段的数据和,在这个时候,要把每一个段都加上锁,然后再做数据统计。这个就是把锁分离后的小小弊端,但是size方法应该是不会被高频率调用的方法。

    在实现上,不使用synchronized和lock.lock而是尽量使用trylock,同时在HashMap的实现上,也做了一点优化。这里就不提了。

    2.2 BlockingQueue

    BlockingQueue不是一个高性能的容器。但是它是一个非常好的共享数据的容器。是典型的生产者和消费者的实现。

参考技术A 下个jdk啊,都是带源码的

死磕 java并发包之LongAdder源码分析

问题

(1)java8中为什么要新增LongAdder?

(2)LongAdder的实现方式?

(3)LongAdder与AtomicLong的对比?

简介

LongAdder是java8中新增的原子类,在多线程环境中,它比AtomicLong性能要高出不少,特别是写多的场景。

它是怎么实现的呢?让我们一起来学习吧。

原理

LongAdder的原理是,在最初无竞争时,只更新base的值,当有多线程竞争时通过分段的思想,让不同的线程更新不同的段,最后把这些段相加就得到了完整的LongAdder存储的值。

技术图片

源码分析

LongAdder继承自Striped64抽象类,Striped64中定义了Cell内部类和各重要属性。

主要内部类

// Striped64中的内部类,使用@sun.misc.Contended注解,说明里面的值消除伪共享
@sun.misc.Contended static final class Cell {
    // 存储元素的值,使用volatile修饰保证可见性
    volatile long value;
    Cell(long x) { value = x; }
    // CAS更新value的值
    final boolean cas(long cmp, long val) {
        return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
    }

    // Unsafe实例
    private static final sun.misc.Unsafe UNSAFE;
    // value字段的偏移量
    private static final long valueOffset;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> ak = Cell.class;
            valueOffset = UNSAFE.objectFieldOffset
                (ak.getDeclaredField("value"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}

Cell类使用@sun.misc.Contended注解,说明是要避免伪共享的。

使用Unsafe的CAS更新value的值,其中value的值使用volatile修饰,保证可见性。

关于Unsafe的介绍请查看【死磕 java魔法类之Unsafe解析】。

关于伪共享的介绍请查看【杂谈 什么是伪共享(false sharing)?】。

主要属性

// 这三个属性都在Striped64中
// cells数组,存储各个段的值
transient volatile Cell[] cells;
// 最初无竞争时使用的,也算一个特殊的段
transient volatile long base;
// 标记当前是否有线程在创建或扩容cells,或者在创建Cell
// 通过CAS更新该值,相当于是一个锁
transient volatile int cellsBusy;

最初无竞争或有其它线程在创建cells数组时使用base更新值,有过竞争时使用cells更新值。

最初无竞争是指一开始没有线程之间的竞争,但也有可能是多线程在操作,只是这些线程没有同时去更新base的值。

有过竞争是指只要出现过竞争不管后面有没有竞争都使用cells更新值,规则是不同的线程hash到不同的cell上去更新,减少竞争。

add(x)方法

add(x)方法是LongAdder的主要方法,使用它可以使LongAdder中存储的值增加x,x可为正可为负。

public void add(long x) {
    // as是Striped64中的cells属性
    // b是Striped64中的base属性
    // v是当前线程hash到的Cell中存储的值
    // m是cells的长度减1,hash时作为掩码使用
    // a是当前线程hash到的Cell
    Cell[] as; long b, v; int m; Cell a;
    // 条件1:cells不为空,说明出现过竞争,cells已经创建
    // 条件2:cas操作base失败,说明其它线程先一步修改了base,正在出现竞争
    if ((as = cells) != null || !casBase(b = base, b + x)) {
        // true表示当前竞争还不激烈
        // false表示竞争激烈,多个线程hash到同一个Cell,可能要扩容
        boolean uncontended = true;
        // 条件1:cells为空,说明正在出现竞争,上面是从条件2过来的
        // 条件2:应该不会出现
        // 条件3:当前线程所在的Cell为空,说明当前线程还没有更新过Cell,应初始化一个Cell
        // 条件4:更新当前线程所在的Cell失败,说明现在竞争很激烈,多个线程hash到了同一个Cell,应扩容
        if (as == null || (m = as.length - 1) < 0 ||
            // getProbe()方法返回的是线程中的threadLocalRandomProbe字段
            // 它是通过随机数生成的一个值,对于一个确定的线程这个值是固定的
            // 除非刻意修改它
            (a = as[getProbe() & m]) == null ||
            !(uncontended = a.cas(v = a.value, v + x)))
            // 调用Striped64中的方法处理
            longAccumulate(x, null, uncontended);
    }
}

(1)最初无竞争时只更新base;

(2)直到更新base失败时,创建cells数组;

(3)当多个线程竞争同一个Cell比较激烈时,可能要扩容;

longAccumulate()方法

final void longAccumulate(long x, LongBinaryOperator fn,
                              boolean wasUncontended) {
    // 存储线程的probe值
    int h;
    // 如果getProbe()方法返回0,说明随机数未初始化
    if ((h = getProbe()) == 0) {
        // 强制初始化
        ThreadLocalRandom.current(); // force initialization
        // 重新获取probe值
        h = getProbe();
        // 都未初始化,肯定还不存在竞争激烈
        wasUncontended = true;
    }
    // 是否发生碰撞
    boolean collide = false;                // True if last slot nonempty
    for (;;) {
        Cell[] as; Cell a; int n; long v;
        // cells已经初始化过
        if ((as = cells) != null && (n = as.length) > 0) {
            // 当前线程所在的Cell未初始化
            if ((a = as[(n - 1) & h]) == null) {
                // 当前无其它线程在创建或扩容cells,也没有线程在创建Cell
                if (cellsBusy == 0) {       // Try to attach new Cell
                    // 新建一个Cell,值为当前需要增加的值
                    Cell r = new Cell(x);   // Optimistically create
                    // 再次检测cellsBusy,并尝试更新它为1
                    // 相当于当前线程加锁
                    if (cellsBusy == 0 && casCellsBusy()) {
                        // 是否创建成功
                        boolean created = false;
                        try {               // Recheck under lock
                            Cell[] rs; int m, j;
                            // 重新获取cells,并找到当前线程hash到cells数组中的位置
                            // 这里一定要重新获取cells,因为as并不在锁定范围内
                            // 有可能已经扩容了,这里要重新获取
                            if ((rs = cells) != null &&
                                (m = rs.length) > 0 &&
                                rs[j = (m - 1) & h] == null) {
                                // 把上面新建的Cell放在cells的j位置处
                                rs[j] = r;
                                // 创建成功
                                created = true;
                            }
                        } finally {
                            // 相当于释放锁
                            cellsBusy = 0;
                        }
                        // 创建成功了就返回
                        // 值已经放在新建的Cell里面了
                        if (created)
                            break;
                        continue;           // Slot is now non-empty
                    }
                }
                // 标记当前未出现冲突
                collide = false;
            }
            // 当前线程所在的Cell不为空,且更新失败了
            // 这里简单地设为true,相当于简单地自旋一次
            // 通过下面的语句修改线程的probe再重新尝试
            else if (!wasUncontended)       // CAS already known to fail
                wasUncontended = true;      // Continue after rehash
            // 再次尝试CAS更新当前线程所在Cell的值,如果成功了就返回
            else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                         fn.applyAsLong(v, x))))
                break;
            // 如果cells数组的长度达到了CPU核心数,或者cells扩容了
            // 设置collide为false并通过下面的语句修改线程的probe再重新尝试
            else if (n >= NCPU || cells != as)
                collide = false;            // At max size or stale
            // 上上个elseif都更新失败了,且上个条件不成立,说明出现冲突了
            else if (!collide)
                collide = true;
            // 明确出现冲突了,尝试占有锁,并扩容
            else if (cellsBusy == 0 && casCellsBusy()) {
                try {
                    // 检查是否有其它线程已经扩容过了
                    if (cells == as) {      // Expand table unless stale
                        // 新数组为原数组的两倍
                        Cell[] rs = new Cell[n << 1];
                        // 把旧数组元素拷贝到新数组中
                        for (int i = 0; i < n; ++i)
                            rs[i] = as[i];
                        // 重新赋值cells为新数组
                        cells = rs;
                    }
                } finally {
                    // 释放锁
                    cellsBusy = 0;
                }
                // 已解决冲突
                collide = false;
                // 使用扩容后的新数组重新尝试
                continue;                   // Retry with expanded table
            }
            // 更新失败或者达到了CPU核心数,重新生成probe,并重试
            h = advanceProbe(h);
        }
        // 未初始化过cells数组,尝试占有锁并初始化cells数组
        else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
            // 是否初始化成功
            boolean init = false;
            try {                           // Initialize table
                // 检测是否有其它线程初始化过
                if (cells == as) {
                    // 新建一个大小为2的Cell数组
                    Cell[] rs = new Cell[2];
                    // 找到当前线程hash到数组中的位置并创建其对应的Cell
                    rs[h & 1] = new Cell(x);
                    // 赋值给cells数组
                    cells = rs;
                    // 初始化成功
                    init = true;
                }
            } finally {
                // 释放锁
                cellsBusy = 0;
            }
            // 初始化成功直接返回
            // 因为增加的值已经同时创建到Cell中了
            if (init)
                break;
        }
        // 如果有其它线程在初始化cells数组中,就尝试更新base
        // 如果成功了就返回
        else if (casBase(v = base, ((fn == null) ? v + x :
                                    fn.applyAsLong(v, x))))
            break;                          // Fall back on using base
    }
}

(1)如果cells数组未初始化,当前线程会尝试占有cellsBusy锁并创建cells数组;

(2)如果当前线程尝试创建cells数组时,发现有其它线程已经在创建了,就尝试更新base,如果成功就返回;

(3)通过线程的probe值找到当前线程应该更新cells数组中的哪个Cell;

(4)如果当前线程所在的Cell未初始化,就占有占有cellsBusy锁并在相应的位置创建一个Cell;

(5)尝试CAS更新当前线程所在的Cell,如果成功就返回,如果失败说明出现冲突;

(5)当前线程更新Cell失败后并不是立即扩容,而是尝试更新probe值后再重试一次;

(6)如果在重试的时候还是更新失败,就扩容;

(7)扩容时当前线程占有cellsBusy锁,并把数组容量扩大到两倍,再迁移原cells数组中元素到新数组中;

(8)cellsBusy在创建cells数组、创建Cell、扩容cells数组三个地方用到;

sum()方法

sum()方法是获取LongAdder中真正存储的值的大小,通过把base和所有段相加得到。

public long sum() {
    Cell[] as = cells; Cell a;
    // sum初始等于base
    long sum = base;
    // 如果cells不为空
    if (as != null) {
        // 遍历所有的Cell
        for (int i = 0; i < as.length; ++i) {
            // 如果所在的Cell不为空,就把它的value累加到sum中
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    // 返回sum
    return sum;
}

可以看到sum()方法是把base和所有段的值相加得到,那么,这里有一个问题,如果前面已经累加到sum上的Cell的value有修改,不是就没法计算到了么?

答案确实如此,所以LongAdder可以说不是强一致性的,它是最终一致性的。

LongAdder VS AtomicLong

直接上代码:

public class LongAdderVSAtomicLongTest {
    public static void main(String[] args){
        testAtomicLongVSLongAdder(1, 10000000);
        testAtomicLongVSLongAdder(10, 10000000);
        testAtomicLongVSLongAdder(20, 10000000);
        testAtomicLongVSLongAdder(40, 10000000);
        testAtomicLongVSLongAdder(80, 10000000);
    }

    static void testAtomicLongVSLongAdder(final int threadCount, final int times){
        try {
            System.out.println("threadCount:" + threadCount + ", times:" + times);
            long start = System.currentTimeMillis();
            testLongAdder(threadCount, times);
            System.out.println("LongAdder elapse:" + (System.currentTimeMillis() - start) + "ms");

            long start2 = System.currentTimeMillis();
            testAtomicLong(threadCount, times);
            System.out.println("AtomicLong elapse:" + (System.currentTimeMillis() - start2) + "ms");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    static void testAtomicLong(final int threadCount, final int times) throws InterruptedException {
        AtomicLong atomicLong = new AtomicLong();
        List<Thread> list = new ArrayList<>();
        for (int i=0;i<threadCount;i++){
            list.add(new Thread(() -> {
                for (int j = 0; j<times; j++){
                    atomicLong.incrementAndGet();
                }
            }));
        }

        for (Thread thread : list){
            thread.start();
        }

        for (Thread thread : list){
            thread.join();
        }
    }

    static void testLongAdder(final int threadCount, final int times) throws InterruptedException {
        LongAdder longAdder = new LongAdder();
        List<Thread> list = new ArrayList<>();
        for (int i=0;i<threadCount;i++){
            list.add(new Thread(() -> {
                for (int j = 0; j<times; j++){
                    longAdder.add(1);
                }
            }));
        }

        for (Thread thread : list){
            thread.start();
        }

        for (Thread thread : list){
            thread.join();
        }
    }
}

运行结果如下:

threadCount:1, times:10000000
LongAdder elapse:158ms
AtomicLong elapse:64ms
threadCount:10, times:10000000
LongAdder elapse:206ms
AtomicLong elapse:2449ms
threadCount:20, times:10000000
LongAdder elapse:429ms
AtomicLong elapse:5142ms
threadCount:40, times:10000000
LongAdder elapse:840ms
AtomicLong elapse:10506ms
threadCount:80, times:10000000
LongAdder elapse:1369ms
AtomicLong elapse:20482ms

可以看到当只有一个线程的时候,AtomicLong反而性能更高,随着线程越来越多,AtomicLong的性能急剧下降,而LongAdder的性能影响很小。

总结

(1)LongAdder通过base和cells数组来存储值;

(2)不同的线程会hash到不同的cell上去更新,减少了竞争;

(3)LongAdder的性能非常高,最终会达到一种无竞争的状态;

彩蛋

在longAccumulate()方法中有个条件是n >= NCPU就不会走到扩容逻辑了,而n是2的倍数,那是不是代表cells数组最大只能达到大于等于NCPU的最小2次方?

答案是明确的。因为同一个CPU核心同时只会运行一个线程,而更新失败了说明有两个不同的核心更新了同一个Cell,这时会重新设置更新失败的那个线程的probe值,这样下一次它所在的Cell很大概率会发生改变,如果运行的时间足够长,最终会出现同一个核心的所有线程都会hash到同一个Cell(大概率,但不一定全在一个Cell上)上去更新,所以,这里cells数组中长度并不需要太长,达到CPU核心数足够了。

比如,笔者的电脑是8核的,所以这里cells的数组最大只会到8,达到8就不会扩容了。

技术图片


欢迎关注我的公众号“彤哥读源码”,查看更多源码系列文章, 与彤哥一起畅游源码的海洋。

技术图片

以上是关于java并发包源码怎么读的主要内容,如果未能解决你的问题,请参考以下文章

java并发包研究之-ConcurrentHashMap

CopyOnWriteArrayList实现原理及源码分析

Java并发包源码分析

111

并发编程(十六)——java7 深入并发包 ConcurrentHashMap 源码解析

Java高并发系列之ReentrantReadWriteLock源码分析