Java多线程安全原子性之AtomicCASSynchronized和Lock

Posted 爱上口袋的天空

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java多线程安全原子性之AtomicCASSynchronized和Lock相关的知识,希望对你有一定的参考价值。

一、线程安全

线程安全概述

当多个线程访问某个类时,不管运行时环境采用何种调度方式或者这些进程将如何交替执行,并且在主调代码中不需要任何额外的同步或协同,这个类都能表现出正确的行为,那么就称这个类是线程安全的。

线程安全性概述

线程安全性主要体现在三个方面:原子性、可见性、有序性。

  • 原子性:
    提供了互斥访问,同一时刻只能有一个线程来对它进行操作。
  • 可见性:
    一个线程对主内存的修改可以及时的被其他线程观察到。
  • 有序性:
    一个线程观察其他线程中的指令执行顺序,由于指令重排序的存在,该观察结果一般杂乱无序。
    基础代码:以下代码用于描述下方的知识点,所有代码均在此代码基础上进行修改。
public class CountExample {

    //请求总数
    public static int clientTotal  = 5000;
    //同时并发执行的线程数
    public static int threadTotal = 200;
    //变量声明:计数
    public static AtomicInteger count = new AtomicInteger(0);

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newCachedThreadPool();//创建线程池
        final Semaphore semaphore = new Semaphore(threadTotal);//定义信号量,给出允许并发的数目
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);//定义计数器闭锁
        for (int i = 0;i<clientTotal;i++){
            executorService.execute(()->{
                try {
                    semaphore.acquire();//判断进程是否允许被执行
                    add();
                    semaphore.release();//释放进程
                } catch (InterruptedException e) {
                    log.error("excption",e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();//保证信号量减为0
        executorService.shutdown();//关闭线程池
        log.info("count:{}",count.get());//变量取值
    }

    private static void add(){
        count.incrementAndGet();//变量操作
    }
}

原子性

说到原子性,一共有两个方面需要学习一下,一个是JDK中已经提供好的Atomic包,他们均使用了CAS完成线程的原子性操作,另一个是使用锁的机制来处理线程之间的原子性。锁包括:synchronized、Lock。

CAS的含义:

CAS是compare and swap的缩写,即我们所说的比较交换。cas是一种基于锁的操作,而且是乐观锁。在java中锁分为乐观锁和悲观锁。悲观锁是将资源锁住,等一个之前获得锁的线程释放锁之后,下一个线程才可以访问。而乐观锁采取了一种宽泛的态度,通过某种方式不加锁来处理资源,比如通过给记录加version来获取数据,性能较悲观锁有很大的提高。

CAS 操作包含三个操作数 —— 内存位置(V)、预期原值(A)和新值(B)。如果内存地址里面的值和A的值是一样的,那么就将内存里面的值更新成B。CAS是通过无限循环来获取数据的,若果在第一轮循环中,a线程获取地址里面的值被b线程修改了,那么a线程需要自旋,到下次循环才有可能机会执行。

Atomic包中的类与CAS

我们从最简单的AtomicInteger类来了解什么是CAS

 AtomicInteger

上边的示例代码就是通过AtomicInteger类保证了线程的原子性。
那么它是如何保证原子性的呢?我们接下来分析一下它的源码。示例中,对count变量的+1操作,采用的是incrementAndGet方法,此方法的源码中调用了一个名为unsafe.getAndAddInt的方法,该方法能够实现线程安全的i++操作, 不需要加锁, 而是用的CAS的思想

public final int incrementAndGet() {
     //this是当前对象, valueOffset是偏移量
     return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}

而getAndAddInt方法的具体实现为:

//获取内存地址为var1+var2的变量值, 并将该变量值加上var4
public final int getAndAddInt(Object var1, long var2, int var4) {
    int var5;
    do {
        //通过对象和偏移量获取变量的值
    	//由于volatile的修饰, 所有线程看到的var5都是一样的
        var5 = this.getIntVolatile(var1, var2);
     /*
	while中的compareAndSwapInt()方法尝试修改var5的值,具体地, 该方法也会通过var1和var2获取变量的值
	如果这个值和var5不一样, 说明其他线程修改了var1+var2地址处的值, 此时compareAndSwapInt()返回false, 继续循环
	如果这个值和var5一样, 说明没有其他线程修改var1+var2地址处的值, 此时可以将var1+var2地址处的值改为var5+var4, compareAndSwapInt()返回true, 退出循环
	Unsafe类中的compareAndSwapInt()方法是原子操作, 所以compareAndSwapInt()修改var5+var4地址处的值的时候不会被其他线程中断
	*/
    } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
    return var5;
}

在此方法中,方法参数为要操作的对象Object var1、期望底层当前的数值为var2、要修改的数值var4。定义的var5为真正从底层取出来的值。采用do…while循环的方式去获取底层数值并与期望值进行比较,比较成功才将值进行修改。而这个比较再进行修改的方法就是compareAndSwapInt就是我们所说的CAS,它是一系列的接口,比如下面罗列的几个接口。使用native修饰,是底层的方法。CAS取的是compareAndSwap三个单词的首字母.

另外,示例代码中的count可以理解为JMM中的工作内存,而这里的底层数值即为主内存,

public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5);
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);

AtomicLong 与 LongAdder

LongAdder是java8为我们提供的新的类,跟AtomicLong有相同的效果。首先看一下代码实现: 

AtomicLong:
//变量声明
public static AtomicLong count = new AtomicLong(0);
//变量操作
count.incrementAndGet();
//变量取值
count.get();
LongAdder:
//变量声明
public static LongAdder count = new LongAdder();
//变量操作
count.increment();
//变量取值
count

那么问题来了,为什么有了AtomicLong还要新增一个LongAdder呢?
原因是:CAS底层实现是在一个死循环中不断地尝试修改目标值,直到修改成功。如果竞争不激烈的时候,修改成功率很高,否则失败率很高。在失败的时候,这些重复的原子性操作会耗费性能。

知识点: 对于普通类型的long、double变量,JVM允许将64位的读操作或写操作拆成两个32位的操作。

LongAdder类的实现核心是将热点数据分离,比如说它可以将AtomicLong内部的内部核心数据value分离成一个数组,每个线程访问时,通过hash等算法映射到其中一个数字进行计数,而最终的计数结果则为这个数组的求和累加,其中热点数据value会被分离成多个单元的cell,每个cell独自维护内部的值。当前对象的实际值由所有的cell累计合成,这样热点就进行了有效地分离,并提高了并行度。这相当于将AtomicLong的单点的更新压力分担到各个节点上。在低并发的时候通过对base的直接更新,可以保障和AtomicLong的性能基本一致。而在高并发的时候通过分散提高了性能。

LongAdder源码:
public void increment() {
    add(1L);
}
public void add(long x) {
    Cell[] as; long b, v; int m; Cell a;
    if ((as = cells) != null || !casBase(b = base, b + x)) {
        boolean uncontended = true;
        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[getProbe() & m]) == null ||
            !(uncontended = a.cas(v = a.value, v + x)))
            longAccumulate(x, null, uncontended);
    }
}

缺点:如果在统计的时候,如果有并发更新,可能会有统计数据有误差。实际使用中在处理高并发计数的时候优先使用LongAdder,而不是AtomicLong在线程竞争很低的时候,使用AtomicLong会简单效率更高一些。比如序列号生成(准确性)

AtomicBoolean

这个类中值得一提的是它包含了一个名为compareAndSet的方法,这个方法可以做到的是控制一个boolean变量在一件事情执行之前为false,事情执行之后变为true。或者也可以理解为可以控制某一件事只让一个线程执行,并仅能执行一次。
他的源码如下:

public final boolean compareAndSet(boolean expect, boolean update) {
    int e = expect ? 1 : 0;
    int u = update ? 1 : 0;
    return unsafe.compareAndSwapInt(this, valueOffset, e, u);
}

举例说明:

    //是否发生过
    private static AtomicBoolean isHappened = new AtomicBoolean(false);
    // 请求总数
    public static int clientTotal = 5000;
    // 同时并发执行的线程数
    public static int threadTotal = 200;

    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal ; i++) {
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    test();
                    semaphore.release();
                } catch (Exception e) {
                    log.error("exception", e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("isHappened:{}", isHappened.get());
    }

    private static void test() {
        if (isHappened.compareAndSet(false, true)) {//控制某有一段代码只执行一次
            log.info("execute");
        }
    }

结果:(log只打印一次)
[pool-1-thread-2] INFO com.superboys.concurrency.example.Atomic.AtomicExample6 - execute
[main] INFO com.superboys.concurrency.example.Atomic.AtomicExample6 - isHappened:true

AtomicIntegerFieldUpdater

这个类的核心作用是要更新一个指定的类的某一个字段的值。并且这个字段一定要用volatile修饰同时还不能是static的。
举例说明:

@Slf4j
public class AtomicExample5 {

    //原子性更新某一个类的一个实例
    private static AtomicIntegerFieldUpdater<AtomicExample5> updater
            = AtomicIntegerFieldUpdater.newUpdater(AtomicExample5.class,"count");

    @Getter
    public volatile int count = 100;//必须要volatile标记,且不能是static

    public static void main(String[] args) {
        AtomicExample5 example5 = new AtomicExample5();

        if(updater.compareAndSet(example5,100,120)){
            log.info("update success 1,{}",example5.getCount());
        }

        if(updater.compareAndSet(example5,100,120)){
            log.info("update success 2,{}",example5.getCount());
        }else{
            log.info("update failed,{}",example5.getCount());
        }
    }
}

此方法输出的结果为:
[main] INFO com.superboys.concurrency.example.Atomic.AtomicExample5 - update success 1,120
[main] INFO com.superboys.concurrency.example.Atomic.AtomicExample5 - update failed,120

由此可见,count的值只修改了一次。

AtomicStampReference与CAS的ABA问题

什么是ABA问题?

CAS操作的时候,其他线程将变量的值A改成了B,但是随后又改成了A,本线程在CAS方法中使用期望值A与当前变量进行比较的时候,发现变量的值未发生改变,于是CAS就将变量的值进行了交换操作。但是实际上变量的值已经被其他的变量改变过,这与设计思想是不符合的。所以就有了AtomicStampReference。

源码:
private static class Pair<T> {
        final T reference;
        final int stamp;
        private Pair(T reference, int stamp) {
            this.reference = reference;
            this.stamp = stamp;
        }
        static <T> Pair<T> of(T reference, int stamp) {
            return new Pair<T>(reference, stamp);
        }
    }

private volatile Pair<V> pair;

private boolean casPair(Pair<V> cmp, Pair<V> val) {
        return UNSAFE.compareAndSwapObject(this, pairOffset, cmp, val);
    }

public boolean compareAndSet(V   expectedReference,
                                 V   newReference,
                                 int expectedStamp,
                                 int newStamp) {
        Pair<V> current = pair;
        return
            expectedReference == current.reference &&
            expectedStamp == current.stamp &&
            ((newReference == current.reference &&   //排除新的引用和新的版本号与底层的值相同的情况
              newStamp == current.stamp) ||
             casPair(current, Pair.of(newReference, newStamp)));
}

AtomicStampReference的处理思想是,每次变量更新的时候,将变量的版本号+1,之前的ABA问题中,变量经过两次操作以后,变量的版本号就会由1变成3,也就是说只要线程对变量进行过操作,变量的版本号就会发生更改。从而解决了ABA问题。

解释一下上边的源码:
类中维护了一个volatile修饰的Pair类型变量current,Pair是一个私有的静态类,current可以理解为底层数值。
compareAndSet方法的参数部分分别为期望的引用、新的引用、期望的版本号、新的版本号。
return的逻辑为判断了期望的引用和版本号是否与底层的引用和版本号相符,并且排除了新的引用和新的版本号与底层的值相同的情况(即不需要修改)的情况(return代码部分3、4行)。条件成立,执行casPair方法,调用CAS操作。

AtomicLongArray

这个类实际上维护了一个Array数组,我们在对数值进行更新的时候,会多一个索引值让我们更新。

Atomic包总结

原子性,提供了互斥访问,同一时刻只能有一个线程来对它进行操作。那么在java里,保证同一时刻只有一个线程对它进行操作的,除了Atomic包之外,还有锁的机制。JDK提供锁主要分为两种:synchronized和Lock。接下来我们了解一下synchronized。

synchronized

依赖于JVM去实现锁,因此在这个关键字作用对象的作用范围内,都是同一时刻只能有一个线程对其进行操作的。
synchronized是java中的一个关键字,是一种同步锁。它可以修饰的对象主要有四种: 

  • 修饰代码块:大括号括起来的代码,作用于调用的对象。
  • 修饰方法:整个方法,作用于调用的对象。
  • 修饰静态方法:整个静态方法,作用于所有对象。
  • 修饰类:括号括起来的部分,作用于所有对象。

synchronized 修饰一个代码块

被修饰的代码称为同步语句块,作用的范围是大括号括起来的部分。作用的对象是调用这段代码的对象。
验证:

public class SynchronizedExample {
    public void test(int j){
        synchronized (this){
            for (int i = 0; i < 10; i++) {
                log.info("test - {} - {}",j,i);
            }
        }
    }
    //使用线程池方法进行测试:
    public static void main(String[] args) {
        SynchronizedExample example1 = new SynchronizedExample();
        SynchronizedExample example2 = new SynchronizedExample();
        ExecutorService executorService = Executors.newCachedThreadPool();
        executorService.execute(()-> example1.test(1));
        executorService.execute(()-> example2.test(2));
    }
}

结果:不同对象之间的操作互不影响

synchronized 修饰一个方法

被修饰的方法称为同步方法,作用的范围是大括号括起来的部分,作用的对象是调用这段代码的对象。
验证:

public class SynchronizedExample 
    public synchronized void test(int j){
        for (int i = 0; i < 10; i++) {
            log.info("test - {} - {}",j,i);
        }
    }
    //验证方法与上面相同
    ...
}

 结果:不同对象之间的操作互不影响

 注意:
如果当前类是一个父类,子类调用父类的被synchronized修饰的方法,不会携带synchronized属性,因为synchronized不属于方法声明的一部分。

synchronized 修饰一个静态方法

作用的范围是synchronized 大括号括起来的部分,作用的对象是这个类的所有对象。
验证:

public class SynchronizedExample{
    public static synchronized void test(int j){
        for (int i = 0; i < 10; i++) {
            log.info("test - {} - {}",j,i);
        }
    }
    //验证方法与上面相同
    ...
}

结果:同一时间只有一个线程可以执行

synchronized 修饰一个类 

验证:

public class SynchronizedExample{
    public static void test(int j){
        synchronized (SynchronizedExample.class){
            for (int i = 0; i < 10; i++) {
                log.info("test - {}-{}",j,i);
            }
        }
    }
    //验证方法与上面相同
    ...
}

结果:同一时间只有一个线程可以执行。

Lock

在jdk1.5之后,并发包中新增了Lock接口(以及相关实现类)用来实现锁功能,Lock接口提供了与synchronized关键字类似的同步功能,但需要在使用时手动获取锁和释放锁。虽然Lock接口没有synchronized关键字自动获取和释放锁那么便捷,但Lock接口却具有了锁的可操作性,可中断获取以及超时获取锁等多种非常实用的同步特性,除此之外Lock接口还有两个非常强大的实现类重入锁和读写锁,下面一一讲解这些内容。

Lock接口

public interface Lock {
    void lock();
    void lockInterruptibly() throws InterruptedException;
    boolean tryLock();
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
    void unlock();
    Condition newCondition();
}

下面来逐个讲述Lock接口中每个方法的使用。

  • lock()、tryLock()、tryLock(long time, TimeUnit unit)和lockInterruptibly()是用来获取锁的。
  • unLock()方法是用来释放锁的。
  • newCondition()这个方法用作线程协作。

首先lock()方法是平常使用得最多的一个方法,就是用来获取锁。如果锁已被其他线程获取,则进行等待。

由于在前面讲到如果采用Lock,必须主动去释放锁,并且在发生异常时,不会自动释放锁。因此一般来说,使用Lock必须在try{}catch{}块中进行,并且将释放锁的操作放在finally块中进行,以保证锁一定被被释放,防止死锁的发生。通常使用Lock来进行同步的话,是以下面这种形式去使用的:

Lock lock = ...;
lock.lock();
try{
    //处理任务
}catch(Exception ex){
     
}finally{
    lock.unlock();   //释放锁
}

tryLock()方法是有返回值的,它表示用来尝试获取锁,如果获取成功,则返回true,如果获取失败(即锁已被其他线程获取),则返回false,也就说这个方法无论如何都会立即返回。在拿不到锁时不会一直在那等待。

tryLock(long time, TimeUnit unit)方法和tryLock()方法是类似的,只不过区别在于这个方法在拿不到锁时会等待一定的时间,在时间期限之内如果还拿不到锁,就返回false。如果一开始拿到锁或者在等待期间内拿到了锁,则返回true。

所以,一般情况下通过tryLock来获取锁时是这样使用的:

Lock lock = ...;
if(lock.tryLock()) {
     try{
         //处理任务
     }catch(Exception ex){
         
     }finally{
         lock.unlock();   //释放锁
     } 
}else {
    //如果不能获取锁,则直接做其他事情
}

lockInterruptibly()方法比较特殊,当通过这个方法去获取锁时,如果线程正在等待获取锁,则这个线程能够响应中断,即中断线程的等待状态。也就使说,当两个线程同时通过lock.lockInterruptibly()想获取某个锁时,假若此时线程A获取到了锁,而线程B只有在等待,那么对线程B调用threadB.interrupt()方法能够中断线程B的等待过程。

由于lockInterruptibly()的声明中抛出了异常,所以lock.lockInterruptibly()必须放在try块中或者在调用lockInterruptibly()的方法外声明抛出InterruptedException。

因此lockInterruptibly()一般的使用形式如下:

public void method() throws InterruptedException {
    lock.lockInterruptibly();
    try {  
     //.....
    }
    finally {
        lock.unlock();
    }  
}

注意,当一个线程获取了锁之后,是不会被interrupt()方法中断的。因为本身在前面的文章中讲过单独调用interrupt()方法不能中断正在运行过程中的线程,只能中断阻塞过程中的线程。

因此当通过lockInterruptibly()方法获取某个锁时,如果不能获取到,只有进行等待的情况下,是可以响应中断的。

而用synchronized修饰的话,当一个线程处于等待某个锁的状态,是无法被中断的,只有一直等待下去。

ReentrantLock

  • ReentrantLock是Lock接口一种常见的实现,它是支持重进入的锁即表示该锁能够支持一个线程对资源的重复加锁。该锁还支持获取锁时的公平与非公平的选择。
  • 关于锁的重进入,其实synchronized关键字也支持。如前所述,synchronized关键字也是隐式的支持重进入,而对于ReentrantLock而言,对于已经获取到锁的线程,再次调用lock()方法时依然可以获取锁而不被阻塞。
  • 理解了锁的重进入,现在解释刚刚提到的公平获取锁与非公平获取锁。如果在绝对时间上,先对于锁进行获取的请求一定先被满足,那么这个锁就是公平的,反之就是非公平的。公平的获取锁也就是等待时间最久的线程优先获取到锁。

ReentrantLock的构造函数来控制是否为公平锁。
我在第一次了解到公平锁于非公平锁的时候,第一反应是公平锁的效率高,应该使用公平锁。但实际的情况是,非公平的锁的效率远远大于公平锁。

例一:lock()的正确使用方法

public class Test {
    private ArrayList<Integer> arrayList = new ArrayList<Integer>();
    private Lock lock = new ReentrantLock(); //注意这个地方,一定要是全局变量,否则相当于锁住的是两个不同的insert函数
    public static void main(String[] args)  {
        final Test test = new Test();
         
        new Thread(){
            public void run() {
                test.insert(Thread.currentThread());
            };
        }.start();
         
        new Thread(){
            public void run() {
                test.insert(Thread.currentThread());
            };
        }.start();
    }  
     
    public void insert(Thread thread) {
        lock.lock();
        try {
            System.out.println(thread.getName()+"得到了锁");
            for(int i=0;i<5;i++) {
                arrayList.add(i);
            }
        } catch (Exception e) {
            // TODO: handle exception
        }finally {
            System.out.println(thread.getName()+"释放了锁");
            lock.unlock();
        }
    }
}

例二:

public class Test {
    private ArrayList<Integer> arrayList = new ArrayList<Integer>();
    private Lock lock = new ReentrantLock();    //注意这个地方
    public static void main(String[] args)  {
        final Test test = new Test();
         
        new Thread(){
            public void run() {
                test.insert(Thread.currentThread());
            };
        }.start();
         
        new Thread(){
            public void run() {
                test.insert(Thread.currentThread());
            };
        }.start();
    }  
     
    public void insert(Thread thread) {
        if(lock.tryLock()) {
            try {
                System.out.println(thread.getName()+"得到了锁");
                for(int i=0;i<5;i++) {
                    arrayList.add(i);
                }
            } catch (Exception e) {
                // TODO: handle exception
            }finally {
                System.out.println(thread.getName()+"释放了锁");
                lock.unlock();
            }
        } else {
            System.out.println(thread.getName()+"获取锁失败");
        }
    }
}

例三,lockInterruptibly()响应中断的使用方法:

public class Test {
    private Lock lock = new ReentrantLock();   
    public static void main(String[] args)  {
        Test test = new Test();
        MyThread thread1 = new MyThread(test);
        MyThread thread2 = new MyThread(test);
        thread1.start();
        thread2.start();
         
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        thread2.interrupt();
    }  
     
    public void insert(Thread thread) throws InterruptedException{
        lock.lockInterruptibly();   //注意,如果需要正确中断等待锁的线程,必须将获取锁放在外面,然后将InterruptedException抛出
        try {  
            System.out.println(thread.getName()+"得到了锁");
            long startTime = System.currentTimeMillis();
            for(    ;     ;) {
                if(System.currentTimeMillis() - startTime >= Integer.MAX_VALUE)
                    break;
                //插入数据
            }
        }
        finally {
            System.out.println(Thread.currentThread().getName()+"执行finally");
            lock.unlock();
            System.out.println(thread.getName()+"释放了锁");
        }  
    }
}
 
class MyThread extends Thread {
    private Test test = null;
    public MyThread(Test test) {
        this.test = test;
    }
    @Override
    public void run() {
         
        try {
            test.insert(Thread.currentThread());
        } catch (InterruptedException e) {
            System.out.println(Thread.currentThread().getName()+"被中断");
        }
    }
}

运行之后,发现thread2能够被正确中断。

ReadWriteLock

ReadWriteLock也是一个接口,在它里面只定义了两个方法:

public interface ReadWriteLock {
    /**
     * Returns the lock used for reading.
     *
     * @return the lock used for reading.
     */
    Lock readLock();
 
    /**
     * Returns the lock used for writing.
     *
     * @return the lock used for writing.
     */
    Lock writeLock();
}

一个用来获取读锁,一个用来获取写锁。也就是说将文件的读写操作分开,分成2个锁来分配给线程,从而使得多个线程可以同时进行读操作。下面的ReentrantReadWriteLock实现了ReadWriteLock接口。

ReentrantReadWriteLock

定义一个共享的用作缓存数据结构,它的大部分时间提供读服务(查询,搜索等)而写操作较少,但写操作之后需要立即对后续的读操作可见。在没有读写锁之前,实现这个功能需要使用等待通知机制。无论使用那种方式,目的都是为了写操作立即可见于读操作而避免脏读。但使用读写锁却比等待通知简单明了多了。

一般情况下,读写锁性能优于排他锁。它能提供更好的并发性和吞吐量。

ReentrantReadWriteLock里面提供了很多丰富的方法,不过最主要的有两个方法:readLock()和writeLock()用来获取读锁和写锁。

假如有多个线程要同时进行读操作的话,先看一下synchronized达到的效果:

public class Test {
    private ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
     
    public static void main(String[] args)  {
        final Test test = new Test();
         
        new Thread(){
            public void run() {
                test.get(Thread.currentThread());
            };
        }.start();
         
        new Thread(){
            public void run() {
                test.get(Thread.currentThread());
            };
        }.start();
         
    }  
     
    public synchronized void get(Thread thread) {
        long start = System.currentTimeMillis();
        while(System.currentTimeMillis() - start <= 1) {
            System.out.println(thread.getName()+"正在进行读操作");
        }
        System.out.println(thread.getName()+"读操作完毕");
    }
}

这段程序的输出结果会是,直到thread1执行完读操作之后,才会打印thread2执行读操作的信息。

而改成用读写锁的话:

public class Test {
    private ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
     
    public static void main(String[] args)  {
        final Test test = new Test();
         
        new Thread(){
            public void run() {
                test.get(Thread.currentThread());
            };
        }.start();
         
        new Thread(){
            public void run() {
                test.get(Thread.currentThread());
            };
        }.start();
         
    }  
     
    public void get(Thread thread) {
        rwl.readLock().lock();
        try {
            long start = System.currentTimeMillis();
             
            while(System.currentTimeMillis() - start <= 1) {
                System.out.println(thread.getName()+"正在进行读操作");
            }
            System.out.println(thread.getName()+"读操作完毕");
        } finally {
            rwl.readLock().unlock();
        }
    }
}

说明thread1和thread2在同时进行读操作。这样就大大提升了读操作的效率。

不过要注意的是

  • 如果有一个线程已经占用了读锁,则此时其他线程如果要申请写锁,则申请写锁的线程会一直等待释放读锁。
  • 如果有一个线程已经占用了写锁,则此时其他线程如果申请写锁或者读锁,则申请的线程会一直等待释放写锁。

读写锁的示例:缓存

public class Cache{
  static Map<String,Object> map = new HashMap<String,Object>();
  static ReentrantReadWriteLock  rwl = new ReentrantReadWriteLock();
  static Lock rLock = rwl.readLock();
  static Lock wLock = rwl.writeLock();
  //获取一个key对应的value
  public static final Object get(String key){
  r.lock();
  try{
   return map.get(key);
   }finally{
    r.unlock();
    }
  }
  //设置key对应的value并返回旧的value
  public static final Object put(String key,Object value){
  w.lock();
  try{
   return map.put(key,value);
   }final{
   w.unlock();
    }
  }
  //清空缓存
  public static final void clear(){
  w.lock();
  try{
     map.clear();
   } finally{
    w.unlock();
    }
  }
}

上述缓存示例中,我们使用了一个非线程安全的HashMap作为缓存的时候然后使用读写锁来保证线程安全。Cache使用读写锁提升读操作的并发性,也保证每次写操作对读操作的及时可见性,同时简化了编程方式。

ReentrantReadWriteLock读写锁的几个特性:

  • 公平选择性
  • 重进入
  • 锁降级

读写锁的锁降级

锁降级是指写锁降级成为读锁。如果当前线程持有写锁,然后将其释放再获取读锁的过程不能称为锁降级。锁降级指的在持有写锁的时候再获取读锁,获取到读锁后释放之前写锁的过程称为锁释放。

锁降级在某些情况下是非常必要的,主要是为了保证数据的可见性。如果当前线程不获取读锁而直接释放写锁,假设此时另外一个线程获取了写锁并修改了数据。那么当前线程无法感知该线程的数据更新。

Condition

// 造成当前线程在接到信号或被中断之前一直处于等待状态。
void await()
// 造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。
boolean await(long time, TimeUnit unit)
// 造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。
long awaitNanos(long nanosTimeout)
// 造成当前线程在接到信号之前一直处于等待状态。
void awaitUninterruptibly()
// 造成当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态。
boolean awaitUntil(Date deadline)
// 唤醒一个等待线程。
void signal()
// 唤醒所有等待线程。
void signalAll()

例:经典问题:三个线程依次打印ABC,代码示例如下:

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class Business {
    private Lock lock = new ReentrantLock();
    private Condition conditionA = lock.newCondition();
    private Condition conditionB = lock.newCondition();
    private Condition conditionC = lock.newCondition();
    private String type = "A"; //内部状态

    /*
     * 方法的基本要求为:
     * 1、该方法必须为原子的。
     * 2、当前状态必须满足条件。若不满足,则等待;满足,则执行业务代码。
     * 3、业务执行完毕后,修改状态,并唤醒指定条件下的线程。
     */
    public void printA() {
        lock.lock(); //锁,保证了线程安全。
        try {
            while (type != "A") { //type不为A,
                try {
                    conditionA.await(); //将当前线程阻塞于conditionA对象上,将被阻塞。
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            //type为A,则执行。
            System.out.println(Thread.currentThread().getName() + " 正在打印A");
            type = "B"; //将type设置为B。
            conditionB.signal(); //唤醒在等待conditionB对象上的一个线程。将信号传递出去。
        } finally {
            lock.unlock(); //解锁
        }
    }

    public void printB() {
        lock.lock(); //锁
        try {
            while (type != "B") { //type不为B,
                try {
                    conditionB.await(); //将当前线程阻塞于conditionB对象上,将被阻塞。
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            //type为B,则执行。
            System.out.println(Thread.currentThread().getName() + " 正在打印B");
            type = "C"; //将type设置为C。
            conditionC.signal(); //唤醒在等待conditionC对象上的一个线程。将信号传递出去。
        } finally {
            lock.unlock(); //解锁
        }
    }

    public void printC() {
        lock.lock(); //锁
        try {
            while (type != "C") {
                try {
                    conditionC.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            System.out.println(Thread.currentThread().getName() + " 正在打印C");
            type = "A";
            conditionA.signal();
        } finally {
            lock.unlock(); //解锁
        }
    }
}


public class Test{

    public static void main(String[] args) {
        final Business business = new Business();//业务对象。

        //线程1号,打印10次A。
        Thread ta = new Thread(new Runnable() {

            @Override
            public void run() {
                for(int i=0;i<10;i++){
                    business.printA();
                }
            }
        });

        //线程2号,打印10次B。
        Thread tb = new Thread(new Runnable() {

            @Override
            public void run() {
                for(int i=0;i<10;i++){
                    business.printB();
                }
            }
        });

        //线程3号,打印10次C。
        Thread tc = new Thread(new Runnable() {

            @Override
            public void run() {
                for(int i=0;i<10;i++){
                    business.printC();
                }
            }
        });

        //执行3条线程。
        ta.start();
        tb.start();
        tc.start();
    }

}

执行代码,控制台依次显示了A、B、C,10次。可以看到3条线程之间共享Business类中的资源type,且3条线程之间进行了有效的协调。

例二


public class LockExample6 {

    private static Logger log = LoggerFactory.getLogger(AtomicExample1.class);
    
    public static void main(String[] args) {
        ReentrantLock reentrantLock = new ReentrantLock();
        Condition condition = reentrantLock.newCondition();

        new Thread(() -> {
            try {
                reentrantLock.lock();
                log.info("wait signal"); // 1
                condition.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log.info("get signal"); // 4
            reentrantLock.unlock();
        }).start();

        new Thread(() -> {
            reentrantLock.lock();
            log.info("get lock"); // 2
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            condition.signalAll();
            log.info("send signal ~ "); // 3
            reentrantLock.unlock();
        }).start();
    }
}

Lock和synchronized的选择 

总结来说,Lock和synchronized有以下几点不同:

  • Lock是一个接口,而synchronized是Java中的关键字,synchronized是内置的语言实现;
  • synchronized在发生异常时,会自动释放线程占有的锁,因此不会导致死锁现象发生;而Lock在发生异常时,如果没有主动通过unLock()去释放锁,则很可能造成死锁现象,因此使用Lock时需要在finally块中释放锁;
  • Lock可以让等待锁的线程响应中断,而synchronized却不行,使用synchronized时,等待的线程会一直等待下去,不能够响应中断;
  • 通过Lock可以知道有没有成功获取锁,而synchronized却无法办到。
  • Lock可以提高多个线程进行读操作的效率。

在性能上来说,如果竞争资源不激烈,两者的性能是差不多的,而当竞争资源非常激烈时(即有大量线程同时竞争),此时Lock的性能要远远优于synchronized。所以说,在具体使用时要根据适当情况选择。

原子性操作各方法间的对比

  • synchronized:不可中断锁,适合竞争不激烈,可读性好。
  • Lock:可中断锁,多样化同步,竞争激烈时能维持常态。
  • Atomic:竞争激烈时能维持常态,比Lock性能好,每次只能同步一个值。

以上是关于Java多线程安全原子性之AtomicCASSynchronized和Lock的主要内容,如果未能解决你的问题,请参考以下文章

java 导致多线程数据安全问题的原因

java核心-多线程- 并发原子类

Java自学-多线程 原子访问

Java多线程之原子操作类

java多线程笔记--Atomic原子操作类

高并发多线程安全之原子性问题CAS机制及问题解决方案