Java高性能并发计数器之巅峰对决
Posted 黑马程序员官方
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java高性能并发计数器之巅峰对决相关的知识,希望对你有一定的参考价值。
java高性能并发计数器之巅峰对决
并发计数器各个方案介绍
方案概述
- jdk5提供的原子更新长整型类 AtomicLong
- synchronized
- jdk8提供的 LongAdder 【单机推荐】
- Redisson分布式累加器【分布式推荐】
方案介绍
jdk5提供的原子更新长整型类 AtomicLong
在JDK1.5开始就新增了并发的Integer/Long的操作工具类AtomicInteger和AtomicLong。
AtomicLong 利用底层操作系统的CAS来保证原子性,在一个死循环内不断执行CAS操作,直到操作成功。不过,CAS操作的一个问题是在并发量比较大的时候,可能很多次的执行CAS操作都不成功,这样性能就受到较大影响。
示例代码
AtomicLong value = new AtomicLong(0); //定义
incrementAndGet(); //递增1
synchronized
synchronized是一个重量级锁,主要是因为线程竞争锁会引起操作系统用户态和内核态切换,浪费资源效率不高,在jdk1.5之前,synchronized没有做任何优化,但在jdk1.6做了性能优化,它会经历偏向锁,轻量级锁,最后才到重量级锁这个过程,在性能方面有了很大的提升,在jdk1.7的ConcurrentHashMap是基于ReentrantLock的实现了锁,但在jdk1.8之后又替换成了synchronized,就从这一点可以看出JVM团队对synchronized的性能还是挺有信心的。下面我们分别来介绍下无锁,偏向锁,轻量级锁,重量级锁。
jdk8提供的 LongAdder 【单机推荐】
在JDK8中又新增了LongAdder,这是一个针对Long类型的数据的操作工具类。
那我们知道,在ConcurrentHashMap中,对Map分割成多个segment,这样多个Segment的操作就可以并行执行,从而可以提高性能。在JDK8中,LongAdder与ConcurrentHashMap类似,将内部操作数据value分离成一个Cell数组,每个线程访问时,通过Hash等算法映射到其中一个Cell上。
计算最终的数据结果,则是各个Cell数组的累计求和。
LongAddr常用api方法
add(): //增加指定的数值;
increament(): //增加1;
decrement(): //减少1;
intValue(); //intValue();/floatValue()/doubleValue():得到最终计数后的结果
sum()://求和,得到最终计数结果
sumThenReset()://求和得到最终计数结果,并重置value。
Redisson分布式累加器【分布式推荐】
基于Redis的Redisson分布式整长型累加器(LongAdder)采用了与java.util.concurrent.atomic.LongAdder类似的接口。通过利用客户端内置的LongAdder对象,为分布式环境下递增和递减操作提供了很高得性能。据统计其性能最高比分布式AtomicLong对象快 10000 倍以上。
RLongAddr itheimaLongAddr = redission.getLongAddr("itheimaLongAddr");
itheimaLongAddr.add(100); //添加指定数量
itheimaLongAddr.increment(); //递增1
itheimaLongAddr.increment(); //递减1
itheimaLongAddr.sum(); //聚合求和
基于Redis的Redisson分布式双精度浮点累加器(DoubleAdder)采用了与java.util.concurrent.atomic.DoubleAdder类似的接口。通过利用客户端内置的DoubleAdder对象,为分布式环境下递增和递减操作提供了很高得性能。据统计其性能最高比分布式AtomicDouble对象快 12000 倍。
示例代码
RLongDouble itheimaDouble = redission.getLongDouble("itheimaLongDouble");
itheimaDouble.add(100); //添加指定数量
itheimaDouble.increment(); //递增1
itheimaDouble.increment(); //递减1
itheimaDouble.sum(); //聚合求和
以上【整长型累加器】和【双精度浮点累加器】完美适用于分布式统计计量场景。
各个方案性能测试
测试代码
package com.itheima._01性能比较;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
/**
* @author 黑马程序员
*/
public class CountTest
private int count = 0;
@Test
public void startCompare()
compareDetail(1, 100 * 10000);
compareDetail(20, 100 * 10000);
compareDetail(30, 100 * 10000);
compareDetail(40, 100 * 10000);
compareDetail(100, 100 * 10000);
/**
* @param threadCount 线程数
* @param times 每个线程增加的次数
*/
public void compareDetail(int threadCount, int times)
try
System.out.println(String.format("threadCount: %s, times: %s", threadCount, times));
long start = System.currentTimeMillis();
testSynchronized(threadCount, times);
System.out.println("testSynchronized cost: " + (System.currentTimeMillis() - start));
start = System.currentTimeMillis();
testAtomicLong(threadCount, times);
System.out.println("testAtomicLong cost: " + (System.currentTimeMillis() - start));
start = System.currentTimeMillis();
testLongAdder(threadCount, times);
System.out.println("testLongAdder cost: " + (System.currentTimeMillis() - start));
System.out.println();
catch (Exception e)
e.printStackTrace();
public void testSynchronized(int threadCount, int times) throws InterruptedException
List<Thread> threadList = new ArrayList<>();
for (int i = 0; i < threadCount; i++)
threadList.add(new Thread(()->
for (int j = 0; j < times; j++)
add();
));
for (Thread thread : threadList)
thread.start();
for (Thread thread : threadList)
thread.join();
public synchronized void add()
count++;
public void testAtomicLong(int threadCount, int times) throws InterruptedException
AtomicLong count = new AtomicLong();
List<Thread> threadList = new ArrayList<>();
for (int i = 0; i < threadCount; i++)
threadList.add(new Thread(()->
for (int j = 0; j < times; j++)
count.incrementAndGet();
));
for (Thread thread : threadList)
thread.start();
for (Thread thread : threadList)
thread.join();
public void testLongAdder(int threadCount, int times) throws InterruptedException
LongAdder count = new LongAdder();
List<Thread> threadList = new ArrayList<>();
for (int i = 0; i < threadCount; i++)
threadList.add(new Thread(()->
for (int j = 0; j < times; j++)
count.increment();
));
for (Thread thread : threadList)
thread.start();
for (Thread thread : threadList)
thread.join();
运行结果
threadCount: 1, times: 1000000
testSynchronized cost: 69
testAtomicLong cost: 16
testLongAdder cost: 15
threadCount: 20, times: 1000000
testSynchronized cost: 639
testAtomicLong cost: 457
testLongAdder cost: 59
threadCount: 30, times: 1000000
testSynchronized cost: 273
testAtomicLong cost: 538
testLongAdder cost: 70
threadCount: 40, times: 1000000
testSynchronized cost: 312
testAtomicLong cost: 717
testLongAdder cost: 81
threadCount: 100, times: 1000000
testSynchronized cost: 719
testAtomicLong cost: 2098
testLongAdder cost: 225
结论
并发量比较低的时候AtomicLong优势比较明显,因为AtomicLong底层是一个乐观锁,不用阻塞线程,不断cas即可。但是在并发比较高的时候用synchronized比较有优势,因为大量线程不断cas,会导致cpu持续飙高,反而会降低效率
LongAdder无论并发量高低,优势都比较明显。且并发量越高,优势越明显
原理分析
AtomicLong 实现原子操作原理
非原子操作示例代码
package com.itheima._02Unsafe测试;
import java.util.ArrayList;
import java.util.List;
/**
* @author 黑马程序员
*/
public class Test1
private int value = 0;
public static void main(String[] args) throws InterruptedException
Test1 test1 = new Test1();
test1.increment();
System.out.println("期待值:" + 100 * 100 + ",最终结果值:" + test1.value);
//结果,期待值:10000,最终结果值:xxxx
private void increment() throws InterruptedException
List<Thread> list = new ArrayList<>();
//启动100个线程,每个线程对value进行累加100次
for (int i = 0; i < 100; i++)
Thread t = new Thread(() ->
for (int j = 0; j < 100; j++)
value++;
);
list.add(t);
t.start();
//保证所有线程运行完成
for (Thread thread : list)
thread.join();
运行效果
结论
可以发现输出的结果值错误,这是因为
value++
不是一个原子操作,它将value++
拆分成了 3 个步骤load、add、store
,多线程并发有可能上一个线程 add 过后还没有 store 下一个线程又执行了 load 了这种重复造成得到的结果可能比最终值要小。
AtomicLong是JDK1.5提供的原子操作示例代码
package com.itheima._03AtomicLong的CAS原子操作示例;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
/**
* @author 黑马程序员
*/
public class Test2
private AtomicLong value = new AtomicLong(0);
public static void main(String[] args) throws InterruptedException
Test2 test1 = new Test2();
test1.increment();
System.out.println("期待值:" + 100 * 100 + ",最终结果值:" + test1.value);
//结果,期待值:10000,最终结果值:10000
private void increment() throws InterruptedException
List<Thread> list = new ArrayList<>();
//启动100个线程,每个线程对value进行累加100次
for (int i = 0; i < 100; i++)
Thread t = new Thread(() ->
for (int j = 0; j < 100; j++)
value.incrementAndGet();
);
list.add(t);
t.start();
//保证所有线程运行完成
for (Thread thread : list)
thread.join();
运行效果
AtomicLong CAS原理介绍
1.使用volatile保证内存可见性,获取主存中最新的操作数据
2.使用CAS(Compare-And-Swap)操作保证数据原子性
CAS算法是jdk对并发操作共享数据的支持,包含了3个操作数
第一个操作数:内存值value(V)
第二个操作数:预估值expect(O)
第三个操作数:更新值new(N)
含义:CAS比较交换的过程可以通俗的理解为CAS(V,O,N),包含三个值分别为:V 内存地址(主存)存放的实际值;O 预期的值(旧值);N 更新的新值。当V和O相同时,也就是说旧值和内存中实际的值相同表明该值没有被其他线程更改过,即该旧值O就是目前来说最新的值了,自然而然可以将新值N赋值给V;当V和O不相同时,会一致循环下去直至修改成功。
AtomicLong底层CAS实现原子操作原理
查看incrementAndGet()方法源码
public final long incrementAndGet()
return unsafe.getAndAddLong(this, valueOffset, 1L) + 1L;
getAndAddLong方法源码
public final long getAndAddLong(Object var1, long var2, long var4)
long var6;
do
var6 = this.getLongVolatile(var1, var2);
while(!this.compareAndSwapLong(var1, var2, var6, var6 + var4));
return var6;
这里是一个循环CAS操作
compareAndSwapLong方法源码
public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);
我们发现调用的是 native 的 unsafe.compareAndSwapLong(Object obj, long valueOffset, Long expect, Long update)
,我们翻看 Hotspot 源码发现在 unsafe.cpp 中定义了这样一段代码
Unsafe中基本都是调用native方法,那么就需要去JVM里面找对应的实现。
到
http://hg.openjdk.java.net/
进行一步步选择下载对应的hotspot版本,我这里下载的是http://hg.openjdk.java.net/jdk8u/jdk8u60/hotspot/archive/tip.tar.gz
,然后解hotspot目录,发现
\\src\\share\\vm\\prims\\unsafe.cpp
,这个就是对应jvm相关的c++实现类了。比如我们对CAS部分的实现很感兴趣,就可以在该文件中搜索compareAndSwapInt,此时可以看到对应的JNI方法为
Unsafe_CompareAndSwapInt
UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapLong(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jlong e, jlong x))
UnsafeWrapper("Unsafe_CompareAndSwapLong");
Handle p (THREAD, JNIHandles::resolve(obj));
jlong* addr = (jlong*)(index_oop_from_field_offset_long(p(), offset));
#ifdef SUPPORTS_NATIVE_CX8
return (jlong)(Atomic::cmpxchg(x, addr, e)) == e;
#else
if (VM_Version::supports_cx8())
return (jlong)(Atomic::cmpxchg(x, addr, e)) == e;
else
jboolean success = false;
MutexLockerEx mu(UnsafeJlong_lock, Mutex::_no_safepoint_check_flag);
jlong val = Atomic::load(addr);
if (val == e) Atomic::store(x, addr); success = true;
return success;
#endif
UNSAFE_END
Atomic::cmpxchg c++源码
可以看到调用了“Atomic::cmpxchg”方法,“Atomic::cmpxchg”方法在linux_x86和windows_x86的实现如下。
linux_x86的实现:
inline jint Atomic::cmpxchg (jint exchange_value, volatile jint* dest, jint compare_value)
int mp = os::is_MP();
__asm__ volatile (LOCK_IF_MP(%4) "cmpxchgl %1,(%3)"
: "=a" (exchange_value)
: "r" (exchange_value), "a" (compare_value), "r" (dest), "r" (mp)
: "cc", "memory");
return exchange_value;
windows_x86的实现(c++源文件):
inline jint Atomic::cmpxchg (jint exchange_value, volatile jint* dest, jint compare_value)
// alternative for InterlockedCompareExchange
int mp = os::is_MP();
__asm
mov edx, dest
mov ecx, exchange_value
mov eax, compare_value
LOCK_IF_MP(mp)
cmpxchg dword ptr [edx], ecx
Atomic::cmpxchg方法解析:
mp是“os::is_MP()”的返回结果,“os::is_MP()”是一个内联函数,用来判断当前系统是否为多处理器。
如果当前系统是多处理器,该函数返回1。
否则,返回0。
LOCK_IF_MP(mp)会根据mp的值来决定是否为cmpxchg指令添加lock前缀。
如果通过mp判断当前系统是多处理器(即mp值为1),则为cmpxchg指令添加lock前缀。
否则,不加lock前缀。
这是一种优化手段,认为单处理器的环境没有必要添加lock前缀,只有在多核情况下才会添加lock前缀,因为lock会导致性能下降。cmpxchg是汇编指令,作用是比较并交换操作数。
底层会调用cmpxchg汇编指令,如果是多核处理器会加锁实现原子操作
反汇编指令查询
查看java程序运行的汇编指令资料
将上图2个文件拷贝到jre\\bin目录下,如下图
配置运行参数
-server -Xcomp -XX:+UnlockDiagnosticVMOptions -XX:+PrintAssembly -XX:CompileCommand=compileonly,*
运行Test2效果
synchronized 实现同步操作原理
锁对象
java中任何一个对象都可以称为锁对象,原因在于java对象在内存中存储结构,如下图所示:
在对象头中主要存储的主要是一些运行时的数据,如下所示:
其中 在Mark Work中存储着该对象作为锁时的一些信息,如下所示是Mark Work中在64位系统中详细信息:
偏向锁
在无竞争环境中(没有并发)使用一种锁
偏向锁的作用是当有线程访问同步代码或方法时,线程只需要判断对象头的Mark Word中判断一下是否有偏向锁指向线程ID.
偏向锁记录过程
- 线程抢到了对象的同步锁(锁标志为01参考上图即无其他线程占用)
- 对象Mark World 将是否偏向标志位设置为1
- 记录抢到锁的线程ID
- 进入偏向状态
轻量级锁
当有另外一个线程竞争获取这个锁时,由于该锁已经是偏向锁,当发现对象头 Mark Word 中的线程 ID 不是自己的线程 ID,就会进行 CAS 操作获取锁,如果获取成功,直接替换 Mark Word 中的线程 ID 为自己的 ID,该锁会保持偏向锁状态;如果获取锁失败,代表当前锁有一定的竞争,偏向锁将升级为轻量级锁。
- 举个例子来说明一下什么时候需要升级偏向锁
假设A线程 持有锁 X(此时X是偏向锁) 这是有个B线程也同样用到了锁X,而B线程在检查锁对象的Mark World时发现偏向锁的线程ID已经指向了线程A。这时候就需要升级锁X为轻量级锁。轻量级锁意味着标示该资源现在处于竞争状态。
当有其他线程想访问加了轻量级锁的资源时,会使用自旋锁优化,来进行资源访问。
自旋策略
JVM 提供了一种自旋锁,可以通过自旋方式不断尝试获取锁,从而避免线程被挂起阻塞。这是基于大多数情况下,线程持有锁的时间都不会太长,毕竟线程被挂起阻塞可能会得不偿失。
从 JDK1.7 开始,自旋锁默认启用,自旋次数由 JVM 设置决定,这里我不建议设置的重试次数过多,因为 CAS 重试操作意味着长时间地占用 CPU。自旋锁重试之后如果抢锁依然失败,同步锁就会升级至重量级锁,锁标志位改为 10。在这个状态下,未抢到锁的线程都会进入 Monitor,之后会被阻塞在 _WaitSet 队列中。
重量级锁
自旋失败,很大概率 再一次自选也是失败,因此直接升级成重量级锁,进行线程阻塞,减少cpu消耗。
当锁升级为重量级锁后,未抢到锁的线程都会被阻塞,进入阻塞队列。
重量级锁在高并发下性能就会变慢,因为所有没有获取锁的线程会进行阻塞等待,到获取锁的时候被唤醒,这些操作都是消耗很多资源。
轻量级锁膨胀流程图
LongAdder 实现原子操作原理
LongAdder实现高并发计数实现思路
LongAdder实现高并发的秘密就是用空间换时间,对一个值的cas操作,变成对多个值的cas操作,当获取数量的时候,对这多个值加和即可。
测试代码
package com.itheima._04LongAddr使用测试;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAccumulator;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.LongBinaryOperator;
/**
* @author 黑马程序员
*/
public class Test3
private LongAdder value = new LongAdder(); //默认初始值0
public static void main(String[] args) throws InterruptedException
Test3 test1 = new Test3();
test1.increment();
System.out.println("期待值:" + 100 * 100 + ",最终结果值:" + test1.value.sum());
//结果,期待值:10000,最终结果值:10000
private void increment() throws InterruptedException
List<Thread> list = new ArrayList<>();
//启动100个线程,每个线程对value进行累加100次
for (int i = 0; i < 100; i++)
Thread t = new Thread(() ->
for (int j = 0; j < 100; j++)
value.increment();
);
list.add(t);
t.start();
//保证所有线程运行完成
for (Thread thread : list)
thread.join();
源码分析
- 先对base变量进行cas操作,cas成功后返回
- 对线程获取一个hash值(调用getProbe),hash值对数组长度取模,定位到cell数组中的元素,对数组中的元素进行cas
增加数量源码
public void increment()
add(1L);
public void add(long x)
Cell[] as; long b, v; int m; Cell a;
if ((as = cells) != null || !casBase(b = base, b + x))
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
longAccumulate(x, null, uncontended);
当数组不为空,并且根据线程hash值定位到数组某个下标中的元素不为空,对这个元素cas成功则直接返回,否则进入longAccumulate方法
- cell数组已经初始化完成,主要是在cell数组中放元素,对cell数组进行扩容等操作
- cell数组没有初始化,则对数组进行初始化
- cell数组正在初始化,这时其他线程利用cas对baseCount进行累加操作
完整代码
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended)
int h;
if ((h = getProbe()) == 0)
ThreadLocalRandom.current(); // force initialization
h = getProbe();
wasUncontended = true;
boolean collide = false; // True if last slot nonempty
for (;;)
Cell[] as; Cell a; int n; long v;
if ((as = cells) != null && (n = as.length) > 0)
if ((a = as[(n - 1) & h]) == null)
if (cellsBusy == 0) // Try to attach new Cell
Cell r = new Cell(x); // Optimistically create
if (cellsBusy == 0 && casCellsBusy())
boolean created = false;
try // Recheck under lock
Cell[] rs; int m, j;
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null)
rs[j] = r;
created = true;
finally
cellsBusy = 0;
if (created)
break;
continue; // Slot is now non-empty
collide = false;
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v大天使之剑h5巅峰对决怎么玩
应用性能管理の巅峰对决:Apache Skywalking P.K. Pinpoint
巅峰对决!Amazon DeepRacer中国联赛总决赛来了!