java并发容器(MapListBlockingQueue)具体解释

Posted blfbuaa

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了java并发容器(MapListBlockingQueue)具体解释相关的知识,希望对你有一定的参考价值。

Java库本身就有多种线程安全的容器和同步工具,当中同步容器包含两部分:一个是VectorHashtable。另外还有JDK1.2中增加的同步包装类。这些类都是由Collections.synchronizedXXX工厂方法。

同步容器都是线程安全的,可是对于复合操作。缺有些缺点:

① 迭代:在查觉到容器在迭代開始以后被改动,会抛出一个未检查异常ConcurrentModificationException,为了避免这个异常,须要在迭代期间,持有一个容器锁。可是锁的缺点也非常明显。就是对性能的影响。

② 隐藏迭代器:StringBuildertoString方法会通过迭代容器中的每一个元素,另外容器的hashCodeequals方法也会间接地调用迭代。

类似地,contailAllremoveAllretainAll方法。以及容器作为參数的构造函数,都会对容器进行迭代。

③ 缺少即增加等一些复合操作

 

public static Object getLast(Vector list)    

int lastIndex list.size() 1;    

return list.get(lastIndex);

}

public static void deleteLast(Vector list)    

int lastIndex list.size() 1;    

list.remove(lastIndex);

}

getLastdeleteLast都是复合操作。由先前对原子性的分析能够推断,这依旧存在线程安全问题。有可能会抛出ArrayIndexOutOfBoundsException的异常,错误产生的逻辑例如以下所看到的:

 


解决的方法就是通过对这些复合操作加锁

 

并发容器类

 

正是因为同步容器类有以上问题,导致这些类成了鸡肋,于是Java 5推出了并发容器类,Map相应的有ConcurrentHashMapList相应的有CopyOnWriteArrayList。与同步容器类相比。它有下面特性:

 

1.1 ConcurrentHashMap

 

· 更加细化的锁机制。同步容器直接把容器对象做为锁,这样就把全部操作串行化,事实上这是不是必需的,过于悲观,而并发容器採用更细粒度的锁机制。名叫分离锁。保证一些不会发生并发问题的操作进行并行运行

· 附加了一些原子性的复合操作。比方putIfAbsent方法

· 迭代器的弱一致性。而非“及时失败”。它在迭代过程中不再抛出Concurrentmodificationexception异常,而是弱一致性。

· 在并发高的情况下,有可能sizeisEmpty方法不准确。但真正在并发环境下这些方法也没什么作用。

· 另外。它另一些附加的原子操作。缺少即增加、相等便移除、相等便替换

 

putIfAbsent(K key, V value),缺少即增加(假设该键已经存在,则不增加)
          假设指定键已经不再与某个值相关联。则将它与给定值关联。

类似于以下的操作

If(!map.containsKey(key)){

return map.put(key,value);

}else{

return map.get(key);

}

 

remove(Object key, Object value),相等便移除
          仅仅有眼下将键的条目映射到给定值时,才移除该键的条目。

类似于以下的:

if(map.containsKey(key) && map.get(key).equals(value)){

Map.remove();

return true;

}else{

return false;

}

 

replace(K key, V value)

replace(K key, V oldValue, V newValue),相等便替换。
          仅仅有眼下将键的条目映射到某一值时,才替换该键的条目。

 

上面提到的三个。都是原子的。在一些缓存应用中能够考虑取代HashMap/Hashtable

 

1.2 CopyOnWriteArrayListCopyOnWriteArraySet

· CopyOnWriteArrayList採用写入时复制的方式避开并发问题。这事实上是通过冗余和不可变性来解决并发问题,在性能上会有比較大的代价,但假设写入的操作远远小于迭代和读操作,那么性能就区别不大了。

 

应用它们的场合通常在数组相对较小。而且遍历操作的数量大大超过可变操作的数量时。这样的场合应用它们很好。它们全部可变的操作都是先取得后台数组的副本,对副本进行更改。然后替换副本。这样能够保证永远不会抛出ConcurrentModificationException移除。

队列

Java中的队列接口就是Queue。它有会抛出异常的addremove方法。在队尾插入元素以及对头移除元素。还有不会抛出异常的,相应的offerpoll方法。

 

2.1 LinkedList

List实现了deque接口以及List接口,能够将它看做是这两种的不论什么一种。

 

Queue queue=new LinkedList();

queue.offer("testone");

queue.offer("testtwo");

queue.offer("testthree");

queue.offer("testfour");

 

System.out.println(queue.poll());//testone

2.2 PriorityQueue

一个基于优先级堆(简单的使用链表的话。可能插入的效率会比較低O(N)的无界优先级队列。

优先级队列的元素依照其自然顺序进行排序,或者依据构造队列时提供的 Comparator 进行排序,详细取决于所使用的构造方法。

优先级队列不同意使用 null 元素。依靠自然顺序的优先级队列还不同意插入不可比較的对象。

 

queue=new PriorityQueue();

queue.offer("testone");

queue.offer("testtwo");

queue.offer("testthree");

queue.offer("testfour");

 

System.out.println(queue.poll());//testfour

 

2.3 ConcurrentLinkedQueue

基于链节点的。线程安全的队列。

并发訪问不须要同步。在队列的尾部加入元素,并在头部删除他们。

所以仅仅要不须要知道队列的大小。并发队列就是比較好的选择。

 

 

 

堵塞队列

3.1 生产者和消费者模式

生产者和消费者模式,生产者不须要知道消费者的身份或者数量,甚至根本没有消费者。他们仅仅负责把数据放入队列。

类似地。消费者也不须要知道生产者是谁,以及是谁给他们安排的工作。

 

Java知道大家清楚这个模式的并发复杂性,于是乎提供了堵塞队列(BlockingQueue)来满足这个模式的需求。堵塞队列说起来非常easy,就是当队满的时候写线程会等待,直到队列不满的时候;当队空的时候读线程会等待。直到队不空的时候。实现这样的模式的方法非常多,其差别也就在于谁的消耗更低和等待的策略更优。以LinkedBlockingQueue的详细实现为例,它的put源代码例如以下:

public void put(E e) throws InterruptedException {

        if (e == nullthrow new NullPointerException();

        int -1;

        final ReentrantLock putLock this.putLock;

        final AtomicInteger count this.count;

        putLock.lockInterruptibly();

        try {

            

            try {

                while (count.get() == capacity)

                    notFull.await();

            catch (InterruptedException ie) {

                notFull.signal(); 

// propagate to non-interrupted thread

                throw ie;

            }

            insert(e);

            count.getAndIncrement();

            if (c capacity)

                notFull.signal();

        finally {

            putLock.unlock();

        }

        if (c == 0)

            signalNotEmpty();

}

 

撇开其锁的详细实现,其流程就是我们在操作系统课上学习到的标准生产者模式,看来那些枯燥的理论还是实用武之地的。当中,最核心的还是Java的锁实现,有兴趣的朋友能够再进一步深究一下

 

堵塞队列Blocking queue,提供了可堵塞的puttake方法,他们与可定时的offerpoll方法是等价。Put方法简化了处理。假设是有界队列。那么当队列满的时候,生成者就会堵塞,从而改消费者很多其它的追赶速度。

 

 

 

3.2 ArrayBlockingQueueLinkedBlockingQueue

 

FIFO的队列。与LinkedList(由链节点支持。无界)ArrayList(由数组支持。有界)相似(Linked有更好的插入和移除性能。Array有更好的查找性能,考虑到堵塞队列的特性,移除头部,增加尾部,两个都差别不大)。可是却拥有比同步List更好的并发性能。

 

另外,LinkedList永远不会等待,由于他是无界的。

 

BlockingQueue<String> queue=new ArrayBlockingQueue<String>(5);

 

Producer p=new Producer(queue);

Consumer c1=new Consumer(queue);

Consumer c2=new Consumer(queue);

 

new Thread(p).start();

new Thread(c1).start();

new Thread(c2).start();

 

 

class Producer implements Runnable {

   private final BlockingQueue queue;

   Producer(BlockingQueue q) queue q; }

   

   public void run() {

     try {

     for(int i=0;i<100;i++){

     queue.put(produce());

     }

     

     catch (InterruptedException ex) {}

   }

   

   String produce() {

   String temp=""+(char)(‘A‘+(int)(Math.random()*26));

   System.out.println("produce"+temp);

   return temp;

   }

 }

 

 

class Consumer implements Runnable {

   private final BlockingQueue queue;

   Consumer(BlockingQueue q) queue q; }

   public void run() {

     try {

     for(int i=0;i<100;i++){

     consume(queue.take());

     }

     catch (InterruptedException ex) {}

   }

   void consume(Object x) {

   System.out.println("cousume"+x.toString());

   }

 }

 

输出:

produceK

cousumeK

produceV

cousumeV

produceQ

cousumeQ

produceI

produceD

produceI

produceG

produceA

produceE

cousumeD

3.3 PriorityBlockingQueue

一个按优先级堆支持的无界优先级队列队列,假设不希望依照FIFO的顺序进行处理,它很实用。

它能够比較元素本身的自然顺序。也能够使用一个Comparator排序。

3.4 DelayQueue

一个优先级堆支持的。基于时间的调度队列。加入到队列中的元素必须实现新的Delayed接口(仅仅有一个方法。Long getDelay(java.util.concurrent.TimeUnit unit))。加入能够理马上返回。可是在延迟时间过去之前。不能从队列中取出元素,假设多个元素的延迟时间已到。那么最早失效链接/失效时间最长的元素将第一个取出。

 

static class NanoDelay implements Delayed{

long tigger;

 

NanoDelay(long i){

tigger=System.nanoTime()+i;

}

 

public boolean equals(Object other){

return ((NanoDelay)other).tigger==tigger;

}

 

 

public long getDelay(TimeUnit unit) {

long n=tigger-System.nanoTime();

return unit.convert(n, TimeUnit.NANOSECONDS);

}

 

public long getTriggerTime(){

return tigger;

}

 

 

public int compareTo(Delayed o) {

long i=tigger;

long j=((NanoDelay)o).tigger;

if(i<j){

return -1;

}

if(i>j)

return 1;

return 0;

}

 

}

 

public static void main(String[] args) throws InterruptedException{

Random random=new Random();

DelayQueue<NanoDelay> queue=new DelayQueue<NanoDelay>();

for(int i=0;i<5;i++){

queue.add(new NanoDelay(random.nextInt(1000)));

 

}

long last=0;

for(int i=0;i<5;i++){

NanoDelay delay=(NanoDelay)(queue.take());

long tt=delay.getTriggerTime();

System.out.println("Trigger time:"+tt);

 

if(i!=0){

System.out.println("Data: "+(tt-last));

}

last=tt;

}

 

}

 

 

3.5 SynchronousQueue

不是一个真正的队列,由于它不会为队列元素维护不论什么存储空间,只是它维护一个排队的线程清单。这些线程等待把元素增加(enqueue)队列或者移出(dequeue)队列。

也就是说。它很直接的移交工作,降低了生产者和消费者之间移动数据的延迟时间,另外。也能够更快的知道反馈信息,当移交被接受时,它就知道消费者已经得到了任务。

由于SynChronousQueue没有存储的能力。所以除非还有一个线程已经做好准备,否则puttake会一直阻止。它仅仅有在消费者比較充足的时候比較合适

 

双端队列(Deque

JAVA6中新增了两个容器DequeBlockingDeque,他们分别扩展了QueueBlockingQueueDeque它是一个双端队列,同意高效的在头和尾分别进行插入和删除。它的实现各自是ArrayDequeLinkedBlockingQueue

 

双端队列使得他们可以工作在一种称为“窃取工作”的模式上面。

 

最佳实践

1..同步的(synchronized+HashMap,假设不存在,则计算,然后增加,该方法须要同步。

HashMap cache=new HashMap();

public synchronized compute(A arg){

result=cace.get(arg);

Ii(result==null){

result=c.compute(arg);

Cache.put(result);

}

Return result;

}

 

 

2.用ConcurrentHashMap取代HashMap+同步.。这种在getset的时候也基本能保证原子性。可是会带来反复计算的问题.

Map<A,V> cache=new ConcurrentHashMap<A,V>();

public  compute(A arg){

result=cace.get(arg);

Ii(result==null){

result=c.compute(arg);

Cache.put(result);

}

Return result;

}

3.採用FutureTask取代直接存储值,这样能够在一開始创建的时候就将Task增加

Map<A,FutureTask<V>> cache=new ConcurrentHashMap<A,FutureTask<V>>();

public  compute(A arg){

FutureTask <T> f=cace.get(arg);

//检查再执行的缺陷

Ii(f==null){

Callable<V> evel=new Callable(){

Public call() throws ..{

return c.compute(arg);

}

};

FutureTask <T> ft=new FutureTask<T>(evel);

f=ft;

cache.put(arg,ft;

ft.run();

}

Try{

//堵塞,直到完毕

return f.get();

}cach(){

}

}

 

4.上面还有检查再执行的缺陷,在高并发的情况下啊,两方都没发现FutureTask,而且都放入Map(后一个被前一个替代),都開始了计算。

 

这里的解决方式在于。当他们都要放入Map的时候。假设能够有原子方法。那么已经有了以后,后一个FutureTask就增加,而且启动。

public  compute(A arg){

FutureTask <T> f=cace.get(arg);

//检查再执行的缺陷

Ii(f==null){

Callable<V> evel=new Callable(){

Public call() throws ..{

return c.compute(arg);

}

};

FutureTask <T> ft=new FutureTask<T>(evel);

f=cache.putIfAbsent(args,ft); //假设已经存在。返回存在的值。否则返回null

if(f==null){f=ft;ft.run();} //曾经不存在。说明应该開始这个计算

else{ft=null;} //取消该计算

}

Try{

//堵塞,直到完毕

return f.get();

}cach(){

}

}

 

5.上面的程序上来看已经完美了。只是可能带来缓存污染的可能性。假设一个计算被取消或者失败,那么这个键以后的值永远都是失败了;一种解决方式是,发现取消或者失败的task,就移除它,假设有Exception,也移除。

 

 

6.另外,假设考虑缓存过期的问题,能够为每一个结果关联一个过去时间。并周期性的扫描,清除过期的缓存

(过期时间能够用Delayed接口实现,參考DelayQueue,给他一个大于当前时间XXX的时间,,而且不断减去当前时间,直到返回负数,说明延迟时间已到了。

 

 

Java集合容器总结。


按数据结构主要有下面几类


 1,内置容器:数组
 2,list容器:Vetor,Stack。ArrayList,LinkedList,
 CopyOnWriteArrayList(1.5),AttributeList(1.5),RoleList(1.5),RoleUnresolvedList(1.5),
 ConcurrentLinkedQueue(1.5),ArrayBlockingQueue(1.5),LinkedBlockingQueue(1.5)。
 PriorityQueue(1.5)。PriorityBlockingQueue(1.5),SynchronousQueue(1.5)
 3,set容器:HashSet(1.2),LinkedHashSet(1.4),TreeSet(1.2),
 CopyOnWriteArraySet(1.5)。EnumSet(1.5),JobStateReasons。
 4,map容器:Hashtable,HashMap(1.2),TreeMap(1.2)。LinkedHashMap(1.4),WeakHashMap(1.2)。
 IdentityHashMap(1.4)。ConcurrentMap(1.5),concurrentHashMap(1.5)。
Set 接口继承 Collection,但不同意反复,使用自己内部的一个排列机制。
List 接口继承 Collection。同意反复。以元素安插的次序来放置元素,不会又一次排列。
Map接口是一组成对的键-值对象,即所持有的是key-value pairs。

Map中不能有反复的key。拥有自己的内部排列机制


按新旧主要有下面几类
Java1.2前的容器:Vector,Stack,Hashtable。
Java1.2的容器:HashSet,TreeSet。HashMap。TreeMap,WeakHashMap
Java1.4的容器:LinkedHashSet,LinkedHashMap,IdentityHashMap,ConcurrentMap。concurrentHashMap
java1.5新增:CopyOnWriteArrayList,AttributeList,RoleList,RoleUnresolvedList,
 ConcurrentLinkedQueue,ArrayBlockingQueue,LinkedBlockingQueue,PriorityBlockingQueue
 ArrayBlockingQueue,CopyOnWriteArraySet。EnumSet。
未知:JobStateReasons
按线程安全主要有下面几类
线程安全
一。使用锁
  全然不支持并发
  list容器:Vetor,Stack。CopyOnWriteArrayList,ArrayBlockingQueue,
  LinkedBlockingQueue,PriorityBlockingQueue,SynchronousQueue
  set容器:CopyOnWriteArraySet
  map容器:Hashtable
  部分支持并发
  list容器:无
  set容器:无
  map容器:concurrentHashMap
 使用非堵塞算法
 list容器:ConcurrentLinkedQueue
 set容器:无
 map容器:无
二。非线程安全
 list容器:ArrayList,LinkedList。AttributeList,RoleList,RoleUnresolvedList,PriorityQueue
 set容器:HashSet。TreeSet,LinkedHashSet。EnumSet
 map容器:HashMap,TreeMap,LinkedHashMap,WeakHashMap,IdentityHashMap,EnumMap
按遍历安全主要有下面几类
一,遍历安全
 可并发遍历
   list容器:CopyOnWriteArrayList。ConcurrentLinkedQueue
   set容器:CopyOnWriteArraySet,EnumSet,EnumMap
   map容器:无
 不可并发遍历
   list容器:Vetor。Stack,Hashtable。ArrayBlockingQueue,
   LinkedBlockingQueue,PriorityBlockingQueue,SynchronousQueue
   set容器:无
   map容器:Hashtable,concurrentHashMap
注意1:concurrentHashMap迭代器它们不会抛出ConcurrentModificationException。

只是,迭代器被设计成每次仅由一个线程使用。 
二,遍历不安全
会抛异常ConcurrentModificationException:
list容器:ArrayList,LinkedList,AttributeList,RoleList,RoleUnresolvedList
set容器:HashSet,TreeSet,TreeSet,LinkedHashSet
map容器:HashMap。TreeMap,LinkedHashMap,WeakHashMap,IdentityHashMap
注意1:返回的迭代器是弱一致 的:它们不会抛出 ConcurrentModificationException,
   也不一定显示在迭代进行时发生的不论什么映射改动的效果的容器有:
   EnumSet,EnumMap
按遍历是否有序性分类
存储数据有序
 list容器: ConcurrentLinkedQueue(1.5),ArrayBlockingQueue(1.5),LinkedBlockingQueue(1.5),
 SynchronousQueue(1.5)
 set容器:TreeSet(1.2).(他们实现了set接口),
 CopyOnWriteArraySet(1.5),EnumSet(1.5),JobStateReasons。
 map容器:TreeMap(1.2)。LinkedHashMap(1.4) 
一定规则下存储数据有序
 list容器:Stack,Vetor,ArrayList,LinkedList,CopyOnWriteArrayList(1.5),AttributeList(1.5),RoleList(1.5),RoleUnresolvedList(1.5)
 set容器:无
 map容器:无
遍历无序但移除有序
 list容器:PriorityQueue(1.5)。PriorityBlockingQueue(1.5)
 set容器:无
 map容器:无
不管怎样都无序
 list容器:无
 set容器:HashSet(1.2),LinkedHashSet(1.4)
 map容器:Hashtable,HashMap(1.2),WeakHashMap(1.2)。IdentityHashMap(1.4)。
 ConcurrentMap(1.5),concurrentHashMap(1.5)
能够按自然顺序(參见 Comparable)或比較器进行排序的有
list容器:PriorityQueue(1.5)。PriorityBlockingQueue
set容器:TreeSet(1.2)
map容器:TreeMap(1.2)
实现了RandomAccess接口的有
 ArrayList, AttributeList, CopyOnWriteArrayList, RoleList, RoleUnresolvedList, Stack, Vector
RandomAccess接口是List 实现所使用的标记接口,用来表明其支持高速(一般是固定时间)随机訪问。此接口的主要目的是同意一般的算法更改其行为,从而在将其应用到随机或连续訪问列表时能提供良好的性能。


在对List特别的遍历算法中。要尽量来推断是属于 RandomAccess(如ArrayList)还是SequenceAccess(如LinkedList),
由于适合RandomAccess List的遍历算法,用在SequenceAccess List上就区别非常大。
即对于实现了RandomAccess接口的类实例而言,此循环
 for (int i=0, i<list.size(); i++)
   list.get(i);
的执行速度要快于下面循环:
     for (Iterator i=list.iterator(); i.hasNext(); )
    i.next();





































































































以上是关于java并发容器(MapListBlockingQueue)具体解释的主要内容,如果未能解决你的问题,请参考以下文章

JAVA并发同步容器和并发容器

Java并发机制--同步容器与并发容器

Java并发编程:并发容器之ConcurrentHashMap

Java并发编程系列- Java并发容器

Java并发编程:并发容器之ConcurrentHashMap

Java学习笔记—多线程(同步容器和并发容器)