Posted xiangkejin
Java 5 新增了 AtomicInteger 类,该类包含 getAndIncrement() 和 getAndDecrement() 方法,分别实现了原子加和原子减操作。不同之处在于,这两个操作没有使用任何加锁机制,属于无锁操作。
在 JDK 5 之前,Java 语言靠 synchronized 关键字保证同步,这是一种基于锁的实现(后面的章节还会谈到锁)。
CAS 操作
上面的乐观锁用到的机制就是CAS,Compare and Swap。

package com.xidian.example;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

import com.xidian.annotations.ThreadSafe;

import lombok.extern.slf4j.Slf4j;

/**
 * Counts concurrent "requests" with an {@link AtomicInteger}.
 *
 * <p>{@link #clientTotal} tasks are submitted to a cached thread pool; a
 * {@link Semaphore} with {@link #threadTotal} permits limits how many of them
 * may run {@code add()} at the same time. Once every task has finished, the
 * final counter value is logged.
 */
@Slf4j
@ThreadSafe
public class AtomicExample1 {

    /** Total number of requests (tasks) to submit. */
    public static int clientTotal = 5000;

    /** Maximum number of tasks allowed to increment the counter concurrently. */
    public static int threadTotal = 200;

    /** Shared counter, incremented atomically without locking. */
    public static AtomicInteger count = new AtomicInteger(0);

    /**
     * Runs the demo and logs the final counter value.
     *
     * @param args unused
     * @throws Exception if the main thread is interrupted while waiting for the tasks
     */
    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        try {
            for (int i = 0; i < clientTotal; i++) {
                executorService.execute(() -> {
                    try {
                        semaphore.acquire();
                        try {
                            add();
                        } finally {
                            // Always hand the permit back, even if add() throws.
                            semaphore.release();
                        }
                    } catch (InterruptedException e) {
                        // Restore the interrupt flag so the pool can observe it.
                        Thread.currentThread().interrupt();
                        log.error("exception", e);
                    } catch (Exception e) {
                        log.error("exception", e);
                    } finally {
                        // Count down unconditionally so main can never hang in await().
                        countDownLatch.countDown();
                    }
                });
            }
            countDownLatch.await();
        } finally {
            // Stop the pool even if await() was interrupted.
            executorService.shutdown();
        }
        log.info("count:{}", count.get());
    }

    /**
     * Atomically adds one to the shared counter. The returned value is ignored,
     * so {@code count.getAndIncrement()} would serve equally well here.
     */
    private static void add() {
        count.incrementAndGet();
    }
}

package com.xidian.example;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;

import com.xidian.annotations.ThreadSafe;

import lombok.extern.slf4j.Slf4j;

/**
 * Counts concurrent "requests" with an {@link AtomicLong}.
 *
 * <p>{@link #clientTotal} tasks are submitted to a cached thread pool; a
 * {@link Semaphore} with {@link #threadTotal} permits limits how many of them
 * may run {@code add()} at the same time. Once every task has finished, the
 * final counter value is logged.
 */
@Slf4j
@ThreadSafe
public class AtomicExample2 {

    /** Total number of requests (tasks) to submit. */
    public static int clientTotal = 5000;

    /** Maximum number of tasks allowed to increment the counter concurrently. */
    public static int threadTotal = 200;

    /** Shared 64-bit counter, incremented atomically without locking. */
    public static AtomicLong count = new AtomicLong(0);

    /**
     * Runs the demo and logs the final counter value.
     *
     * @param args unused
     * @throws Exception if the main thread is interrupted while waiting for the tasks
     */
    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        try {
            for (int i = 0; i < clientTotal; i++) {
                executorService.execute(() -> {
                    try {
                        semaphore.acquire();
                        try {
                            add();
                        } finally {
                            // Always hand the permit back, even if add() throws.
                            semaphore.release();
                        }
                    } catch (InterruptedException e) {
                        // Restore the interrupt flag so the pool can observe it.
                        Thread.currentThread().interrupt();
                        log.error("exception", e);
                    } catch (Exception e) {
                        log.error("exception", e);
                    } finally {
                        // Count down unconditionally so main can never hang in await().
                        countDownLatch.countDown();
                    }
                });
            }
            countDownLatch.await();
        } finally {
            // Stop the pool even if await() was interrupted.
            executorService.shutdown();
        }
        log.info("count:{}", count.get());
    }

    /**
     * Atomically adds one to the shared counter. The returned value is ignored,
     * so {@code count.getAndIncrement()} would serve equally well here.
     */
    private static void add() {
        count.incrementAndGet();
    }
}

package com.xidian.example;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.LongAdder;

import com.xidian.annotations.ThreadSafe;

import lombok.extern.slf4j.Slf4j;

/**
 * Counts concurrent "requests" with a {@link LongAdder}.
 *
 * <p>{@link #clientTotal} tasks are submitted to a cached thread pool; a
 * {@link Semaphore} with {@link #threadTotal} permits limits how many of them
 * may run {@code add()} at the same time. Once every task has finished, the
 * adder is logged (its string form is the current sum).
 */
@Slf4j
@ThreadSafe
public class AtomicExample3 {

    /** Total number of requests (tasks) to submit. */
    public static int clientTotal = 5000;

    /** Maximum number of tasks allowed to increment the counter concurrently. */
    public static int threadTotal = 200;

    /** Shared counter backed by a {@link LongAdder}. */
    public static LongAdder count = new LongAdder();

    /**
     * Runs the demo and logs the final counter value.
     *
     * @param args unused
     * @throws Exception if the main thread is interrupted while waiting for the tasks
     */
    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        try {
            for (int i = 0; i < clientTotal; i++) {
                executorService.execute(() -> {
                    try {
                        semaphore.acquire();
                        try {
                            add();
                        } finally {
                            // Always hand the permit back, even if add() throws.
                            semaphore.release();
                        }
                    } catch (InterruptedException e) {
                        // Restore the interrupt flag so the pool can observe it.
                        Thread.currentThread().interrupt();
                        log.error("exception", e);
                    } catch (Exception e) {
                        log.error("exception", e);
                    } finally {
                        // Count down unconditionally so main can never hang in await().
                        countDownLatch.countDown();
                    }
                });
            }
            countDownLatch.await();
        } finally {
            // Stop the pool even if await() was interrupted.
            executorService.shutdown();
        }
        log.info("count:{}", count);
    }

    /** Adds one to the shared adder. */
    private static void add() {
        count.increment();
    }
}
LongAdder 在低并发的时候通过直接操作 base,可以很好地保证和 Atomic 的性能基本一致;在高并发的场景,通过热点分区来提高并行度。

package com.xidian.example;

import java.util.concurrent.atomic.AtomicReference;

import com.xidian.annotations.ThreadSafe;

import lombok.extern.slf4j.Slf4j;

/**
 * Walks an {@link AtomicReference} through a fixed series of
 * {@code compareAndSet} attempts and logs the value it ends up with.
 */
@Slf4j
@ThreadSafe
public class AtomicExample4 {

    private static AtomicReference<Integer> count = new AtomicReference<>(0);

    /**
     * Applies each {expected, update} pair in order; an attempt only takes
     * effect when the reference currently holds the expected value.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        int[][] attempts = {
            {0, 2}, // applied: 0 -> 2
            {0, 1}, // skipped: value is 2
            {1, 3}, // skipped: value is 2
            {2, 4}, // applied: 2 -> 4
            {3, 5}, // skipped: value is 4
        };
        for (int[] attempt : attempts) {
            count.compareAndSet(attempt[0], attempt[1]);
        }
        log.info("count:{}", count.get());
    }
}
* AtomicIntegerFieldUpdater 核心是原子性的去更新某一个类的实例的指定的某一个字段
* newUpdater 工厂方法的第一个参数为类定义,第二个参数为要更新的字段名;该字段必须是 int 类型、由 volatile 修饰,并且不能是 static 字段

package com.xidian.example;

import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

import com.xidian.annotations.ThreadSafe;

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

/**
 * Shows {@link AtomicIntegerFieldUpdater} performing compare-and-set on a
 * plain {@code volatile int} field of an instance.
 */
@Slf4j
@ThreadSafe
public class AtomicExample5 {

    // The type argument names the class that owns the field; "count" is the field to update.
    private static AtomicIntegerFieldUpdater<AtomicExample5> updater =
            AtomicIntegerFieldUpdater.newUpdater(AtomicExample5.class, "count");

    // The updated field has to be declared volatile.
    @Getter
    public volatile int count = 100;

    /**
     * Attempts the same 100 -> 120 swap twice and logs the outcome of each attempt.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        AtomicExample5 target = new AtomicExample5();

        boolean firstSwapped = updater.compareAndSet(target, 100, 120);
        if (firstSwapped) {
            log.info("update success 1, {}", target.getCount());
        }

        boolean secondSwapped = updater.compareAndSet(target, 100, 120);
        if (!secondSwapped) {
            log.info("update failed, {}", target.getCount());
        } else {
            log.info("update success 2, {}", target.getCount());
        }
    }
}

/**
 * Atomically sets the value of both the reference and stamp
 * to the given update values if the
 * current reference is {@code ==} to the expected reference
 * and the current stamp is equal to the expected stamp.
 *
 * @param expectedReference the expected value of the reference
 * @param newReference the new value for the reference
 * @param expectedStamp the expected value of the stamp (i.e. the version number)
 * @param newStamp the new value for the stamp
 * @return {@code true} if successful
 */
public boolean compareAndSet(V expectedReference,
                             V newReference,
                             int expectedStamp,
                             int newStamp) {
    // Read the current (reference, stamp) pair once; every check below uses this snapshot.
    Pair<V> current = pair;
    return
        // Both the reference (compared by identity) and the stamp must match the expectation.
        expectedReference == current.reference &&
        expectedStamp == current.stamp &&
        // If the requested values equal the current ones, succeed without calling casPair;
        // otherwise try to swap in a newly created pair via casPair.
        ((newReference == current.reference &&
          newStamp == current.stamp) ||
         casPair(current, Pair.of(newReference, newStamp)));
}

/**
 * Atomically sets the element at position {@code i} to the given value
 * and returns the old value.
 *
 * @param i the index
 * @param newValue the new value
 * @return the previous value
 */
public final long getAndSet(int i, long newValue) {
    // NOTE(review): checkedByteOffset is not shown here; presumably it validates
    // the index and converts it to a byte offset into the array - confirm.
    return unsafe.getAndSetLong(array, checkedByteOffset(i), newValue);
}
... ...
/**
 * Atomically sets the element at position {@code i} to the given
 * updated value if the current value {@code ==} the expected value.
 *
 * @param i the index
 * @param expect the expected value
 * @param update the new value
 * @return {@code true} if successful. False return indicates that
 * the actual value was not equal to the expected value.
 */
public final boolean compareAndSet(int i, long expect, long update) {
    // Delegates to compareAndSetRaw (not shown in this excerpt) with the offset for index i.
    return compareAndSetRaw(checkedByteOffset(i), expect, update);
}


@Slf4j public class SyncronizedExample1 { /** * 修饰一个代码块,作用范围为大括号括起来的 */ public void test1(){ synchronized (this){ for (int i = 0; i < 10; i++) { log.info("test1-{}",i); } } } /** * 修改方法,作用范围是整个方法,作用对象为调用这个方法的对象 * 若子类继承父类调用父类的synchronized方法,是带不上synchronized关键字的 * 原因:synchronized 不属于方法声明的一部分 * 如果子类也想使用同步需要在方法上声明 */ public synchronized void test2(){ for (int i = 0; i < 10; i++) { log.info("test2-{}",i); } } public static void main(String[] args) { SyncronizedExample1 example1 = new SyncronizedExample1(); SyncronizedExample1 example2 = new SyncronizedExample1(); // 使用线程池模拟一个对象的两个进程同时调用一段sync代码的执行过程 ExecutorService executorService = Executors.newCachedThreadPool(); // 线程pool-1-thread-1,pool-1-thread-2 交叉输出 executorService.execute(()-> example1.test1()); executorService.execute(()-> example2.test1()); // 线程pool-1-thread-1 先从0-9输出,然后pool-1-thread-2 从0到9顺序输出 // executorService.execute(()-> example1.test1()); // executorService.execute(()-> example1.test1()); } }

@Slf4j public class SyncronizedExample2 { /** * 修饰类,括号包起来的代码 * 作用对象为这个类的所有对象 */ public static void test1(){ synchronized (SyncronizedExample2.class){ for (int i = 0; i < 10; i++) { log.info("test1-{}",i); } } } /** * 修饰一个静态方法,作用对象为这个类的所有对象 */ public static synchronized void test2(){ for (int i = 0; i < 10; i++) { log.info("test2-{}",i); } } public static void main(String[] args) { SyncronizedExample2 example1 = new SyncronizedExample2(); SyncronizedExample2 example2 = new SyncronizedExample2(); // 使用线程池模拟一个对象的两个进程同时调用一段sync代码的执行过程 ExecutorService executorService = Executors.newCachedThreadPool(); // 线程pool-1-thread-1 先从0-9输出,然后pool-1-thread-2 从0到9顺序输出 executorService.execute(()-> example1.test1()); executorService.execute(()-> example1.test1()); // 线程pool-1-thread-1 先从0-9输出,然后pool-1-thread-2 从0到9顺序输出 // executorService.execute(()-> example1.test2()); // executorService.execute(()-> example2.test2()); } }