Java——多线程高并发系列之理解CAS原子变量类的使用

Posted 张起灵-小哥

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java——多线程高并发系列之理解CAS原子变量类的使用相关的知识,希望对你有一定的参考价值。

文章目录:

1.CAS

2.原子变量类

2.1AtomicInteger

2.2 AtomicLong

2.3 AtomicIntegerArray

2.4 AtomicIntegerFieldUpdater

2.5 AtomicReference

2.6 AtomicStampedReference


1.CAS

CAS(Compare And Swap)是由硬件实现的。CAS 可以将 read- modify - write 这类的操作转换为原子操作。

i++自增操作包括三个子操作:  从主内存读取 i 变量值;对 i 的值加 1;再把加 1 之后 的值保存到主内存。

CAS 原理: 在把数据更新到主内存时,再次读取主内存变量的值,如果现在变量的值与期望的值(操作起始时读取的值)一样就更新。

下面我说一下,我对上面这张图中CAS算法的理解: 首先在主内存有一个所有线程都共享的count变量,它的值初始化为10。

  1. 线程1从主内存中读取count变量的值到自己的工作内存中,此时count为10。(这个值我们记为:线程1期望的count值、最初读取到的count值)
  2. 线程1对自己工作内存中的count的值进行自增操作,此时count为11。(这个值我们记为:线程1更新之后的count值)
  3. 这个时候,当线程1还未执行到第三步更新count的值到主内存中的时候,线程2突然抢到了CPU执行权,它进来从主内存中读取count变量的值到自己的工作内存中,因为线程1还未更新,所以此时count仍为10。(这个值我们记为:线程2期望的count值、最初读取到的count值)
  4. 线程2对自己工作内存中的count的值进行自增操作,此时count为11。(这个值我们记为:线程2更新之后的count值)
  5. 在线程2要将自己工作内存中的count值更新到主内存之前,要执行一部重要的操作!!!就是再次读取主内存中共享变量count的值,如果此时读取到的这个值与最开始线程2期望的count值相等,那么就顺利更新count的值;如果不相等,就撤销放弃本次count++操作。
  6. 那么线程2最开始期望的count值为10,而此时从主内存中读取到的count值还是10,一样,所以就更新,将自己工作内存中的count值更新到主内存中,此时count为11。
  7. 那么线程2执行完了,此时线程1重新获得CPU执行权,它已经完成了count++的前两步了,就剩最后一步更新操作了,那么它此时同线程2一样,再次从主内存中读取共享变量count的值,一样就更新、不一样就撤销放弃操作。然而count的值已经被线程2更新过了,此时为11,而线程1最开始期望的count的值为10,两次读取到count的值不一样了!!!所以线程1就会自动撤销本次count++操作。
  8. 最终,本次线程1、2对count变量执行++操作的结果:线程1失败、线程2成功,最后count的值只增加了1次,为11。

下面,我再来说一下什么是CAS中的ABA问题。

  • CAS实现原子操作背后有一个假设:共享变量的当前值与当前线程提供的期望值相同,就认为这个变量没有被其他线程修改过。
  • 实际上这种假设不一定总是成立的。假如说有共享变量 count=100。
  • A线程将 count 修改为 200
  • B线程将 count 修改为 300
  • C线程将 count 修改为 100
  • 当前线程看到的count变量的值是100,这就与最初的count变量的值是一样的,那么是否就认为count变量的值没有被其他线程更新呢?显然不是啊,它明显的被A、B两个线程更新过。
  • 这就是CAS中的ABA问题,即共享变量经历了 A → B → A 的更新。
  • 如果想要规避ABA问题,可以为共享变量引入一个修订好(或者叫时间戳),每次修改共享变量时,相应的修订号就会增加1,ABA问题的过程就转变为:[A,0] → [B,1] → [A,2] ,每次修改共享变量都会导致修订号的增加,通过修订号就可以准确的判断共享便是否被其他线程修改过。在原子类中,AtomicReference就会面临ABA问题的困扰,而AtomicStampedReference 可以很好的规避ABA问题。详细内容看下面的代码案例。

2.原子变量类

原子变量类基于CAS实现的,当对共享变量进行read-modify-write更新操作时,通过原子变量类可以保障操作的原子性与可见性。对变量的 read-modify-write 更新操作是指当前操作不是一个简单的赋值,而是变量的新值依赖变量的旧值,如自增操作i++。由于volatile只能保证可见性、无法保障原子性,原子变量类内部就是借助一个Volatile变量,并且保障了该变量的 read-modify-write 操作的原子性,有时把原子变量类看作增强的 volatile 变量。

原子变量类有 12 个,如:(这12个原子变量类全部位于JUC这个Java并发包的atomic子包下的)

分组原子变量类
基础数据型AtomicInteger, AtomicLong,AtomicBoolean
数组型AtomicIntegerArray,AtomicLongArray,AtomicReferenceArray
字段更新器AtomicIntegerFieldUpdater,AtomicLongFieldUpdater,AtomicReferenceFieldUpdater
引用型AtomicReference,AtomicStampedReference,AtomicMarkableReference

2.1AtomicInteger

package com.szh.volatiletest;

import java.util.concurrent.atomic.AtomicInteger;

/**
 * 使用原子类进行自增操作
 */
public class Test03 {

    public static void main(String[] args) throws InterruptedException {
        //在main线程中创建10个子线程
        for (int i = 0; i < 10; i++) {
            new MyThread().start();
        }
        Thread.sleep(1000);
        System.out.println(MyThread.count.get());
    }

    static class MyThread extends Thread {
        private static AtomicInteger count=new AtomicInteger();

        public static void addCount() {
            for (int i = 0; i < 1000; i++) {
                count.getAndIncrement();
            }
            System.out.println(Thread.currentThread().getName() + " count = " + count.get());
        }

        @Override
        public void run() {
            addCount();
        }
    }
}

首先是AtomicInteger,它其中的 getAndIncrement 方法就是 count++ 操作,get() 方法是获取最终的执行结果,可以看到10个子线程各自执行1000次 count++ 操作,最终的结果正确无误:10000。


2.2 AtomicLong

这里我是用 AtomicLong 定义一个简单的计数器,来模拟实现原子操作。

main主线程中一共创建了1000个子线程,每个线程都看作一个请求,所以请求总数肯定是1000,然后我模拟偶数就算请求成功、奇数就算请求失败。那么这个时候,因为Java中的多线程是采用抢占式来调度的,所以这里成功和失败的比值一般来说不会是1:1(各500次),输出结果中,我们也看到了是494、506,当然这个执行结果也不是唯一的。

package com.szh.atomiclong;

import java.util.concurrent.atomic.AtomicLong;

/**
 * 使用原子变量类定义一个计数器
 * 该计数器,在整个程序中都能使用,所以这个计数器可以设计为单例
 */
public class Indicator {
    //构造方法私有化
    private Indicator() {}
    //定义一个私有的本类静态对象
    private static final Indicator INSTANCE=new Indicator();
    //提供一个公共静态方法,获取该类唯一的实例
    public static Indicator getInstance() {
        return INSTANCE;
    }

    //使用原子变量类
    private final AtomicLong requestCount=new AtomicLong(0); //记录请求总数
    private final AtomicLong successCount=new AtomicLong(0); //记录成功总数
    private final AtomicLong failureCount=new AtomicLong(0); //记录失败总数

    //有新的请求
    public void newRequestReceive() {
        requestCount.getAndIncrement();
    }
    //处理成功
    public void requestReceiveSuccess() {
        successCount.getAndIncrement();
    }
    //处理失败
    public void requestReceiveFialure() {
        failureCount.getAndIncrement();
    }

    //查看请求总数
    public long getRequestCount() {
        return requestCount.get();
    }
    //查看成功总数
    public long getSuccessCount() {
        return successCount.get();
    }
    //查看失败总数
    public long getFailureCount() {
        return failureCount.get();
    }
}
package com.szh.atomiclong;

import java.util.Random;

/**
 *
 */
public class Test {

    public static void main(String[] args) {

        for (int i = 0; i < 1000; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    //每个线程就是一个请求,请求总数加1
                    Indicator.getInstance().newRequestReceive();
                    int num=new Random().nextInt();
                    if (num % 2 == 0) { //模拟偶数为请求成功
                        Indicator.getInstance().requestReceiveSuccess();
                    }else { //模拟奇数为请求失败
                        Indicator.getInstance().requestReceiveFialure();
                    }
                }
            }).start();
        }

        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        //打印请求数
        System.out.println(Indicator.getInstance().getRequestCount()); //请求总数
        System.out.println(Indicator.getInstance().getSuccessCount()); //成功总数
        System.out.println(Indicator.getInstance().getFailureCount()); //失败总数
    }
}


2.3 AtomicIntegerArray

案例1:AtomicIntegerArray中的常用方法。

package com.szh.atomicarray;

import java.util.concurrent.atomic.AtomicIntegerArray;

/**
 * AtomicIntegerArray 原子数组类的基本操作
 */
public class Test01 {

    public static void main(String[] args) {
        //创建一个指定长度的原子数组
        AtomicIntegerArray array=new AtomicIntegerArray(10);
        System.out.println(array);

        //返回指定位置的元素
        System.out.println(array.get(0));
        System.out.println(array.get(8));

        //设置指定位置的元素
        array.set(0,20);

        //在设置某个位置元素的新值时,返回该元素的旧值
        System.out.println(array.getAndSet(1,66));
        System.out.println(array);

        //将某个位置元素的值,加上一个数,返回该元素的新值
        System.out.println(array.addAndGet(0,30));
        System.out.println(array.addAndGet(1,4));
        System.out.println(array);

        //CAS操作
        //如果数组中索引为0的元素的值为50,则将其替换为新值1,返回true
        //反之,如果该索引的元素的值不是50,则不进行替换,返回false
        System.out.println(array.compareAndSet(0,50,1));
        System.out.println(array);

        //自增/自减
        System.out.println(array.incrementAndGet(0)); // ++i
        System.out.println(array.getAndIncrement(1)); // i++
        System.out.println(array.decrementAndGet(2)); // --i
        System.out.println(array.getAndDecrement(3)); // i--
        System.out.println(array);
    }
}

案例2:定义一个原子数组,一个线程数组,10个线程分别将该原子数组中的每个元素自增1000次,所以最终得到的原子数组中,每个元素的值应该为:10*1000=10000。

package com.szh.atomicarray;

import java.util.concurrent.atomic.AtomicIntegerArray;

/**
 * 在多线程中使用 AtomicIntegerArray 原子数组类
 */
public class Test02 {

    //定义一个原子数组
    static AtomicIntegerArray array=new AtomicIntegerArray(10);

    public static void main(String[] args) {
        //定义一个线程数组
        Thread[] threads=new Thread[10];
        for (int i = 0; i < threads.length; i++) {
            threads[i]=new AddThread();
        }
        //开启子线程
        for (Thread thread : threads) {
            thread.start();
        }
        //在主线程中查看自增之后的原子数组中每个元素的值,这里需要等待所有子线程执行完毕之后再查看
        //所以这里可以使用join方法进行线程合并,把所有的子线程合并到当前主线程中
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        //打印原子数组
        System.out.println(array);
    }

    //定义一个线程类,在这个类中修改原子数组
    static class AddThread extends Thread {
        @Override
        public void run() {
            //将原子数组中的每个元素自增1000次
            for (int j=0;j<1000;j++) {
                for (int i = 0; i < array.length(); i++) {
                    array.getAndIncrement(i % array.length());
                }
            }
        }
    }
}


2.4 AtomicIntegerFieldUpdater

AtomicIntegerFieldUpdater 可以对原子整数字段进行更新,要求:

  1. 字符必须使用 volatile 修饰,使线程之间可见。
  2. 只能是实例变量,不能是静态变量,也不能使用 final 修饰。

在这个Java Bean中,我将age属性设置为volatile修饰,也就是说此时这个age对所有的线程都是可见的。

package com.szh.atominintegerfiledupdater;

/**
 *
 */
public class User {

    int id;
    volatile int age;

    public User(int id, int age) {
        this.id = id;
        this.age = age;
    }

    @Override
    public String toString() {
        return "User{" +
                "id=" + id +
                ", age=" + age +
                '}';
    }
}
package com.szh.atominintegerfiledupdater;

import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

/**
 *
 */
public class SubThread extends Thread {

    //创建要更新的User对象
    private User user;

    //创建 AtomicIntegerFieldUpdater 更新器
    private AtomicIntegerFieldUpdater<User> updater=AtomicIntegerFieldUpdater.newUpdater(User.class,"age");

    public SubThread(User user) {
        this.user=user;
    }

    @Override
    public void run() {
        //在子线程中对User对象的age字段进行自增操作
        for (int i=0;i<10;i++) {
            System.out.println(updater.getAndIncrement(user));
        }
    }
}

上面这段代码,我定义了一个字段更新器,它的构造函数中传入两个值,第一个是User.class表示要更新User这个类(传入该类的字节码文件),第二个参数代表你要更新的字段名(必须是volatile修饰的)。

下面这段代码,main主线程中开启了10个子线程,每个线程执行自己的run方法时,都对age字段进行10次的自增操作,那么最终age字段一共会被自增10*10=100次,而age的初始值我们传入了20,最终age=20+100=120。

package com.szh.atominintegerfiledupdater;

/**
 *
 */
public class Test {

    public static void main(String[] args) {

        User user=new User(1001,20);
        //开启10个线程
        for (int i = 0; i < 10; i++) {
            new SubThread(user).start();
        }

        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println(user);
    }
}


2.5 AtomicReference

案例1:这个原子类属于引用型的,可以原子的读写一个对象。

package com.szh.atomicreference;

import java.util.concurrent.atomic.AtomicReference;

/**
 * 使用 AtomicReference 原子读写一个对象
 */
public class Test01 {

    //创建一个AtomicReference对象
    static AtomicReference<String> reference=new AtomicReference<>("abc");

    public static void main(String[] args) throws InterruptedException {
        //开启10个线程,来修改字符串
        for (int i = 0; i < 10; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    if (reference.compareAndSet("abc","def")) {
                        System.out.println(Thread.currentThread().getName() + " 把字符串abc被修改为了def");
                    }
                }
            }).start();
        }

        //开启10个线程,来修改字符串
        for (int i = 0; i < 10; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    if (reference.compareAndSet("def","abc")) {
                        System.out.println(Thread.currentThread().getName() + " 把字符串def还原为了abc");
                    }
                }
            }).start();
        }

        Thread.sleep(1000);
        System.out.println(reference.get());
    }
}

案例2:演示AtomicReference中的ABA问题。

package com.szh.atomicreference;

import java.util.concurrent.atomic.AtomicReference;

/**
 *
 */
public class Test02 {

    private static AtomicReference<String> reference=new AtomicReference<>("abc");

    public static void main(String[] args) throws InterruptedException {
        //创建一个线程,先把abc修改为def,再把def还原为abc
        Thread t1=new Thread(new Runnable() {
            @Override
            public void run() {
                reference.compareAndSet("abc","def");
                System.out.println(Thread.currentThread().getName() + " ---> " + reference.get());
                reference.compareAndSet("def","abc");
            }
        });

        Thread t2=new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(reference.compareAndSet("abc","xyz"));
            }
        });

        t1.start();
        t2.start();
        t1.join();
        t2.join();

        System.out.println(reference.get());
    }
}


2.6 AtomicStampedReference

这个原子类可以很好的规避CAS中的ABA问题。

package com.szh.atomicreference;

import java.util.concurrent.atomic.AtomicStampedReference;

/**
 * AtomicStampedReference 原子类可以解决 CAS 中的 ABA 问题
 * 在 AtomicStampedReference 原子类中有一个整数标记值 stamp,
 * 每次执行 CAS 操作时,需要对比它的版本,即比较 stamp 的值
 */
public class Test03 {

    //定义 AtomicStampedReference 引用操作"abc"字符串, 指定初始化版本号为 0
    private static AtomicStampedReference<String> reference=new AtomicStampedReference<>("abc",0);

    public static void main(String[] args) throws InterruptedException {
        Thread t1=new Thread(new Runnable() {
            @Override
            public void run() {
                reference.compareAndSet("abc","def",reference.getStamp(),reference.getStamp()+1);
                System.out.println(Thread.currentThread().getName() + " ---> " + reference.getReference());
                reference.compareAndSet("def","abc",reference.getStamp(),reference.getStamp()+1);
            }
        });

        Thread t2=new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(reference.compareAndSet("abc","xyz",reference.getStamp(), reference.getStamp()+1));
            }
        });

        t1.start();
        t2.start();
        t1.join();
        t2.join();

        System.out.println(reference.getReference());
    }
}

 

以上是关于Java——多线程高并发系列之理解CAS原子变量类的使用的主要内容,如果未能解决你的问题,请参考以下文章

高并发多线程安全之原子性问题CAS机制及问题解决方案

Java多线程系列---“JUC原子类”01之 原子类的实现(CAS算法)

Go并发编程之美-CAS操作

java基础---多线程---JUC原子类

多线程与并发Java中的12个原子操作类

Java多线程之原子操作类