Java——多线程高并发系列之理解CAS原子变量类的使用
Posted 张起灵-小哥
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java——多线程高并发系列之理解CAS原子变量类的使用相关的知识,希望对你有一定的参考价值。
文章目录:
1.CAS
CAS(Compare And Swap)是由硬件实现的。CAS 可以将 read- modify - write 这类的操作转换为原子操作。
i++自增操作包括三个子操作: 从主内存读取 i 变量值;对 i 的值加 1;再把加 1 之后 的值保存到主内存。
CAS 原理: 在把数据更新到主内存时,再次读取主内存变量的值,如果现在变量的值与期望的值(操作起始时读取的值)一样就更新。
下面我说一下,我对上面这张图中CAS算法的理解: 首先在主内存有一个所有线程都共享的count变量,它的值初始化为10。
- 线程1从主内存中读取count变量的值到自己的工作内存中,此时count为10。(这个值我们记为:线程1期望的count值、最初读取到的count值)
- 线程1对自己工作内存中的count的值进行自增操作,此时count为11。(这个值我们记为:线程1更新之后的count值)
- 这个时候,当线程1还未执行到第三步更新count的值到主内存中的时候,线程2突然抢到了CPU执行权,它进来从主内存中读取count变量的值到自己的工作内存中,因为线程1还未更新,所以此时count仍为10。(这个值我们记为:线程2期望的count值、最初读取到的count值)
- 线程2对自己工作内存中的count的值进行自增操作,此时count为11。(这个值我们记为:线程2更新之后的count值)
- 在线程2要将自己工作内存中的count值更新到主内存之前,要执行一部重要的操作!!!就是再次读取主内存中共享变量count的值,如果此时读取到的这个值与最开始线程2期望的count值相等,那么就顺利更新count的值;如果不相等,就撤销放弃本次count++操作。
- 那么线程2最开始期望的count值为10,而此时从主内存中读取到的count值还是10,一样,所以就更新,将自己工作内存中的count值更新到主内存中,此时count为11。
- 那么线程2执行完了,此时线程1重新获得CPU执行权,它已经完成了count++的前两步了,就剩最后一步更新操作了,那么它此时同线程2一样,再次从主内存中读取共享变量count的值,一样就更新、不一样就撤销放弃操作。然而count的值已经被线程2更新过了,此时为11,而线程1最开始期望的count的值为10,两次读取到count的值不一样了!!!所以线程1就会自动撤销本次count++操作。
- 最终,本次线程1、2对count变量执行++操作的结果:线程1失败、线程2成功,最后count的值只增加了1次,为11。
下面,我再来说一下什么是CAS中的ABA问题。
- CAS实现原子操作背后有一个假设:共享变量的当前值与当前线程提供的期望值相同,就认为这个变量没有被其他线程修改过。
- 实际上这种假设不一定总是成立的。假如说有共享变量 count=100。
- A线程将 count 修改为 200
- B线程将 count 修改为 300
- C线程将 count 修改为 100
- 当前线程看到的count变量的值是100,这就与最初的count变量的值是一样的,那么是否就认为count变量的值没有被其他线程更新呢?显然不是啊,它明显的被A、B两个线程更新过。
- 这就是CAS中的ABA问题,即共享变量经历了 A → B → A 的更新。
- 如果想要规避ABA问题,可以为共享变量引入一个修订好(或者叫时间戳),每次修改共享变量时,相应的修订号就会增加1,ABA问题的过程就转变为:[A,0] → [B,1] → [A,2] ,每次修改共享变量都会导致修订号的增加,通过修订号就可以准确的判断共享便是否被其他线程修改过。在原子类中,AtomicReference就会面临ABA问题的困扰,而AtomicStampedReference 可以很好的规避ABA问题。详细内容看下面的代码案例。
2.原子变量类
原子变量类基于CAS实现的,当对共享变量进行read-modify-write更新操作时,通过原子变量类可以保障操作的原子性与可见性。对变量的 read-modify-write 更新操作是指当前操作不是一个简单的赋值,而是变量的新值依赖变量的旧值,如自增操作i++。由于volatile只能保证可见性、无法保障原子性,原子变量类内部就是借助一个Volatile变量,并且保障了该变量的 read-modify-write 操作的原子性,有时把原子变量类看作增强的 volatile 变量。
原子变量类有 12 个,如:(这12个原子变量类全部位于JUC这个Java并发包的atomic子包下的)
分组 | 原子变量类 |
基础数据型 | AtomicInteger, AtomicLong,AtomicBoolean |
数组型 | AtomicIntegerArray,AtomicLongArray,AtomicReferenceArray |
字段更新器 | AtomicIntegerFieldUpdater,AtomicLongFieldUpdater,AtomicReferenceFieldUpdater |
引用型 | AtomicReference,AtomicStampedReference,AtomicMarkableReference |
2.1AtomicInteger
package com.szh.volatiletest;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 使用原子类进行自增操作
*/
public class Test03 {
public static void main(String[] args) throws InterruptedException {
//在main线程中创建10个子线程
for (int i = 0; i < 10; i++) {
new MyThread().start();
}
Thread.sleep(1000);
System.out.println(MyThread.count.get());
}
static class MyThread extends Thread {
private static AtomicInteger count=new AtomicInteger();
public static void addCount() {
for (int i = 0; i < 1000; i++) {
count.getAndIncrement();
}
System.out.println(Thread.currentThread().getName() + " count = " + count.get());
}
@Override
public void run() {
addCount();
}
}
}
首先是AtomicInteger,它其中的 getAndIncrement 方法就是 count++ 操作,get() 方法是获取最终的执行结果,可以看到10个子线程各自执行1000次 count++ 操作,最终的结果正确无误:10000。
2.2 AtomicLong
这里我是用 AtomicLong 定义一个简单的计数器,来模拟实现原子操作。
main主线程中一共创建了1000个子线程,每个线程都看作一个请求,所以请求总数肯定是1000,然后我模拟偶数就算请求成功、奇数就算请求失败。那么这个时候,因为Java中的多线程是采用抢占式来调度的,所以这里成功和失败的比值一般来说不会是1:1(各500次),输出结果中,我们也看到了是494、506,当然这个执行结果也不是唯一的。
package com.szh.atomiclong;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 使用原子变量类定义一个计数器
* 该计数器,在整个程序中都能使用,所以这个计数器可以设计为单例
*/
public class Indicator {
//构造方法私有化
private Indicator() {}
//定义一个私有的本类静态对象
private static final Indicator INSTANCE=new Indicator();
//提供一个公共静态方法,获取该类唯一的实例
public static Indicator getInstance() {
return INSTANCE;
}
//使用原子变量类
private final AtomicInteger requestCount=new AtomicInteger(0); //记录请求总数
private final AtomicInteger successCount=new AtomicInteger(0); //记录成功总数
private final AtomicInteger failureCount=new AtomicInteger(0); //记录失败总数
//有新的请求
public void newRequestReceive() {
requestCount.getAndIncrement();
}
//处理成功
public void requestReceiveSuccess() {
successCount.getAndIncrement();
}
//处理失败
public void requestReceiveFialure() {
failureCount.getAndIncrement();
}
//查看请求总数
public long getRequestCount() {
return requestCount.get();
}
//查看成功总数
public long getSuccessCount() {
return successCount.get();
}
//查看失败总数
public long getFailureCount() {
return failureCount.get();
}
}
package com.szh.atomiclong;
import java.util.Random;
/**
*
*/
public class Test {
public static void main(String[] args) {
for (int i = 0; i < 1000; i++) {
new Thread(new Runnable() {
@Override
public void run() {
//每个线程就是一个请求,请求总数加1
Indicator.getInstance().newRequestReceive();
int num=new Random().nextInt();
if (num % 2 == 0) { //模拟偶数为请求成功
Indicator.getInstance().requestReceiveSuccess();
}else { //模拟奇数为请求失败
Indicator.getInstance().requestReceiveFialure();
}
}
}).start();
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//打印请求数
System.out.println(Indicator.getInstance().getRequestCount()); //请求总数
System.out.println(Indicator.getInstance().getSuccessCount()); //成功总数
System.out.println(Indicator.getInstance().getFailureCount()); //失败总数
}
}
2.3 AtomicIntegerArray
案例1:AtomicIntegerArray中的常用方法。
package com.szh.atomicarray;
import java.util.concurrent.atomic.AtomicIntegerArray;
/**
* AtomicIntegerArray 原子数组类的基本操作
*/
public class Test01 {
public static void main(String[] args) {
//创建一个指定长度的原子数组
AtomicIntegerArray array=new AtomicIntegerArray(10);
System.out.println(array);
//返回指定位置的元素
System.out.println(array.get(0));
System.out.println(array.get(8));
//设置指定位置的元素
array.set(0,20);
//在设置某个位置元素的新值时,返回该元素的旧值
System.out.println(array.getAndSet(1,66));
System.out.println(array);
//将某个位置元素的值,加上一个数,返回该元素的新值
System.out.println(array.addAndGet(0,30));
System.out.println(array.addAndGet(1,4));
System.out.println(array);
//CAS操作
//如果数组中索引为0的元素的值为50,则将其替换为新值1,返回true
//反之,如果该索引的元素的值不是50,则不进行替换,返回false
System.out.println(array.compareAndSet(0,50,1));
System.out.println(array);
//自增/自减
System.out.println(array.incrementAndGet(0)); // ++i
System.out.println(array.getAndIncrement(1)); // i++
System.out.println(array.decrementAndGet(2)); // --i
System.out.println(array.getAndDecrement(3)); // i--
System.out.println(array);
}
}
案例2:定义一个原子数组,一个线程数组,10个线程分别将该原子数组中的每个元素自增1000次,所以最终得到的原子数组中,每个元素的值应该为:10*1000=10000。
package com.szh.atomicarray;
import java.util.concurrent.atomic.AtomicIntegerArray;
/**
* 在多线程中使用 AtomicIntegerArray 原子数组类
*/
public class Test02 {
//定义一个原子数组
static AtomicIntegerArray array=new AtomicIntegerArray(10);
public static void main(String[] args) {
//定义一个线程数组
Thread[] threads=new Thread[10];
for (int i = 0; i < threads.length; i++) {
threads[i]=new AddThread();
}
//开启子线程
for (Thread thread : threads) {
thread.start();
}
//在主线程中查看自增之后的原子数组中每个元素的值,这里需要等待所有子线程执行完毕之后再查看
//所以这里可以使用join方法进行线程合并,把所有的子线程合并到当前主线程中
for (Thread thread : threads) {
try {
thread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//打印原子数组
System.out.println(array);
}
//定义一个线程类,在这个类中修改原子数组
static class AddThread extends Thread {
@Override
public void run() {
//将原子数组中的每个元素自增1000次
for (int j=0;j<1000;j++) {
for (int i = 0; i < array.length(); i++) {
array.getAndIncrement(i % array.length());
}
}
}
}
}
2.4 AtomicIntegerFieldUpdater
AtomicIntegerFieldUpdater 可以对原子整数字段进行更新,要求:
- 字符必须使用 volatile 修饰,使线程之间可见。
- 只能是实例变量,不能是静态变量,也不能使用 final 修饰。
在这个Java Bean中,我将age属性设置为volatile修饰,也就是说此时这个age对所有的线程都是可见的。
package com.szh.atominintegerfiledupdater;
/**
*
*/
public class User {
int id;
volatile int age;
public User(int id, int age) {
this.id = id;
this.age = age;
}
@Override
public String toString() {
return "User{" +
"id=" + id +
", age=" + age +
'}';
}
}
package com.szh.atominintegerfiledupdater;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
/**
*
*/
public class SubThread extends Thread {
//创建要更新的User对象
private User user;
//创建 AtomicIntegerFieldUpdater 更新器
private AtomicIntegerFieldUpdater<User> updater=AtomicIntegerFieldUpdater.newUpdater(User.class,"age");
public SubThread(User user) {
this.user=user;
}
@Override
public void run() {
//在子线程中对User对象的age字段进行自增操作
for (int i=0;i<10;i++) {
System.out.println(updater.getAndIncrement(user));
}
}
}
上面这段代码,我定义了一个字段更新器,它的构造函数中传入两个值,第一个是User.class表示要更新User这个类(传入该类的字节码文件),第二个参数代表你要更新的字段名(必须是volatile修饰的)。
下面这段代码,main主线程中开启了10个子线程,每个线程执行自己的run方法时,都对age字段进行10次的自增操作,那么最终age字段一共会被自增10*10=100次,而age的初始值我们传入了20,最终age=20+100=120。
package com.szh.atominintegerfiledupdater;
/**
*
*/
public class Test {
public static void main(String[] args) {
User user=new User(1001,20);
//开启10个线程
for (int i = 0; i < 10; i++) {
new SubThread(user).start();
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(user);
}
}
2.5 AtomicReference
案例1:这个原子类属于引用型的,可以原子的读写一个对象。
package com.szh.atomicreference;
import java.util.concurrent.atomic.AtomicReference;
/**
* 使用 AtomicReference 原子读写一个对象
*/
public class Test01 {
//创建一个AtomicReference对象
static AtomicReference<String> reference=new AtomicReference<>("abc");
public static void main(String[] args) throws InterruptedException {
//开启10个线程,来修改字符串
for (int i = 0; i < 10; i++) {
new Thread(new Runnable() {
@Override
public void run() {
if (reference.compareAndSet("abc","def")) {
System.out.println(Thread.currentThread().getName() + " 把字符串abc被修改为了def");
}
}
}).start();
}
//开启10个线程,来修改字符串
for (int i = 0; i < 10; i++) {
new Thread(new Runnable() {
@Override
public void run() {
if (reference.compareAndSet("def","abc")) {
System.out.println(Thread.currentThread().getName() + " 把字符串def还原为了abc");
}
}
}).start();
}
Thread.sleep(1000);
System.out.println(reference.get());
}
}
案例2:演示AtomicReference中的ABA问题。
package com.szh.atomicreference;
import java.util.concurrent.atomic.AtomicReference;
/**
*
*/
public class Test02 {
private static AtomicReference<String> reference=new AtomicReference<>("abc");
public static void main(String[] args) throws InterruptedException {
//创建一个线程,先把abc修改为def,再把def还原为abc
Thread t1=new Thread(new Runnable() {
@Override
public void run() {
reference.compareAndSet("abc","def");
System.out.println(Thread.currentThread().getName() + " ---> " + reference.get());
reference.compareAndSet("def","abc");
}
});
Thread t2=new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(reference.compareAndSet("abc","xyz"));
}
});
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println(reference.get());
}
}
2.6 AtomicStampedReference
这个原子类可以很好的规避CAS中的ABA问题。
package com.szh.atomicreference;
import java.util.concurrent.atomic.AtomicStampedReference;
/**
* AtomicStampedReference 原子类可以解决 CAS 中的 ABA 问题
* 在 AtomicStampedReference 原子类中有一个整数标记值 stamp,
* 每次执行 CAS 操作时,需要对比它的版本,即比较 stamp 的值
*/
public class Test03 {
//定义 AtomicStampedReference 引用操作"abc"字符串, 指定初始化版本号为 0
private static AtomicStampedReference<String> reference=new AtomicStampedReference<>("abc",0);
public static void main(String[] args) throws InterruptedException {
Thread t1=new Thread(new Runnable() {
@Override
public void run() {
reference.compareAndSet("abc","def",reference.getStamp(),reference.getStamp()+1);
System.out.println(Thread.currentThread().getName() + " ---> " + reference.getReference());
reference.compareAndSet("def","abc",reference.getStamp(),reference.getStamp()+1);
}
});
Thread t2=new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(reference.compareAndSet("abc","xyz",reference.getStamp(), reference.getStamp()+1));
}
});
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println(reference.getReference());
}
}
以上是关于Java——多线程高并发系列之理解CAS原子变量类的使用的主要内容,如果未能解决你的问题,请参考以下文章