Java源码分析集合部分总结
Posted 低调的洋仔
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java源码分析集合部分总结相关的知识,希望对你有一定的参考价值。
按照自己的理解
集合类分为多个分支
一个是Collection派系的集合,一个是Map派系的集合。
两者分别支撑起来整个集合的阵营。
Collection派系的主要是List、Queue、Set.
Map的部分则是键值对的派系,大部分应该说都是直接使用了这个键值对的形式。因为Map作为一个键值对的接口,已经定义了,键值对的各种操作的方法。
为什么这么划分?
这个问题我觉得确实是一个设计的重点,当初设计的时候,肯定是在抽取各种数据集合之后形成了这样的一个结构,那么,这个设计的原因可能就在于Colloection
这集合和Map这个键值对集合的接口方法。
下面看下这两个接口的方法都有哪些吧
Collection中的方法
int size();
boolean isEmpty();
boolean contains(Object o);
Iterator<E> iterator();
Object[] toArray();
<T> T[] toArray(T[] a);
boolean add(E e);
boolean remove(Object o);
boolean containsAll(Collection<?> c);
boolean addAll(Collection<? extends E> c);
boolean removeAll(Collection<?> c);
boolean retainAll(Collection<?> c);
void clear();
boolean equals(Object o);
int hashCode();
都是线性关系中的一些操作的方法,比如size()、isEmpty()这俩在map里面也有,但是如果抽象的过于细的话反而不利于设计,太过于繁琐的抽象等于是为自己找事做了。
Map中的方法
int size();
boolean isEmpty();
boolean containsKey(Object key);
boolean containsValue(Object value);
V get(Object key);
V put(K key, V value);
V remove(Object key);
void putAll(Map<? extends K, ? extends V> m);
void clear();
Set<K> keySet();
Collection<V> values();
Set<Map.Entry<K, V>> entrySet();
boolean equals(Object o);
int hashCode();
interface Entry<K,V>
K getKey();
V getValue();
V setValue(V value);
boolean equals(Object o);
int hashCode();
先来看看Collection的派系
Collection直接下属常见的是AbstractCollection 和List和Set以及Queue
AbstractCollection这个是个抽象类,里面主要实现了部分的Collection中定义的方法,然后他继续派生出诸多的可以使用其来实现的子类出来。
主要是有
AbstractList
AbstractSet
AbstractQueue
ArrayDeque
这些类看着很眼熟,自然其本身就是为了实际的List、Queue、Set这些来服务的,相当于双重主线,一个是接口主线用于统一步调,另一个是实现的主线,抽象实现的公共部分。
那么这里面ArrayDeque不是抽象的
ArrayDeque
继承自AbastractCollection这个抽象类,然后实现了Deque这个接口,这个接口又实现了Queue这个接口,Deque这个接口无非就是在原来的基础上增加了双端的操作而已,因为Queue本身是一端的。
实现了Cloneable接口和Serializable接口
这个是一个双端队列,实现主要使用的是数组,以及head和tail索引的方式来实现的。
private transient E[] elements;
private transient int head;
private transient int tail;
private static final int MIN_INITIAL_CAPACITY = 8;
主要的属性就是这几个
默认数组大小是16个,最小的值是8个,为什么这里是
这个类中初始化的元素的个数必须保证是2的次方个,为什么?
因为后面需要用到这个值的属性,x & ( length -1 ) ,这个设计决定了要保证他的值都是2的次方才能保证length-1后每项的值都是1这样&才能起到本能的作用。
那么其扩容的话默认是扩容为原来的两倍。
private void allocateElements(int numElements)
int initialCapacity = MIN_INITIAL_CAPACITY;
// Find the best power of two to hold elements.
// Tests "<=" because arrays aren't kept full.
if (numElements >= initialCapacity)
initialCapacity = numElements;
initialCapacity |= (initialCapacity >>> 1);
initialCapacity |= (initialCapacity >>> 2);
initialCapacity |= (initialCapacity >>> 4);
initialCapacity |= (initialCapacity >>> 8);
initialCapacity |= (initialCapacity >>> 16);
initialCapacity++;
if (initialCapacity < 0) // Too many elements, must back off
initialCapacity >>>= 1;// Good luck allocating 2 ^ 30 elements
elements = (E[]) new Object[initialCapacity];
/**
* Double the capacity of this deque. Call only when full, i.e.,
* when head and tail have wrapped around to become equal.
*/
private void doubleCapacity()
assert head == tail;
int p = head;
int n = elements.length;
int r = n - p; // number of elements to the right of p
int newCapacity = n << 1;
if (newCapacity < 0)
throw new IllegalStateException("Sorry, deque too big");
Object[] a = new Object[newCapacity];
System.arraycopy(elements, p, a, 0, r);
System.arraycopy(elements, 0, a, r, p);
elements = (E[])a;
head = 0;
tail = n;
ConcurrentLinkedDeque
继承自AbstractCollection抽象类,实现了Deque接口和Serializable接口
他的实现方式和ConcurrentLinkedQueue实现方式差不多的思路,但是他用了双向链表,不过一般双端队列的话都是用的双向链表(前提是用链表来实现的话)。
private void linkFirst(E e)
checkNotNull(e);
final Node<E> newNode = new Node<E>(e);
restartFromHead:
for (;;)
for (Node<E> h = head, p = h, q;;)
if ((q = p.prev) != null &&
(q = (p = q).prev) != null)// 不是头结点
// Check for head updates every other hop.
// If p == q, we are sure to follow head instead.
p = (h != (h = head)) ? h : q; // 让h指向head,然后看h和当前的指向head之后的h是不是一致的,如果是一致的那么就返回h,不一致的就返回q
else if (p.next == p) // PREV_TERMINATOR
continue restartFromHead;
else
// p is first node
newNode.lazySetNext(p); // CAS piggyback
if (p.casPrev(null, newNode)) // p设置前一个节点为newNode。相当于前面插入的功能。
// Successful CAS is the linearization point
// for e to become an element of this deque,
// and for newNode to become "live".
if (p != h) // hop two nodes at a time // 如果检测到p和head不一致的话更新p,除非其他的更改了h的值自己本身似乎不会修改的。
casHead(h, newNode); // Failure is OK.
return;
// Lost CAS race to another thread; re-read prev
获取链表的最后一个节点
Node<E> last()
restartFromTail:
for (;;)
for (Node<E> t = tail, p = t, q;;)
if ((q = p.next) != null &&
(q = (p = q).next) != null)
// Check for tail updates every other hop.
// If p == q, we are sure to follow tail instead.
p = (t != (t = tail)) ? t : q;
else if (p == t
// It is possible that p is NEXT_TERMINATOR,
// but if so, the CAS is guaranteed to fail.
|| casTail(t, p))
return p;
else
continue restartFromTail;
这个地方明显是cas来设置尾部节点的,一方面是检查他的下一个节点是不是空的节点,另一方面进行cas竞争尾部节点,只要竞争过了那么就可以返回该节点了,也就是说只要竞争过了就认为已经是尾部节点了。
这个类主要丰富了一些头结点和尾部节点的相关操作而已,大部分的思路是ConcurrentLinkedQueue的那个思路。
接下来从AbstractList开始继续看
继承自AbstractCollection这个抽象类,然后实现了一个List接口,那么这个List接口自然是继承自Collection这个接口了,但是我比较奇怪的是List里面既然继承了Collection这个接口那么为什么里面的不少方法都是Collection里面的,个人的理解是这里做了一个强调而已,每个接口强调自己的方法,当每个使用者要实现这个方法的时候,避免因为继承关系过于繁琐而导致使用者无法在短时间内看清楚这个接口的用意和所有应该实现的接口有哪些,避免盲目使用后发现有些方法根本不能用或者是不想用。
AbstractList主要实现了部分List类型的集合的操作而已,同时封装了部分的List接口中不存在的方法丰富了整个List的功能。
AbstractSequenceList 貌似表达的是一种顺序性的结合,但是我觉得这个抽象类可能用处不大,因为目前只有一个实现类,感觉是多余的,但是可能考虑到了将来的扩展性所以还是设计了这样的一个抽象类在这里。他主要是继承了AbstractList这个类。
AbstractList类中与随机访问类相对的另一套系统,采用的是在迭代器的基础上实现的get、set、add和remove方法
LinkedList
LinkedList这个类继承自这个AbstractSequentialList这个类,实现了List接口、Deque接口、Cloneable接口、Serializable接口。
非线程安全的,实现了双向的链表。
/**
* 内部节点
* 包括了前一个节点和后一个节点
* 自己的数据
* @author Administrator
* @param <E>
*/
private static class Node<E>
E item;
Node<E> next;
Node<E> prev;
Node(Node<E> prev, E element, Node<E> next)
this.item = element;
this.next = next;
this.prev = prev;
AbstractList
ArrayList
这个类分析过N次了,基本的就是继承自AbstractList这个抽象类,然后实现了List,RandomAccess,Serializable接口和Cloneable接口
主要是用了数组来实现的。所以这样的看,AbstractList分为两个派系,一个是数组实现的增删改查之类的,而另一个是iterator来实现的增删改查之类的,所以感觉这里蛮有意思的。
初始化大小是10个。增长策略是增长原来的0.5倍的容量。
private void grow(int minCapacity)
// overflow-conscious code
int oldCapacity = elementData.length;
int newCapacity = oldCapacity + (oldCapacity >> 1);
if (newCapacity - minCapacity < 0)
newCapacity = minCapacity;
if (newCapacity - MAX_ARRAY_SIZE > 0)
newCapacity = hugeCapacity(minCapacity);
// minCapacity is usually close to size, so this is a win:
elementData = Arrays.copyOf(elementData, newCapacity);
大量的用了Arrays.copy()方法
Vector
继承自AbstractList抽象类,实现了List、RandomAccess、Serializable、Cloneable这些接口
线程安全的类,其实现线程安全的方式是加了Synchronized关键字,大部分的方法都使用了。
默认大小是10,但是扩容的策略稍微有所不同,其是如果用户自己设置了增长的容量值的话,就按照这个值进行扩容,但是如果没设置的话就默认增长原来的一倍的数据量。
本质上是和ArrayList相差不大的。
迭代器中使用了Synchronized来保证线程安全,锁住的是Vector.class。
private class Itr implements Iterator<E>
int cursor; // index of next element to return
int lastRet = -1; // index of last element returned; -1 if no such
int expectedModCount = modCount;
public boolean hasNext()
// Racy but within spec, since modifications are checked
// within or after synchronization in next/previous
return cursor != elementCount;
public E next()
synchronized (Vector.this)
checkForComodification();
int i = cursor;
if (i >= elementCount)
throw new NoSuchElementException();
cursor = i + 1;
return elementData(lastRet = i);
Stack
栈,先进后出,他继承自Vector这个类。
其实本质上来讲,栈不应该直接继承这个Vector,因为即使是两者之间存在某种方法一致的情况,也不该强行使用继承关系来进行表现。我觉得这个地方用聚合的关系更好。
同样是线程安全的都使用了Synchronized来保障线程安全,
在原来Vector的基础上通过封装方法的形式实现了Stack。
AbstractQueue
基本上都是队列的形式,包括了双端队列和单端的队列。
继承自AbstractColletion这个抽象类,实现了Queue接口。
ArrayBlockingQueue
继承自AbstractQueue,实现了BlockingQueue这个接口,然后实现了序列化接口Serializable。
BlockingQueue是继承了Queue接口。
再次加强一次,这个Queue继承自Collection,Collection下设Queue和Set以及List接口。
Collection本身抽象类有AbstractCollection
AbstractCollection下面又有AbstractList和AbstractSequentialList以及ArrayDeque
线程安全的。使用了ReentrantLock进行锁。
/** The queued items */
final Object[] items;
/** items index for next take, poll, peek or remove */
int takeIndex;
/** items index for next put, offer, or add */
int putIndex;
/** Number of elements in the queue */
int count;
final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;
实现阻塞的功能是用了Condition的signal和Condition的await
public ArrayBlockingQueue(int capacity, boolean fair)
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
通过Lock在初始化的时候创建好Condition这几个实例。
public ArrayBlockingQueue(int capacity)
this(capacity, false);
有界的阻塞队列使用的是数组的形式实现的。
所有可以被外部访问的方法全部加了Lock锁保证线程的安全,但是部分的私有方法没有用,不过私有的这些方法一般都是其他的public的方法来调用的。
ConcurrentLinkedQueue
继承自AbstractQueue这个抽象类实现了Queue和Serializable接口。
节点内容不允许为 null,插入的时候检查null,直接抛出空指针。
既然是Concurrent开头那么肯定是线程安全的了,但是这个类怎么实现的呢?
private static class Node<E>
volatile E item;
volatile Node<E> next;
首先看他的节点,是内容加一个next指针,单向链表。新元素存储的时候直接插入尾部,但是出的话就是从头部开始出。
ConcurrentLinkedQueue内部采用一种wait-free(无等待)算法来实现
内部是没有锁的,他初始化状态就是一个头结点一个尾部节点,而且都指向同一个位置。
public ConcurrentLinkedQueue()
head = tail = new Node<E>(null);
通过读这些代码发现这个类实现是有些技巧在里面的。下面这个地方分两部分来解读,一部分是竞争CAS将心的节点设置为尾部节点,一旦设置成功了,就会返回true,一开始判断就是要看他q是不是尾部节点,是尾部节点的话,就竞争Cas设置自己为尾部节点,然后如果成功了,那么,如果p节点和t节点在此时不想等了顺便尝试CAS下尾部节点,然后返回true的。
如果竞争那个CAS不成功的话,分两步,一步是可能存在是p和q都是尾部节点,这个时候明显是头结点了,为了防止中途发生变更,那么更新尾节点,这个尾节点作为新的真正的尾节点继续遍历下一次,如果没法生变更的话就直接设置他的值是head了。
默认的情况是直接更新尾部节点,然后要么返回的是尾节点要么就是当前的下一个节点q。
public boolean offer(E e)
checkNotNull(e);
final Node<E> newNode = new Node<E>(e);
for (Node<E> t = tail, p = t;;)
Node<E> q = p.next;
if (q == null)
// p is last node
if (p.casNext(null, newNode))
// Successful CAS is the linearization point
// for e to become an element of this queue,
// and for newNode to become "live".
if (p != t) // hop two nodes at a time
casTail(t, newNode); // Failure is OK.
return true;
// Lost CAS race to another thread; re-read next
else if (p == q)
// We have fallen off list. If tail is unchanged, it
// will also be off-list, in which case we need to
// jump to head, from which all live nodes are always
// reachable. Else the new tail is a better bet.
p = (t != (t = tail)) ? t : head;
else
// Check for tail updates after two hops.
p = (p != t && t != (t = tail)) ? t : q;
入队列的情况。
出队列的情况。
因为节点内容也就是item不能为null,那么里面的item如果为null的话就是头节点了,这个地方也是看着头结点来的。
上面的图中形象的描述了这个过程。
先看else部分
如果当前的头节点是空的,那么说明是头结点,头结点的话出队列的时候遍历出他的下一个节点的数据,如果不是空的那么就设置当前的这个节点的item是空的,然后p如果不是头节点了的话,就更新他的状态设置为头结点,当然这个过程可以不执行,但是尽力执行的。然后返回他的item就好了。
再看if
如果当前的这个节点的内容item不是空的那么就说明了已经有线程完成了出队的操作并更新了头节点了,那么,这个地方只需要把当前元素的内容设置为null就行了
public E poll()
restartFromHead:
for (;;)
for (Node<E> h = head, p = h, q;;)
E item = p.item;
if (item != null && p.casItem(item, null))
// Successful CAS is the linearization point
// for item to be removed from this queue.
if (p != h) // hop two nodes at a time
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
else if ((q = p.next) == null)
updateHead(h, p);
return null;
else if (p == q)
continue restartFromHead;
else
p = q;
DelayQueue
继承自AbstractQueue抽象类,然后实现了BlockingQueue这个接口。
线程安全的,用锁实现的。
DelayQueue内部使用了优先级队列来。
private transient final ReentrantLock lock = new ReentrantLock();
private final PriorityQueue<E> q = new PriorityQueue<E>();
private final Condition available = lock.newCondition();
- DelayQueue是一种无界的阻塞队列,队列里只允许放入可以"延期"的元素,队列中列头的元素是最先"到期"的元素。如果队列中没有任何元素"到期",尽管队列中有元素,也不能从队列头获取到任何元素。
这个条件是当获取到的数据是null的时候就阻塞,如果完成了数据的获取的时候,最后还要执行一次signal用于激活。
这个地方就是进行一个不断的轮询任务到底时间有没有过期。如果过期了就出队列了,这个采用改了优先级队列来实现的,同时优先级队列本身又是保证了队列的首个节点是优先级最高的节点,插入元素的时候比较头部的节点时间和当前进来的这个时间是谁大,如过当前进来的时间比较短那么就触发填进去后触发出队列的poll,检查第一个节点的时间有没有过期。
这里实际上没有用别的,下面的内容忽略一下,然后本质上是通过不断的轮询检查时间是不是小于0的。重点在于不断的轮询计算剩余时间。
这个类中是实现了轮询与出入队列的功能,但是没有实现任务的时间的处理。
时间的处理实际上是用了包装任务的内部累ScheduledFutureTask来做的。这个里面实现的getDelay方法,然后基本的思路就是来计算当前时间设定的时间的差值。
public long getDelay(TimeUnit unit)
return unit.convert(time - now(), TimeUnit.NANOSECONDS);
/**
* Sets the next time to run for a periodic task.
*/
private void setNextRunTime()
long p = period;
if (p > 0)
time += p;
else
time = triggerTime(-p);
run方法
终止的取消掉
是否是周期性质的动作
/**
* Returns true if this is a periodic (not a one-shot) action.
*
* @return true if periodic
*/
public boolean isPeriodic()
return period != 0;
否则,不是周期运行的直接启动一个Task运行了
如果调用runAndReset方法后,他会将执行的线程设置为null,并把任务的状态设置为0,那么任务就可以被认为未使用过,就可以再次进行使用了。
public void run()
boolean periodic = isPeriodic();
if (!canRunInCurrentRunState(periodic))
cancel(false);
else if (!periodic)
ScheduledFutureTask.super.run();
else if (ScheduledFutureTask.super.runAndReset())
setNextRunTime();
reExecutePeriodic(outerTask);
setNextRunTime()方法设置了下一次运行的时间。
ScheduleFutureTask.super.run()和ScheduledFutureTask.super.runAndReset方法都是调用的父类中的方法,FutureTask中的方法。
调用的就是这个类中的
private final class Sync extends AbstractQueuedSynchronizer
这个类基于AQS实现的,通过Executors.callable()完成Runnable到Callable的适配,通过任务包装实现延迟计算、对比和控制任务的访问。
LinkedBlockingDeque
继承自AbstractQueue.实现了BlockingQueue和Serializable接口。
线程安全的,使用ReentrentLock实现的。
基于双向链表实现的双端队列。item表示的内容,而prev是表示的前一个节点的数据,next是后一个节点的数据。
/** Doubly-linked list node class */
static final class Node<E>
E item;
/**
* One of:
* - the real predecessor Node
* - this Node, meaning the predecessor is tail
* - null, meaning there is no predecessor
*/
Node<E> prev;
/**
* One of:
* - the real successor Node
* - this Node, meaning the successor is head
* - null, meaning there is no successor
*/
Node<E> next;
Node(E x)
item = x;
transient Node<E> first;
/**
* Pointer to last node.
* Invariant: (first == null && last == null) ||
* (last.next == null && last.item != null)
*/
transient Node<E> last;
/** Number of items in the deque */
private transient int count;
/** Maximum number of items in the deque */
private final int capacity;
/** Main lock guarding all access */
final ReentrantLock lock = new ReentrantLock();
/** Condition for waiting takes */
private final Condition notEmpty = lock.newCondition();
/** Condition for waiting puts */
private final Condition notFull = lock.newCondition();
LinkedBlockingDeque主要的属性有这么几个,包括了first和last分别指向了首节点和尾节点,那么头节点和尾节点,通过可以看到记录了count的数量信息,然后还有个capacity这个值应该就表示其是有容量限制的。然后还有两个Condition分别用于满了和空的时候的阻塞。
默认的大小是最大的整型值
public LinkedBlockingDeque()
this(Integer.MAX_VALUE);
public LinkedBlockingDeque(int capacity)
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
可以概括为一个锁,两个条件,一个双向链表数据结构。
这个类的实现上肯定是很明确了,比如怎么阻塞的,举个例子就是当容量达到了最大的值了,那么这个时候,通过notFull条件来await然后达到阻塞的目的,这个时候应该调用的是notEmpty的信号量来促使其进行消费,然后当数据容量减少的时候那么这个时候就会调用notFull的siganl唤醒,然后就可以插入数据了。同样的是当空的时候notEmpty是阻塞调用的await阻塞在那,然后当不是空的时候,就调用notEmpty的signal来唤醒阻塞在empty上面的线程,然后非空可以获取数据了。
LinkedBlockingQueue
继承自AbstractQueue这个抽象类,然后实现了BlockingQueue这个接口和Serializable接口
这个类就是队列了,基于单项链表实现的有界的阻塞队列,
内部的单项链表的定义如下:
/**
* Linked list node class
*
*/
static class Node<E>
E item;
/**
* One of:
* - the real successor Node
* - this Node, meaning the successor is head.next
* - null, meaning there is no successor (this is the last node)
*/
Node<E> next;
Node(E x) item = x;
内部包含了几个重要的属性
/** The capacity bound, or Integer.MAX_VALUE if none */
private final int capacity;
/** Current number of elements */
private final AtomicInteger count = new AtomicInteger(0);
// 保存首尾
/**
* Head of linked list.
* Invariant: head.item == null
*/
private transient Node<E> head;
/**
* Tail of linked list.
* Invariant: last.next == null
*/
private transient Node<E> last;
//首先可见,内部为单向链表;其次,内部为两把锁:存锁和取锁,并分别关联一个条件(是一种双锁队列)
// 意思就是存的时候单独锁前面头结点,取得时候单独锁尾节点
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();
/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();
头节点和尾节点都有存。那么可以双端的访问,然后两个锁和两个条件,那么两个锁分别是干嘛的,一个是用于取一个是用于放的,那么这个地方明显的表现出了一定的设计原理,那就是说,这个队列存和取肯定是分开的,要么头节点开始存,尾部节点开始取,要么头节点开始取,尾部节点开始存,
实际采用的策略是从尾部进行插入,然后从头部进行读取的。
这个类中大部分的操作是单个锁完成,然后少部分的用了两个锁来实现的。下面是进行了锁的封装。
void fullyLock()
putLock.lock();
takeLock.lock();
/**
* Unlock to allow both puts and takes.
*/
void fullyUnlock()
takeLock.unlock();
putLock.unlock();
有容量限制默认是最大值int的可以自己设置容量
public LinkedBlockingQueue()
this(Integer.MAX_VALUE);
public LinkedBlockingQueue(int capacity)
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
添加元素的操作
public void put(E e) throws InterruptedException
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
Node<E> node = new Node(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try
/*
* Note that count is used in wait guard even though it is
* not protected by lock. This works because count can
* only decrease at this point (all other puts are shut
* out by lock), and we (or some other waiting put) are
* signalled if it ever changes from capacity. Similarly
* for all other uses of count in other wait guards.
* 满了就等待
*/
while (count.get() == capacity)
notFull.await();
// 没满就入队
enqueue(node);
c = count.getAndIncrement();
/*
* 注意这里的处理:和单锁队列不同,count为原子量,不需要锁保护。
* put过程中可能有其他线程执行多次get,所以这里需要判断一下当前
* 如果还有剩余容量,那么继续唤醒notFull条件上等待的线程。
*/
if (c + 1 < capacity)
notFull.signal();
finally
putLock.unlock();
// 如果count又0变为1,说明在队列是空的情况下插入了1个元素,唤醒notNull条件上等待的线程。
// count.getAndIncrement();返回的是旧值
if (c == 0)
signalNotEmpty();
没满就通知notFull。不空的话就通知notEmpty。
下面这个remove就用了两把锁,防止当前的节点在删除的过程中冲突了。
* 主要方法里并没有同时用两把锁,但有些方法里会同时使用两把锁,比如remove方法等:
* @param o element to be removed from this queue, if present
* @return @code true if this queue changed as a result of the call
*/
public boolean remove(Object o)
if (o == null) return false;
fullyLock();
try
for (Node<E> trail = head, p = trail.next;
p != null;
trail = p, p = p.next)
if (o.equals(p.item))
unlink(p, trail);
return true;
return false;
finally
fullyUnlock();
还有contains方法也是两个锁同时。
toString和clear方法也是。
PriorityBlockingQueue
优先级阻塞队列
继承自AbstractQueue这个抽象类,然后实现了BlockingQueue、Serializable接口。
基于PriorityQueue实现的无界阻塞队列,PriorityQueue是一个优先级队列,使用的是二叉堆的方式实现的,二叉堆本质上是一个完全二叉树。其左右子节点比父节点要么大要么小,所以可以分为大堆和小堆。这个地方我觉得基于PriorityQueue来实现的有点问题,实际上在里面定义了这样的一个变量但是根据源码中使用的情况来看,只是在writeObject和readObject这两个方法中使用了,其他的都是自己实现的,这不禁让我表示怀疑。也就是说序列化的情况下才会需要这个PriorityQueue。
private static final int DEFAULT_INITIAL_CAPACITY = 11;
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
private transient Object[] queue;
private transient int size;
private transient Comparator<? super E> comparator;
private final ReentrantLock lock;
private final Condition notEmpty;
private transient volatile int allocationSpinLock;
/**
* 通过PriorityQueue来实现的
*/
private PriorityQueue q;
public PriorityBlockingQueue()
this(DEFAULT_INITIAL_CAPACITY, null);
默认的初始化大小是11,能够进行扩容,扩容策略是如果大小不超过64的话就扩容为原来的两倍,否则的话就扩容为原来的1.5倍。
这个地方实际存储数据的是自己定义的这个Object的数组,当然其维护堆的结构也是自己进行了实现的。
* 出队,然后进行调整堆
*/
private E dequeue()
int n = size - 1;
if (n < 0)
return null;
else
Object[] array = queue;
E result = (E) array[0];
E x = (E) array[n];
array[n] = null;
Comparator<? super E> cmp = comparator;
if (cmp == null)
siftDownComparable(0, x, array, n);
else
siftDownUsingComparator(0, x, array, n, cmp);
size = n;
return result;
看这个出队列的方法,他出队列的时候就是把第0个直接出队列的,然后出完后直接进行二叉堆结构的调整。
private static <T> void siftDownComparable(int k, T x, Object[] array,
int n)
if (n > 0)
Comparable<? super T> key = (Comparable<? super T>)x;
int half = n >>> 1; // loop while a non-leaf
while (k < half)
int child = (k << 1) + 1; // assume left child is least
Object c = array[child];
int right = child + 1;
if (right < n &&
((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
c = array[child = right];
if (key.compareTo((T) c) <= 0)
break;
array[k] = c;
k = child;
array[k] = key;
调整二叉堆的结构的方法,自己在内部进行了实现,然后进行一个二叉堆的数据的调整,这个实现的时候是看父节点和他的兄弟节点谁更小谁更大就把他调整上去。
public E poll()
final ReentrantLock lock = this.lock;
lock.lock();
try
return dequeue();
finally
lock.unlock();
加了Lock之后实现的线程安全,内部通过条件控制阻塞的情况,一看到Blocking基本上都是Condition.await和signal
PriorityQueue
优先级队列,继承自AbstractQueue并实现了Serializable接口。
private static final int DEFAULT_INITIAL_CAPACITY = 11;
private transient Object[] queue;
private int size = 0;
private final Comparator<? super E> comparator;
private transient int modCount = 0;
默认的初始化大小是11,然后通过自己定义的数组存储数据的。
本质上就是二叉堆的一些操作,不过他不是线程安全的,内部的实现和PriorityBlockingQueue类似,只不过是该类没实现阻塞的功能和线程安全。
扩容原理也一样,小的话扩容为2倍(< 64),大的话就扩容为原来的1.5倍。
SynchronousQueue
继承自AbstractQueue这个抽象类,然后实现了BlockingQueue和Serializable这个接口。
一种特殊的阻塞队列,本身没有容量,当一个线程从队列中取数据的时候,另一个线程才能够将数据放入队列中来,存取的过程就相当于一个线程把数据交给另一个线程。
无法调用peek看是不是存在元素,只要当要进行取的时候才可能存在,这时候有了消费者了。
Executors.newCachedThreadPool()使用了SynchronousQueue。
生产者、消费者的平衡者,内在的实现可以是队列或者是栈,根据构造方法的参数是true或者是false来确定。
经过验证可以确认,当没有数据的时候,其实会存储请求线程作为一个Node存储在栈里面(默认是用的栈,当然看传的参数有影响的)。然后存储进来的线程会尝试取一次然后自己处于非激活状态。一旦数据到来的时候,激活该线程运行。
public SynchronousQueue()
this(false);
/**
* Creates a <tt>SynchronousQueue</tt> with the specified fairness policy.
*
* @param fair if true, waiting threads contend in FIFO order for
* access; otherwise the order is unspecified.
*/
public SynchronousQueue(boolean fair)
transferer = fair ? new TransferQueue() : new TransferStack();
默认是false的,false的情况下,使用的是TransferStack来处理请求的。
如果是true的话就用TransferQueue来处理的。
Object transfer(Object e, boolean timed, long nanos)
/* Basic algorithm is to loop trying to take either of
* two actions:
*
* 1. If queue apparently empty or holding same-mode nodes,
* try to add node to queue of waiters, wait to be
* fulfilled (or cancelled) and return matching item.
*
* 2. If queue apparently contains waiting items, and this
* call is of complementary mode, try to fulfill by CAS'ing
* item field of waiting node and dequeuing it, and then
* returning matching item.
*
* In each case, along the way, check for and try to help
* advance head and tail on behalf of other stalled/slow
* threads.
*
* The loop starts off with a null check guarding against
* seeing uninitialized head or tail values. This never
* happens in current SynchronousQueue, but could if
* callers held non-volatile/final ref to the
* transferer. The check is here anyway because it places
* null checks at top of loop, which is usually faster
* than having them implicitly interspersed.
*/
// 队列从尾部取数据进行匹配
QNode s = null; // constructed/reused as needed
boolean isData = (e != null);
for (;;)
QNode t = tail;
QNode h = head;
if (t == null || h == null) // saw uninitialized value 看到未初始化的首尾节点。
continue; // spin
if (h == t || t.isData == isData) // empty or same-mode 队列是空的,或者当前节点和队列中节点的模式一样
QNode tn = t.next;
if (t != tail) // inconsistent read 读取到t不是tail了,那么肯定有别的线程修改了其内容,自旋
continue;
if (tn != null) // lagging tail 说明已经有其他的线程修改了该值,只是暂时没有把他的值设置为tail而已。
advanceTail(t, tn); // 当前线程帮助推进尾节点
continue;
if (timed && nanos <= 0) // can't wait 超时了
return null;
if (s == null)
s = new QNode(e, isData); // 初始化s
if (!t.casNext(null, s)) // failed to link in 尝试将当前节点插入到t后面,t这里应该是尾部节点
continue;
advanceTail(t, s); // swing tail and wait 尝试把s设置为尾部节点
Object x = awaitFulfill(s, e, timed, nanos); // 然后等着被匹配
if (x == s) // wait was cancelled 如果被取消
clean(t, s); // 清理s节点
return null;
if (!s.isOffList()) // not already unlinked 如果s节点没有离开队列
advanceHead(t, s); // unlink if head 尝试把s设置为头结点,并移除t
if (x != null) // and forget fields
s.item = s;
s.waiter = null;
return (x != null) ? x : e;
else // complementary-mode 模式正好互补
QNode m = h.next; // node to fulfill 找到能匹配的节点
if (t != tail || m == null || h != head)
continue; // inconsistent read 读取到不一致的结果进入下一轮循环
Object x = m.item;
if (isData == (x != null) || // m already fulfilled m已经被匹配了的话
x == m || // m cancelled m被取消
!m.casItem(x, e)) // lost CAS 尝试把数据e设置到m上失败
advanceHead(h, m); // dequeue and retry h出队并将m设置为头结点然后重试
continue;
advanceHead(h, m); // successfully fulfilled 匹配成功
LockSupport.unpark(m.waiter); // 唤醒m上面等待的线程
return (x != null) ? x : e;
看这个Node的节点信息
static final class QNode
volatile QNode next; // next node in queue
volatile Object item; // CAS'ed to or from null
volatile Thread waiter; // to control park/unpark
final boolean isData;
一个next,用了链表,然后tail记录下尾部节点,head记录下头结点,waiter是等待的线程实例,然后isData表示是不是数据还是请求,如果接受了一次数据后正在执行时来了另一个数据那么这个时候第二次的数据直接忽略,第一次的数据进行匹配,如果匹配不上也就是忽略了,直到有请求进行匹配。
false的话就用TransferStack来处理。
他是将栈顶作为匹配的,一但匹配上就弹出两个配对的元素,否则直接抛弃。
Object transfer(Object e, boolean timed, long nanos)
/*
* Basic algorithm is to loop trying one of three actions:
*
* 1. If apparently empty or already containing nodes of same
* mode, try to push node on stack and wait for a match,
* returning it, or null if cancelled.
*
* 2. If apparently containing node of complementary互补的 mode方式,
* try to push a fulfilling node on to stack, match
* with corresponding waiting node, pop both from
* stack, and return matched item. The matching or
* unlinking might not actually be necessary because of
* other threads performing action 3:
*
* 3. If top of stack already holds another fulfilling node,
* help it out by doing its match and/or pop
* operations, and then continue. The code for helping
* is essentially the same as for fulfilling, except
* that it doesn't return the item.
*
* 基本算法是在一个无限循环中尝试下面三种情况里面的一种:
*
* 1. 如果当前栈为空或者包含与给定节点模式相同的节点,尝试
* 将节点压入栈内,并等待一个匹配节点,最后返回匹配节点
* 或者null(如果被取消)。
*
* 2. 如果当前栈包含于给定节点模式互补的节点,尝试将这个节
* 点打上FULFILLING标记,然后压入栈中,和相应的节点进行
* 匹配,然后将两个节点(当前节点和互补节点)弹出栈,并返
* 回匹配节点的数据。匹配和删除动作不是必须要做的,因为
* 其他线程会执行动作3:
*
* 3. 如果栈顶已经存在一个FULFILLING(正在满足其他节点)的节
* 点,帮助这个节点完成匹配和移除(出栈)的操作。然后继续
* 执行(主循环)。这部分代码基本和动作2的代码一样,只是
* 不会返回节点的数据
*/
SNode s = null; // constructed/reused as needed
int mode = (e == null) ? REQUEST : DATA;
for (;;)
SNode h = head;
if (h == null || h.mode == mode) // empty or same-mode head为null或者head和e的mode相同
if (timed && nanos <= 0) // can't wait 超时了
if (h != null && h.isCancelled()) // h不是空的且已经被取消了
casHead(h, h.next); // pop cancelled node
else
return null; // 否则返回null
else if (casHead(h, s = snode(s, e, h, mode))) // 创建一个snode赋值给s,将原本的head节点赋值为s的next节点,尝试将他作为head节点
// 尝试其他的线程来满足当前线程
SNode m = awaitFulfill(s, timed, nanos);
if (m == s) // wait was cancelled awaitFulfill方法返回后,判断是不是被取消了。
clean(s); // 如果被取消了,就清理一个s节点
return null;
if ((h = head) != null && h.next == s) // 上面已经将s设置为head,如果满足了当前if中的条件说明其他的节点t插入到了s前面,变成了head,而且这个t就是和s匹配的节点,他们已经完成了匹配。
casHead(h, s.next); // help s's fulfiller 将s的next节点设置为head,相当于把s和t一起移除了,
return (mode == REQUEST) ? m.item : s.item;
else if (!isFulfilling(h.mode)) // try to fulfill 如果栈中存在头结点,且和当前节点不是在同一个模式下,那么说明他们是一对对等的节点,尝试用当前节点来满足h节点。
if (h.isCancelled()) // already cancelled 如果h节点已经被取消了
casHead(h, h.next); // pop and retry 把节点h弹出,并将h节点的next节点设置为栈的head
else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) // 尝试在当前节点打上正在匹配的标记,设置为head
for (;;) // loop until matched or waiters disappear
SNode m = s.next; // m is s's match s是当前节点,m是s的next节点,他们正在匹配的两个节点
if (m == null) // all waiters are gone 如果m是空,可能其他的节点把m匹配走了。
casHead(s, null); // pop fulfill node 把s弹出,也就是设置s头结点为null
s = null; // use new node next time 将s置为空,下轮循环的时候会新建
break; // restart main loop 回退到主循环再来一次
SNode mn = m.next; // 获取m的next节点,如果s和m匹配成功的话,mn就得补上head的位置
if (m.tryMatch(s)) // 尝试匹配一下啊,匹配成功的话就把m上等待的线程唤醒。
casHead(s, mn); // pop both s and m // 匹配成功就把s和m弹出
return (mode == REQUEST) ? m.item : s.item;
else // lost match 没匹配成功说明m可能被其他的节点满足了
s.casNext(m, mn); // help unlink 说明m已经被其他的节点匹配了,那就把m删除掉
else // help a fulfiller 栈顶的h正在匹配中
SNode m = h.next; // m is h's match m是h的配对,h正在和m
if (m == null) // waiter is gone 如果m为null的话说明已经被匹配走了
casHead(h, null); // pop fulfilling node 弹出栈顶元素不进行匹配了。
else
SNode mn = m.next; // 获取m的next节点,如果m和h匹配成功,mn就得补上head位置
if (m.tryMatch(h)) // help match 帮助匹配一下m和h
casHead(h, mn); // pop both h and m 匹配成功将h和m出栈
else // lost match
h.casNext(m, mn); // help unlink 没成功的话说明m已经被其他的节点满足了。就把m移除
注意:
take方法调用的时候传进来的e是null的,也就是说,到底是请求还是数据都通过这个null区分开了。
LinkedTransferQueue
继承自AbstractQueue抽象类,然后实现了TransferQueue接口和Serializable接口。
这里的TransferQueue本质上肯定也存储了数据,只不过这个存储的是请求的节点数据信息。进入的请求线程在尝试几次的获取数据之后,会调用park方法将自己挂起,然后等待数据到来的时候激活他。
LinkedTransferQueue实现了一个重要的接口TransferQueue,该接口含有下面几个重要方法:
1. transfer(E e)
若当前存在一个正在等待获取的消费者线程,即立刻移交之;否则,会插入当前元素e到队列尾部,并且等待进入阻塞状态,到有消费者线程取走该元素。
2. tryTransfer(E e)
若当前存在一个正在等待获取的消费者线程(使用take()或者poll()函数),使用该方法会即刻转移/传输对象元素e; 若不存在,则返回false,并且不进入队列。这是一个不阻塞的操作。
3. tryTransfer(E e, long timeout, TimeUnit unit)
若当前存在一个正在等待获取的消费者线程,会立即传输给它; 否则将插入元素e到队列尾部,并且等待被消费者线程获取消费掉, 若在指定的时间内元素e无法被消费者线程获取,则返回false,同时该元素被移除。
4. hasWaitingConsumer()
判断是否存在消费者线程
5. getWaitingConsumerCount()
获取所有等待获取元素的消费线程数量
transfer算法比较复杂,实现很难看明白。大致的理解是采用所谓双重数据结构(dual data structures)。之所以叫双重,其原因是方法都是通过两个步骤完成: 保留与完成。比如消费者线程从一个队列中取元素,发现队列为空,他就生成一个空元素放入队列,所谓空元素就是数据项字段为空。然后消费者线程在这个字段上旅转等待。这叫保留。直到一个生产者线程意欲向队例中放入一个元素,这里他发现最前面的元素的数据项字段为NULL,他就直接把自已数据填充到这个元素中,即完成了元素的传送。大体是这个意思,这种方式优美了完成了线程之间的高效协作。
消费者保留,数据来了做比对,只要配对就可以触发线程唤醒,然后去处理了。
For example, a possible snapshot of
* a queue is:
*
* head tail
* | |
* v v
* M -> M -> U -> U -> U -> U
相当于这里利用了两个较好的特性。一个是前面的ConcurrentLinkedQueue的cas特性,尽可能的改变尾节点来实现更新,但是并不保证尾节点一定是尾节点。然后数据的话就是对于请求和过来的数据进行匹配才能拿到数据。
那么这个和Synchronous的区别在于这个存储数据了,但是做法和那个差不多,要匹配才能拿走数据否则不能拿走数据,现在是数据存在里面,来一个消费者需要取的话,进行配对,一旦配对成功可以删除了,配对不成功返回null。
take的时候传入false,add和offer的时候传入的true (isData)
个人感觉先记得原理再去看这个queue更好,要不然累死。LinkedTransferQueue采用的一种预占模式。意思就是消费者线程取元素时,如果队列为空,那就生成一个节点(节点元素为null)入队,然后消费者线程park住,后面生产者线程入队时发现有一个元素为null的节点,生产者线程就不入队了,直接就将元素填充到该节点,唤醒该节点上park住线程,被唤醒的消费者线程拿货走人。这就是预占的意思:有就拿货走人,没有就占个位置等着,等到或超时。
Node节点需要注意的是:匹配后节点的item的变化,以及搭配isData来综合判断是否匹配过isMatched()
。
Node | node1(isData-item) | node2(isData-item) |
---|---|---|
匹配前 | true-item | false-null |
匹配后 | true-null | false-this |
/**
* Implements all queuing methods. See above for explanation.
*
* @param e the item or null for take
* @param haveData true if this is a put, else a take
* @param how NOW, ASYNC, SYNC, or TIMED
* @param nanos timeout in nanosecs, used only if mode is TIMED
* @return an item if matched, else e
* @throws NullPointerException if haveData mode but e is null
*/
private E xfer(E e, boolean haveData, int how, long nanos)
if (haveData && (e == null))
throw new NullPointerException();
Node s = null; // the node to append, if needed
retry:
for (;;) // restart on append race
for (Node h = head, p = h; p != null;) // find & match fi以上是关于Java源码分析集合部分总结的主要内容,如果未能解决你的问题,请参考以下文章