高并发多线程安全之原子性问题CAS机制及问题解决方案

Posted 踩踩踩从踩

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了高并发多线程安全之原子性问题CAS机制及问题解决方案相关的知识,希望对你有一定的参考价值。

多线程编程 之java内存模型(JMM)与可见性问题

前言

在java内存模型中,对多线程间交互,涉及到原子性问题、可见性问题、以及有序性问题;

这篇文章主要讲解的是多线程高并发的原子性问题,以及解决原子性问题、CAS机制、自旋锁的优缺点、以及ABA问题等解决

什么是原子操作

定义

即一个操作或者多个操作,要么全部执行并且执行的过程不会被任何因素打断,要么就都不执行。

通过下面代码来看出来

public class Counter {
    volatile int i = 0;

    public void add() {
        i = i + +;
    }
}
public class Demo1_CounterTest {

	public static void main(String[] args) throws InterruptedException {
		final Counter ct = new Counter();
		CountDownLatch cl = new CountDownLatch(6);
		for (int i = 0; i < 6; i++) {
			new Thread(new Runnable() {
				@Override
				public void run() {
					for (int j = 0; j < 10000; j++) {
						ct.add();
					}
					cl.countDown();
					System.out.println("done...");
				}
			}).start();
		}
		cl.await();
		System.out.println(ct.i);
	}
}

上面得到的结果完全不可控,得到了下面的打印结果;明显产生了冲突,在预想的时候应该产生60000,但只得到了34684;多执行一次结果值又不一样

done...
done...
done...
done...
done...
done...
34684

产生情况的原因,为什么会出现值减少的情况

  • i++不是原子操作,这在多线程操作时会出现非原子性行为

 

  •  这是在每个线程中的私有方法内存中进行操作,就可能导致冲突,出现非原子性行为;但不是可见性问题,因此我们写入内存时,其他线程一定能看到,通过 volatile 已经发生happens-before原则

原子操作可以是一个步骤,也可以是多个操作步骤,但是其顺序不可以被打乱,也 不可以被切割 而只执行其中的一部分(不可中断性)。
将整个操作视作一个整体, 资源在该次操作中保持一致 ,这是原子性的核心特征

结论:

i++不是原子操作,存在竞态条件,线程不安全,需要转变为原子操作才能安全。要保证数据安全
public synchronized void add() {
		i = i + 1;
	}

竞态条件与临界区

 竞态条件:两个线程竞争同一个资源时,如果对资源的访问顺序敏感,就称存在竞态条件。

临界区:就是导致静态条件发生的代码区。 比如上图的 i++;代码

CAS(Compare and swap) 机制

public class CounterAtomic {
    AtomicInteger at = new AtomicInteger(0);

    public void add(){
        at.getAndIncrement();
    }

    public int getValue(){
        return at.get();
    }
}

这里用到的AtomicInteger  就是利用的cas机制来达到原子性的

  • CAS 属于 硬件 同步原语,处理器提供的内存操作指令, 保证原子性
  • CAS操作需要两个参数,一个旧值和目标值,修改前比较旧值是否改变,如果没变,将新值赋给变量,否则则不做改变。

如下图,去对比值,如果没变就赋值为111

 如果两个线程同时进行操作,对于内存来说,其实还是分为两次操作,它保证同一时刻只能有一个进行修改

修改值之前进行检查,如果值是正常,就修改成功,修改失败就在更新老值进行重试。

 CAS 属于硬件同步原语,处理器提供的内存操作指令,而jvm虚拟机提供给一个可以使用的类unsafe;

JAVA中的sun.misc.Unsafe类提供了CAS机制。

例如AtomicInteger中使用到的

 // setup to use Unsafe.compareAndSwapInt for updates
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long valueOffset;

    static {
        try {
            valueOffset = unsafe.objectFieldOffset
                (AtomicInteger.class.getDeclaredField("value"));
        } catch (Exception ex) { throw new Error(ex); }
    }

    private volatile int value;

这个可以在自己仿造着AtomicInteger 写一个unsafe的原子操作类

	static {
		unsafe = Unsafe.getUnsafe();
		try {
			valueOffset = unsafe.objectFieldOffset(CountUnsafe.class.getDeclaredField("value"));
		} catch (Exception ex) {
			throw new Error(ex);
		}
	}

 在写到上面代码时,出现了安全异常,而automicInteger中则没有异常,jdk不允许我们拿。然后就使用反射去解决

Exception in thread "main" java.lang.ExceptionInInitializerError
Caused by: java.lang.SecurityException: Unsafe
	at sun.misc.Unsafe.getUnsafe(Unsafe.java:90)
	at org.cao.learn.CountUnsafe.<clinit>(CountUnsafe.java:12)
  •  按照automicinteger可以写一个,先定义属性
private static final Unsafe unsafe;
	private static final long valueOffset;

	private volatile int value = 0;
  • 定义好静态方法块
static {
//		unsafe = Unsafe.getUnsafe();

		try {
			Field field = Unsafe.class.getDeclaredField("theUnsafe");
			field.setAccessible(true);
			unsafe = (Unsafe) field.get(null);
			valueOffset = unsafe.objectFieldOffset(CountUnsafe.class.getDeclaredField("value"));
		} catch (Exception ex) {
			throw new Error(ex);
		}
	}
  • 然后总的代码  
package org.cao.learn;

import java.lang.reflect.Field;

import sun.misc.Unsafe;

public class CountUnsafe {
	private static final Unsafe unsafe;
	private static final long valueOffset;

	private volatile int value = 0;

	static {
//		unsafe = Unsafe.getUnsafe();

		try {
			Field field = Unsafe.class.getDeclaredField("theUnsafe");
			field.setAccessible(true);
			unsafe = (Unsafe) field.get(null);
			valueOffset = unsafe.objectFieldOffset(CountUnsafe.class.getDeclaredField("value"));
		} catch (Exception ex) {
			throw new Error(ex);
		}
	}

	public final int getAndIncrement() {
		return unsafe.getAndAddInt(this, valueOffset, 1);
	}

	public void add() {
		// i++; 
		for (;;) {
			if (unsafe.compareAndSwapInt(this, valueOffset, value, value + 1)) {
				return;
			}
		}
	}

	public int getValue() {
		return value;
	}

}

这里的getAndIncrement方法和add方法是一样的,最好是使用getAndAddInt 方法 这是native方法做了内循环,并做了优化的.

J.U.C包内的原子操作封装类

  • AtomicBoolean原子更新布尔类型
  • AtomicInteger原子更新整型
  • AtomicLong 原子更新长整型
  • AtomicIntegerArray 原子更新整型数组里的元素。
  public AtomicIntegerArray(int[] array) {
        // Visibility guaranteed by final field guarantees
        this.array = array.clone();
    }

底层存的int类型,需要对其中一个元素做原子性操作

    public final int getAndSet(int i, int newValue) {
        return unsafe.getAndSetInt(array, checkedByteOffset(i), newValue);
    }
  • AtomicLongArray 原子更新长整型数组里的元素。
  • AtomicReferenceArray原子更新引用类型数组里的元素。
  • AtomicIntegerFieldUpdater原子更新整型的字段的更新器。

这里做的引用就是修改对象的某个值,是它为原子操作。

ublic class Demo2_AtomicIntegerFieldUpdater {
    // 新建AtomicIntegerFieldUpdater对象,需要指明是哪个类中的哪个字段


    private static AtomicIntegerFieldUpdater<User> atom =
            AtomicIntegerFieldUpdater.newUpdater(User.class, "id");

    public static void main(String[] args) {

        User user = new User(100, 100,"Kody");

        atom.addAndGet(user, 50);
        System.out.println("addAndGet(user, 50)             调用后值变为:" + user);

    }
}

class User {
    volatile int id;
    volatile int age;

    private String name;

    public User(int id, int age, String name) {
        this.id = id;
        this.age = age;
        this.name = name;
    }

    public String toString() {
        return "id:" + id + " " + "age:" + age;
    }
}
  • AtomicLongFieldUpdater原子更新长整型字段的更新器。
  • AtomicReferenceFieldUpdater原子更新引用类型里的字段
  • AtomicReference原子更新引用类型。

       这里也是对比对象应用,包括下面的锁机制的一部分抢锁时,对比的是锁引用是否相等。

 public boolean tryLock() {
        //如果锁没有占用     存在原子性问题
    
        return owner.compareAndSet(null, Thread.currentThread());
    }
  • AtomicStampedReference原子更新带有版本号的引用类型
  • AtomicMarkableReference原子更新带有标记位的引用类型。
1.8更新
计数器: DoubleAdder LongAdder
计数器增强版,高并发下性能更好 ,多个道路进行累加,然后进行相加
public long testLongAdder() throws InterruptedException {
        LongAdder lacount = new LongAdder();
        for (int i = 0; i < 3; i++) {
            new Thread(() -> {
                long starttime = System.currentTimeMillis();
                while (System.currentTimeMillis() - starttime < 2000) { // 运行两秒
                    lacount.increment();
                }
                long endtime = System.currentTimeMillis();
            }).start();
        }
        Thread.sleep(3000);
        return lacount.sum();
    }

DoubleAccumulator、 LongAccumulator
是计数器的增强版,可自定义累加规则   可以指定逻辑规则, 做一个增强版操作。自定义规则,x和y值 其中0为x ,y为每次规则过后结果
public class Demo3_LongAccumulator {
    public static void main(String[] args) throws InterruptedException {

        LongAccumulator accumulator = new LongAccumulator(
                (x,y)->{
                    System.out.println("x:" + x);
                    System.out.println("y:" + y);
                    return  x+y;
                },
                0L);

        for (int i = 0; i < 3; i++) {
            accumulator.accumulate(1);
        }

        System.out.println(accumulator.get());

    }
}

CAS存在的问题

  • 仅针对单个变量的操作,不能用于多个变量来实现原子操作
  • 循环+CAS,自旋的实现让所有线程都处于高频运行,争抢CPU执行时间的状态。如果操作长时间不成功,会消耗大量的cpu资源。

  • ABA问题,无法体现出数据的变动。

主要是多线程操作时,其中一个线程操作过后,得到的新值

上述的方式,在automicinteger里面不出问题,在AtomicReference中可能出现问题,也是链表结构的情况

public class Stack {
    // top cas无锁修改
    AtomicReference<Node> top = new AtomicReference<Node>();

    public void push(Node node) { // 入栈
        Node oldTop;
        do {
            oldTop = top.get();
            node.next = oldTop;
        }
        while (!top.compareAndSet(oldTop, node)); // CAS 替换栈顶
    }


    // 出栈
    public Node pop(int time) {

        Node newTop;
        Node oldTop;
        do {
            oldTop = top.get();
            if (oldTop == null) {   //如果没有值,就返回null
                return null;
            }
            newTop = oldTop.next;
            if (time != 0) {    
                LockSupport.parkNanos(1000 * 1000 * time); 
            }
        }
        while (!top.compareAndSet(oldTop, newTop));     //将下一个节点设置为top
        return oldTop;      //将旧的Top作为值返回
    }
}

 测试方法

 public static void main(String[] args) throws InterruptedException {
        Stack stack = new Stack();
        //ConcurrentStack stack = new ConcurrentStack();

        stack.push(new Node("B"));      
        stack.push(new Node("A"));      

        Thread thread1 = new Thread(() -> {
            Node node = stack.pop(800);
            System.out.println(Thread.currentThread().getName() +" "+ node.toString());

            System.out.println("done...");
        });
        thread1.start();

        Thread thread2 = new Thread(() -> {
            LockSupport.parkNanos(1000 * 1000 * 300L);

            Node nodeA = stack.pop(0);      
            System.out.println(Thread.currentThread().getName()  +" "+  nodeA.toString());

            Node nodeB = stack.pop(0);      //取出B,之后B处于游离状态
            System.out.println(Thread.currentThread().getName()  +" "+  nodeB.toString());

            stack.push(new Node("D"));      
            stack.push(new Node("C"));      
            stack.push(nodeA);                    

            System.out.println("done...");
        });
        thread2.start();

        LockSupport.parkNanos(1000 * 1000 * 1000 * 2L);


        System.out.println("开始遍历Stack:");
        Node node = null;
        while ((node = stack.pop(0))!=null){
            System.out.println(node.value);
        }
    }

 

线程1在拿到 老的top值,和新的node值时过后,进行cas比较并赋值的情况等待状态了;然后线程2中间插入了很多数据,又插回了原来那个对象,因此出现了ABA问题。线程2put进去的数据都变不在了,其实里面属性已经变了

解决ABA问题的办法

主要是我们不能将top引用作为对比的对象,如果按照引用去对比,属性修改,数据肯定不正确了。

AtomicStampedReference<Node> top =
            new AtomicStampedReference<>(null, 0);

AtomicStampedReference添加了版本号,不只是对比引用,加入了版本号维护;

package org.cao.learn;

import java.util.concurrent.atomic.AtomicStampedReference;
import java.util.concurrent.locks.LockSupport;

// 实现一个 栈(后进先出)
public class Stack {
	// top cas无锁修改
	AtomicStampedReference<Node> top = new AtomicStampedReference<Node>(null, 0);
//	AtomicReference<Node> top = new AtomicReference<Node>();

	public void push(Node node) { // 入栈
		Node oldTop;
		int v = 0;
		do {
			v = top.getStamp();
			oldTop = top.getReference();
			node.next = oldTop;
		} while (!top.compareAndSet(oldTop, node, v, v + 1)); // CAS 替换栈顶
	}

	// 出栈 -- 取出栈顶 ,为了演示ABA效果, 增加一个CAS操作的延时
	public Node pop(int time) {

		Node newTop;
		Node oldTop;
		int v = 0;
		do {
			oldTop = top.getReference();
			v = top.getStamp();
			if (oldTop == null) { // 如果没有值,就返回null
				return null;
			}
			newTop = oldTop.next;
			if (time != 0) { // 模拟延时
				LockSupport.parkNanos(1000 * 1000 * time); 
			}
		} while (!top.compareAndSet(oldTop, newTop, v, v + 1)); 
		return oldTop; 
	}
}

以上是关于高并发多线程安全之原子性问题CAS机制及问题解决方案的主要内容,如果未能解决你的问题,请参考以下文章

java并发之CAS详解

并发编程(学习笔记-共享模型之无锁)-part5

Java——多线程高并发系列之理解CAS原子变量类的使用

Java——多线程高并发系列之理解CAS原子变量类的使用

多线程 CAS 机制解析及应用( 原子类 . 自旋锁 )解决 ABA 问题

多线程之CAS