高并发多线程安全之原子性问题CAS机制及问题解决方案
Posted 踩踩踩从踩
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了高并发多线程安全之原子性问题CAS机制及问题解决方案相关的知识,希望对你有一定的参考价值。
多线程编程 之java内存模型(JMM)与可见性问题
前言
在java内存模型中,对多线程间交互,涉及到原子性问题、可见性问题、以及有序性问题;
这篇文章主要讲解的是多线程高并发的原子性问题,以及解决原子性问题、CAS机制、自旋锁的优缺点、以及ABA问题等解决
什么是原子操作
定义
即一个操作或者多个操作,要么全部执行并且执行的过程不会被任何因素打断,要么就都不执行。
通过下面代码来看出来
public class Counter {
volatile int i = 0;
public void add() {
i = i + +;
}
}
public class Demo1_CounterTest {
public static void main(String[] args) throws InterruptedException {
final Counter ct = new Counter();
CountDownLatch cl = new CountDownLatch(6);
for (int i = 0; i < 6; i++) {
new Thread(new Runnable() {
@Override
public void run() {
for (int j = 0; j < 10000; j++) {
ct.add();
}
cl.countDown();
System.out.println("done...");
}
}).start();
}
cl.await();
System.out.println(ct.i);
}
}
上面得到的结果完全不可控,得到了下面的打印结果;明显产生了冲突,在预想的时候应该产生60000,但只得到了34684;多执行一次结果值又不一样
done...
done...
done...
done...
done...
done...
34684
产生情况的原因,为什么会出现值减少的情况
- i++不是原子操作,这在多线程操作时会出现非原子性行为
- 这是在每个线程中的私有方法内存中进行操作,就可能导致冲突,出现非原子性行为;但不是可见性问题,因此我们写入内存时,其他线程一定能看到,通过 volatile 已经发生happens-before原则
结论:
public synchronized void add() {
i = i + 1;
}
竞态条件与临界区
竞态条件:两个线程竞争同一个资源时,如果对资源的访问顺序敏感,就称存在竞态条件。
临界区:就是导致静态条件发生的代码区。 比如上图的 i++;代码
CAS(Compare and swap) 机制
public class CounterAtomic {
AtomicInteger at = new AtomicInteger(0);
public void add(){
at.getAndIncrement();
}
public int getValue(){
return at.get();
}
}
这里用到的AtomicInteger 就是利用的cas机制来达到原子性的
-
CAS 属于 硬件 同步原语,处理器提供的内存操作指令, 保证原子性 。
- CAS操作需要两个参数,一个旧值和目标值,修改前比较旧值是否改变,如果没变,将新值赋给变量,否则则不做改变。
如下图,去对比值,如果没变就赋值为111
如果两个线程同时进行操作,对于内存来说,其实还是分为两次操作,它保证同一时刻只能有一个进行修改
修改值之前进行检查,如果值是正常,就修改成功,修改失败就在更新老值进行重试。
CAS 属于硬件同步原语,处理器提供的内存操作指令,而jvm虚拟机提供给一个可以使用的类unsafe;
JAVA中的sun.misc.Unsafe类提供了CAS机制。
例如AtomicInteger中使用到的
// setup to use Unsafe.compareAndSwapInt for updates
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;
static {
try {
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
private volatile int value;
这个可以在自己仿造着AtomicInteger 写一个unsafe的原子操作类
static {
unsafe = Unsafe.getUnsafe();
try {
valueOffset = unsafe.objectFieldOffset(CountUnsafe.class.getDeclaredField("value"));
} catch (Exception ex) {
throw new Error(ex);
}
}
在写到上面代码时,出现了安全异常,而automicInteger中则没有异常,jdk不允许我们拿。然后就使用反射去解决
Exception in thread "main" java.lang.ExceptionInInitializerError
Caused by: java.lang.SecurityException: Unsafe
at sun.misc.Unsafe.getUnsafe(Unsafe.java:90)
at org.cao.learn.CountUnsafe.<clinit>(CountUnsafe.java:12)
- 按照automicinteger可以写一个,先定义属性
private static final Unsafe unsafe;
private static final long valueOffset;
private volatile int value = 0;
- 定义好静态方法块
static {
// unsafe = Unsafe.getUnsafe();
try {
Field field = Unsafe.class.getDeclaredField("theUnsafe");
field.setAccessible(true);
unsafe = (Unsafe) field.get(null);
valueOffset = unsafe.objectFieldOffset(CountUnsafe.class.getDeclaredField("value"));
} catch (Exception ex) {
throw new Error(ex);
}
}
- 然后总的代码
package org.cao.learn;
import java.lang.reflect.Field;
import sun.misc.Unsafe;
public class CountUnsafe {
private static final Unsafe unsafe;
private static final long valueOffset;
private volatile int value = 0;
static {
// unsafe = Unsafe.getUnsafe();
try {
Field field = Unsafe.class.getDeclaredField("theUnsafe");
field.setAccessible(true);
unsafe = (Unsafe) field.get(null);
valueOffset = unsafe.objectFieldOffset(CountUnsafe.class.getDeclaredField("value"));
} catch (Exception ex) {
throw new Error(ex);
}
}
public final int getAndIncrement() {
return unsafe.getAndAddInt(this, valueOffset, 1);
}
public void add() {
// i++;
for (;;) {
if (unsafe.compareAndSwapInt(this, valueOffset, value, value + 1)) {
return;
}
}
}
public int getValue() {
return value;
}
}
这里的getAndIncrement方法和add方法是一样的,最好是使用getAndAddInt 方法 这是native方法做了内循环,并做了优化的.
J.U.C包内的原子操作封装类
- AtomicBoolean:原子更新布尔类型
- AtomicInteger:原子更新整型
-
AtomicLong : 原子更新长整型
-
AtomicIntegerArray : 原子更新整型数组里的元素。
public AtomicIntegerArray(int[] array) {
// Visibility guaranteed by final field guarantees
this.array = array.clone();
}
底层存的int类型,需要对其中一个元素做原子性操作
public final int getAndSet(int i, int newValue) {
return unsafe.getAndSetInt(array, checkedByteOffset(i), newValue);
}
-
AtomicLongArray : 原子更新长整型数组里的元素。
- AtomicReferenceArray:原子更新引用类型数组里的元素。
- AtomicIntegerFieldUpdater:原子更新整型的字段的更新器。
这里做的引用就是修改对象的某个值,是它为原子操作。
ublic class Demo2_AtomicIntegerFieldUpdater {
// 新建AtomicIntegerFieldUpdater对象,需要指明是哪个类中的哪个字段
private static AtomicIntegerFieldUpdater<User> atom =
AtomicIntegerFieldUpdater.newUpdater(User.class, "id");
public static void main(String[] args) {
User user = new User(100, 100,"Kody");
atom.addAndGet(user, 50);
System.out.println("addAndGet(user, 50) 调用后值变为:" + user);
}
}
class User {
volatile int id;
volatile int age;
private String name;
public User(int id, int age, String name) {
this.id = id;
this.age = age;
this.name = name;
}
public String toString() {
return "id:" + id + " " + "age:" + age;
}
}
- AtomicLongFieldUpdater:原子更新长整型字段的更新器。
- AtomicReferenceFieldUpdater:原子更新引用类型里的字段
- AtomicReference:原子更新引用类型。
这里也是对比对象应用,包括下面的锁机制的一部分抢锁时,对比的是锁引用是否相等。
public boolean tryLock() {
//如果锁没有占用 存在原子性问题
return owner.compareAndSet(null, Thread.currentThread());
}
- AtomicStampedReference:原子更新带有版本号的引用类型
- AtomicMarkableReference:原子更新带有标记位的引用类型。
public long testLongAdder() throws InterruptedException {
LongAdder lacount = new LongAdder();
for (int i = 0; i < 3; i++) {
new Thread(() -> {
long starttime = System.currentTimeMillis();
while (System.currentTimeMillis() - starttime < 2000) { // 运行两秒
lacount.increment();
}
long endtime = System.currentTimeMillis();
}).start();
}
Thread.sleep(3000);
return lacount.sum();
}
public class Demo3_LongAccumulator {
public static void main(String[] args) throws InterruptedException {
LongAccumulator accumulator = new LongAccumulator(
(x,y)->{
System.out.println("x:" + x);
System.out.println("y:" + y);
return x+y;
},
0L);
for (int i = 0; i < 3; i++) {
accumulator.accumulate(1);
}
System.out.println(accumulator.get());
}
}
CAS存在的问题
- 仅针对单个变量的操作,不能用于多个变量来实现原子操作
- 循环+CAS,自旋的实现让所有线程都处于高频运行,争抢CPU执行时间的状态。如果操作长时间不成功,会消耗大量的cpu资源。
- ABA问题,无法体现出数据的变动。
主要是多线程操作时,其中一个线程操作过后,得到的新值
上述的方式,在automicinteger里面不出问题,在AtomicReference中可能出现问题,也是链表结构的情况
public class Stack {
// top cas无锁修改
AtomicReference<Node> top = new AtomicReference<Node>();
public void push(Node node) { // 入栈
Node oldTop;
do {
oldTop = top.get();
node.next = oldTop;
}
while (!top.compareAndSet(oldTop, node)); // CAS 替换栈顶
}
// 出栈
public Node pop(int time) {
Node newTop;
Node oldTop;
do {
oldTop = top.get();
if (oldTop == null) { //如果没有值,就返回null
return null;
}
newTop = oldTop.next;
if (time != 0) {
LockSupport.parkNanos(1000 * 1000 * time);
}
}
while (!top.compareAndSet(oldTop, newTop)); //将下一个节点设置为top
return oldTop; //将旧的Top作为值返回
}
}
测试方法
public static void main(String[] args) throws InterruptedException {
Stack stack = new Stack();
//ConcurrentStack stack = new ConcurrentStack();
stack.push(new Node("B"));
stack.push(new Node("A"));
Thread thread1 = new Thread(() -> {
Node node = stack.pop(800);
System.out.println(Thread.currentThread().getName() +" "+ node.toString());
System.out.println("done...");
});
thread1.start();
Thread thread2 = new Thread(() -> {
LockSupport.parkNanos(1000 * 1000 * 300L);
Node nodeA = stack.pop(0);
System.out.println(Thread.currentThread().getName() +" "+ nodeA.toString());
Node nodeB = stack.pop(0); //取出B,之后B处于游离状态
System.out.println(Thread.currentThread().getName() +" "+ nodeB.toString());
stack.push(new Node("D"));
stack.push(new Node("C"));
stack.push(nodeA);
System.out.println("done...");
});
thread2.start();
LockSupport.parkNanos(1000 * 1000 * 1000 * 2L);
System.out.println("开始遍历Stack:");
Node node = null;
while ((node = stack.pop(0))!=null){
System.out.println(node.value);
}
}
线程1在拿到 老的top值,和新的node值时过后,进行cas比较并赋值的情况等待状态了;然后线程2中间插入了很多数据,又插回了原来那个对象,因此出现了ABA问题。线程2put进去的数据都变不在了,其实里面属性已经变了
解决ABA问题的办法
主要是我们不能将top引用作为对比的对象,如果按照引用去对比,属性修改,数据肯定不正确了。
AtomicStampedReference<Node> top =
new AtomicStampedReference<>(null, 0);
AtomicStampedReference添加了版本号,不只是对比引用,加入了版本号维护;
package org.cao.learn;
import java.util.concurrent.atomic.AtomicStampedReference;
import java.util.concurrent.locks.LockSupport;
// 实现一个 栈(后进先出)
public class Stack {
// top cas无锁修改
AtomicStampedReference<Node> top = new AtomicStampedReference<Node>(null, 0);
// AtomicReference<Node> top = new AtomicReference<Node>();
public void push(Node node) { // 入栈
Node oldTop;
int v = 0;
do {
v = top.getStamp();
oldTop = top.getReference();
node.next = oldTop;
} while (!top.compareAndSet(oldTop, node, v, v + 1)); // CAS 替换栈顶
}
// 出栈 -- 取出栈顶 ,为了演示ABA效果, 增加一个CAS操作的延时
public Node pop(int time) {
Node newTop;
Node oldTop;
int v = 0;
do {
oldTop = top.getReference();
v = top.getStamp();
if (oldTop == null) { // 如果没有值,就返回null
return null;
}
newTop = oldTop.next;
if (time != 0) { // 模拟延时
LockSupport.parkNanos(1000 * 1000 * time);
}
} while (!top.compareAndSet(oldTop, newTop, v, v + 1));
return oldTop;
}
}
以上是关于高并发多线程安全之原子性问题CAS机制及问题解决方案的主要内容,如果未能解决你的问题,请参考以下文章