[Java]_[初级]_[并发下使用AtomicReference来保证原子读写对象]
Posted infoworld
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了[Java]_[初级]_[并发下使用AtomicReference来保证原子读写对象]相关的知识,希望对你有一定的参考价值。
场景
-
在开发
Java
多线程程序时,和C/C++
一样会遇到多线程同时修改共享对象的问题。比如一个缓存HashMap
需要更新,那么可以使用标准库的ConcurrentHashMap
来替换HashMap
作为共享对象。如果是自定义的对象呢,并发访问可能会造成访问错误。 -
解决并发读写的方案之一是使用
synchronized
,但它只能作为最后的方案,因为加锁解锁是会耗费很多CPU
时间的。而且synchronized
多的时候容易造成死锁。
说明
-
在
C/C++
里可以使用atomic_load
和atomic_store
函数以原子方式读写shared_ptr
对象,在文章C/C++多线程访问修改集合vector会冲突的两个解决方案
[1]里介绍过. 在Java
里也有同样用途的类AtomicReference<E>
,它可以通过方法get(),set()
原子读写变量。 -
利用
AtomicReference<E>
对包裹对象的原子读写,满足多线程的Happen-Before
规则,可以用新的对象原子替换旧的对象。比如一个新的HashMap
对象set()
替换旧的对象后,只要通过get()
获取到对象都是新对象,之前的旧可以继续使用,直到它被gc
。 -
内存的原子操作可以理解为某个时刻某个内存地址只允许一个指令操作,即使在多核下。
例子
-
这里例子原本是使用
HashMap
来作为缓存的,并且内部元素不多的情况下,可以使用AtomicReference
直接替换旧的HashMap
。这里还有适用场景就是共享变量不是严格执行值改变之后即时生效的情况(强一致性[3]),比如一些URL
缓存需要一段时间生效,也是通常说的Copy-On=Write
原则。 -
对于共享变量修改之后需要即时生效,只能加锁进行控制。
ConcurrentHashMap
用的也是原子的方式操作元素,单个方法能做到强一致性
,组合方法也只是能做到最终一致性
。 -
以下执行
testAtomicReference
方法可以验证多线程下的原子替换HashMap
。但是如果执行testMapNoAtomicReference
方法的话,由于是多线程读写同一个HashMap
, 会出现崩溃。
Caused by: java.util.ConcurrentModificationException
at java.base/java.util.HashMap$HashIterator.nextNode(HashMap.java:1493)
AtomicReferenceTest
package com.example.string;
import static java.lang.Thread.sleep;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.jupiter.api.Test;
public class AtomicReferenceTest {
private BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
private ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 6, 3600, TimeUnit.SECONDS, queue);
public static void print(String str){
System.out.println(str);
}
private static Map<String,String> createMap(int _id){
Map<String,String> cache = new HashMap<>();
for(int i = 0; i< 10; i++){
String key = i+"";
cache.put(key,key+"-value"+_id);
}
return cache;
}
private static class Task implements Runnable{
protected boolean stopped;
protected AtomicReference<Map<String,String>> ar;
protected String name;
public Task(AtomicReference<Map<String,String>> ar) {
this.ar = ar;
}
public void setStopped(boolean stopped){
this.stopped = stopped;
}
@Override
public void run(){
Map<String, String> cache = ar.get();
print("Thread BEGIN ->"+Thread.currentThread().getId()+": map object->"+System.identityHashCode(cache));
while(!stopped){
cache = ar.get();
for (Map.Entry<String,String> entry : cache.entrySet()) {
name = entry.getKey()+":"+entry.getValue();
}
}
cache = ar.get();
for (Map.Entry<String,String> entry : cache.entrySet()) {
print(entry.getKey()+":"+entry.getValue());
}
print("Thread END->"+Thread.currentThread().getId()+": map object->"+System.identityHashCode(cache));
}
}
private static class TaskNoAtomic implements Runnable{
protected boolean stopped;
protected Map<String,String> cache;
protected String name;
public TaskNoAtomic(Map<String,String> cache) {
this.cache = cache;
}
public void setStopped(boolean stopped){
this.stopped = stopped;
}
@Override
public void run(){
print("Thread BEGIN ->"+Thread.currentThread().getId()+": map object->"+System.identityHashCode(cache));
while(!stopped){
for (Map.Entry<String,String> entry : cache.entrySet()) {
name = entry.getKey()+":"+entry.getValue();
}
}
for (Map.Entry<String,String> entry : cache.entrySet()) {
print(entry.getKey()+":"+entry.getValue());
}
print("Thread END->"+Thread.currentThread().getId()+": map object->"+System.identityHashCode(cache));
}
}
@Test
public void testAtomicReference(){
print("BEGIN");
Map<String, String> cache = createMap(0);
AtomicReference<Map<String,String>> one = new AtomicReference<>(cache);
Task task1 = new Task(one);
Task task2 = new Task(one);
CompletableFuture<Void> cf1 = CompletableFuture.runAsync(task1,executor);
CompletableFuture<Void> cf2 = CompletableFuture.runAsync(task2,executor);
Task task3 = new Task(one){
@Override
public void run(){
int i = 1;
while(!stopped){
Map<String, String> temp = createMap(++i);
temp.put("key", "value");
ar.set(temp);
}
}
};
CompletableFuture<Void> cf3 = CompletableFuture.runAsync(task3,executor);
try {
sleep(10000);
task1.setStopped(true);
task2.setStopped(true);
task3.setStopped(true);
cf1.join();
cf2.join();
cf3.join();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
print("END");
}
@Test
public void testMapNoAtomicReference(){
print("BEGIN");
Map<String, String> cache = createMap(0);
TaskNoAtomic task1 = new TaskNoAtomic(cache);
TaskNoAtomic task2 = new TaskNoAtomic(cache);
CompletableFuture<Void> cf1 = CompletableFuture.runAsync(task1,executor);
CompletableFuture<Void> cf2 = CompletableFuture.runAsync(task2,executor);
TaskNoAtomic task3 = new TaskNoAtomic(cache){
@Override
public void run(){
int i = 1;
while(!stopped){
this.cache.put("9", "value");
this.cache.remove("9");
}
}
};
CompletableFuture<Void> cf3 = CompletableFuture.runAsync(task3,executor);
try {
sleep(10000);
task1.setStopped(true);
task2.setStopped(true);
task3.setStopped(true);
cf1.join();
cf2.join();
cf3.join();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
print("END");
}
}
输出
BEGIN
Thread BEGIN ->15: map object->928671469
Thread BEGIN ->14: map object->928671469
0:0-value3410355
1:1-value3410355
2:2-value3410355
3:3-value3410355
4:4-value3410355
5:5-value3410355
6:6-value3410355
7:7-value3410355
8:8-value3410355
9:9-value3410355
key:value
Thread END->15: map object->1835239739
0:0-value3410355
1:1-value3410355
2:2-value3410355
3:3-value3410355
4:4-value3410355
5:5-value3410355
6:6-value3410355
7:7-value3410355
8:8-value3410355
9:9-value3410355
key:value
Thread END->14: map object->1835239739
END
参考
以上是关于[Java]_[初级]_[并发下使用AtomicReference来保证原子读写对象]的主要内容,如果未能解决你的问题,请参考以下文章
[Java]_[初级]_[如何调用外部命令获取输出并设置它的超时退出]