[Java]_[初级]_[并发下使用AtomicReference来保证原子读写对象]

Posted infoworld

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了[Java]_[初级]_[并发下使用AtomicReference来保证原子读写对象]相关的知识,希望对你有一定的参考价值。

场景

  1. 在开发Java多线程程序时,和C/C++一样会遇到多线程同时修改共享对象的问题。比如一个缓存HashMap需要更新,那么可以使用标准库的ConcurrentHashMap来替换HashMap作为共享对象。如果是自定义的对象呢,并发访问可能会造成访问错误。

  2. 解决并发读写的方案之一是使用synchronized,但它只能作为最后的方案,因为加锁解锁是会耗费很多CPU时间的。而且synchronized多的时候容易造成死锁。

说明

  1. C/C++里可以使用atomic_loadatomic_store函数以原子方式读写shared_ptr对象,在文章C/C++多线程访问修改集合vector会冲突的两个解决方案[1]里介绍过. 在Java里也有同样用途的类AtomicReference<E>,它可以通过方法get(),set()原子读写变量。

  2. 利用AtomicReference<E>对包裹对象的原子读写,满足多线程的Happen-Before规则,可以用新的对象原子替换旧的对象。比如一个新的HashMap对象set()替换旧的对象后,只要通过get()获取到对象都是新对象,之前的旧可以继续使用,直到它被gc

  3. 内存的原子操作可以理解为某个时刻某个内存地址只允许一个指令操作,即使在多核下。

例子

  1. 这里例子原本是使用HashMap来作为缓存的,并且内部元素不多的情况下,可以使用AtomicReference直接替换旧的HashMap。这里还有适用场景就是共享变量不是严格执行值改变之后即时生效的情况(强一致性[3]),比如一些URL缓存需要一段时间生效,也是通常说的Copy-On=Write原则。

  2. 对于共享变量修改之后需要即时生效,只能加锁进行控制。ConcurrentHashMap用的也是原子的方式操作元素,单个方法能做到强一致性,组合方法也只是能做到最终一致性

  3. 以下执行testAtomicReference方法可以验证多线程下的原子替换HashMap。但是如果执行testMapNoAtomicReference方法的话,由于是多线程读写同一个HashMap, 会出现崩溃。

Caused by: java.util.ConcurrentModificationException
	at java.base/java.util.HashMap$HashIterator.nextNode(HashMap.java:1493)

AtomicReferenceTest

package com.example.string;

import static java.lang.Thread.sleep;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import org.junit.jupiter.api.Test;


public class AtomicReferenceTest 

    private BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
    private ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 6, 3600, TimeUnit.SECONDS, queue);

    public static void print(String str)
        System.out.println(str);
    
    
    private static Map<String,String> createMap(int _id)
        Map<String,String> cache = new HashMap<>();
        for(int i = 0; i< 10; i++)
            String key = i+"";
            cache.put(key,key+"-value"+_id);
        
        
        return cache;
    

    private static class Task implements Runnable

        protected boolean stopped;
        protected AtomicReference<Map<String,String>> ar;
        protected String name;

        public Task(AtomicReference<Map<String,String>> ar) 
            this.ar = ar;
        

        public void setStopped(boolean stopped)
            this.stopped = stopped;
        

        @Override
        public void run()
            Map<String, String> cache = ar.get();
            print("Thread BEGIN ->"+Thread.currentThread().getId()+": map object->"+System.identityHashCode(cache));
            
            while(!stopped)
                cache = ar.get();
                for (Map.Entry<String,String> entry : cache.entrySet()) 
                    name = entry.getKey()+":"+entry.getValue();
                
            

            cache = ar.get();
            for (Map.Entry<String,String> entry : cache.entrySet()) 
                print(entry.getKey()+":"+entry.getValue());
            
            print("Thread END->"+Thread.currentThread().getId()+": map object->"+System.identityHashCode(cache));
        

    

    private static class TaskNoAtomic implements Runnable

        protected boolean stopped;
        protected Map<String,String> cache;
        protected String name;

        public TaskNoAtomic(Map<String,String> cache) 
            this.cache = cache;
        

        public void setStopped(boolean stopped)
            this.stopped = stopped;
        

        @Override
        public void run()
            print("Thread BEGIN ->"+Thread.currentThread().getId()+": map object->"+System.identityHashCode(cache));
            
            while(!stopped)
                for (Map.Entry<String,String> entry : cache.entrySet()) 
                    name = entry.getKey()+":"+entry.getValue();
                
            

            for (Map.Entry<String,String> entry : cache.entrySet()) 
                print(entry.getKey()+":"+entry.getValue());
            
            print("Thread END->"+Thread.currentThread().getId()+": map object->"+System.identityHashCode(cache));
        

    

    @Test
    public void testAtomicReference()
        
        print("BEGIN");

        Map<String, String> cache = createMap(0);
        AtomicReference<Map<String,String>> one = new AtomicReference<>(cache);

        Task task1 = new Task(one);
        Task task2 = new Task(one);
        CompletableFuture<Void> cf1 = CompletableFuture.runAsync(task1,executor);
        CompletableFuture<Void> cf2 = CompletableFuture.runAsync(task2,executor);

        Task task3 = new Task(one)
            @Override
            public void run()
                int i = 1;
                while(!stopped)
                    Map<String, String> temp = createMap(++i);
                    temp.put("key", "value");
                    ar.set(temp);
                
            
        ;
        CompletableFuture<Void> cf3 = CompletableFuture.runAsync(task3,executor); 

        try 
            sleep(10000);
            
            task1.setStopped(true);
            task2.setStopped(true);
            task3.setStopped(true);    

            cf1.join();
            cf2.join();
            cf3.join();   
            
         catch (InterruptedException e) 
            // TODO Auto-generated catch block
            e.printStackTrace();
        

        print("END");
    

    @Test
    public void testMapNoAtomicReference()
        
        print("BEGIN");

        Map<String, String> cache = createMap(0);

        TaskNoAtomic task1 = new TaskNoAtomic(cache);
        TaskNoAtomic task2 = new TaskNoAtomic(cache);
        CompletableFuture<Void> cf1 = CompletableFuture.runAsync(task1,executor);
        CompletableFuture<Void> cf2 = CompletableFuture.runAsync(task2,executor);

        TaskNoAtomic task3 = new TaskNoAtomic(cache)
            @Override
            public void run()
                int i = 1;
                while(!stopped)
                    this.cache.put("9", "value");
                    this.cache.remove("9");
                
            
        ;
        CompletableFuture<Void> cf3 = CompletableFuture.runAsync(task3,executor); 

        try 
            sleep(10000);
            
            task1.setStopped(true);
            task2.setStopped(true);
            task3.setStopped(true);    

            cf1.join();
            cf2.join();
            cf3.join();   
            
         catch (InterruptedException e) 
            // TODO Auto-generated catch block
            e.printStackTrace();
        

        print("END");
    
    

输出

BEGIN
Thread BEGIN ->15: map object->928671469
Thread BEGIN ->14: map object->928671469
0:0-value3410355
1:1-value3410355
2:2-value3410355
3:3-value3410355
4:4-value3410355
5:5-value3410355
6:6-value3410355
7:7-value3410355
8:8-value3410355
9:9-value3410355
key:value
Thread END->15: map object->1835239739
0:0-value3410355
1:1-value3410355
2:2-value3410355
3:3-value3410355
4:4-value3410355
5:5-value3410355
6:6-value3410355
7:7-value3410355
8:8-value3410355
9:9-value3410355
key:value
Thread END->14: map object->1835239739
END

参考

  1. C/C++多线程访问修改集合vector会冲突的两个解决方案

  2. 从Java多线程可见性谈Happens-Before原则

  3. 强一致性和最终一致性

以上是关于[Java]_[初级]_[并发下使用AtomicReference来保证原子读写对象]的主要内容,如果未能解决你的问题,请参考以下文章

[Java]_[初级]_[如何调用外部命令获取输出并设置它的超时退出]

[Java]_[初级]_[如何调用外部命令获取输出并设置它的超时退出]

[Java]_[初级]_[可变参数的使用技巧]

[Java语言特性]_[初级]_[可变参数的使用技巧]

JUC高级多线程_04:高并发下集合类的具体使用

[Java]_[初级]_[高效使用String.split的方法]