聊聊高并发(三十二)实现一个基于链表的无锁Set集合
Posted slgkaifa
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了聊聊高并发(三十二)实现一个基于链表的无锁Set集合相关的知识,希望对你有一定的参考价值。
Set表示一种没有反复元素的集合类,在JDK里面有HashSet的实现,底层是基于HashMap来实现的。这里实现一个简化版本号的Set,有下面约束:
1. 基于链表实现。链表节点依照对象的hashCode()顺序由小到大从Head到Tail排列。
2. 如果对象的hashCode()是唯一的。这个如果实际上是不成立的,这里为了简化实现做这个如果。实际情况是HashCode是基于对象地址进行的一次Hash操作。目的是把对象依据Hash散开。所以可能有多个对象地址相应到一个HashCode。也就是哈希冲突,要做冲突的处理。
常见的处理就是使用外部拉链法,在相应的Hash的节点再创建1个链表,同样HashCode的节点在这个链表上排列,再依据equals()方法来识别是否是同一个节点。这个是HashMap的基本原理。
有了以上如果,这篇从最简单的加锁的方式到终于的无锁展示一下怎样一步一步演进的过程,以及终于无锁实现要考虑的要点。
1. 粗粒度锁
2. 细粒度锁
3. 乐观锁
4. 懒惰锁
5. 无锁
先看一下Set接口,1个加入方法。1个删除方法,1个检查方法
package com.lock.test; public interface Set<T> { public boolean add(T item); public boolean remove(T item); public boolean contains(T item); }
第一个实现的是使用粗粒度的锁,就是对整个链表加锁。这样肯定是线程安全的。可是效率肯定是最差的
链表节点类的定义,三个元素
1. 增加节点的元素
2. next指针指向下一个节点,明显这是个单链表结构
3. key表示item的HashCode
class Node<T>{ T item; Node<T> next; int key; public Node(T item){ this.item = item; this.key = item.hashCode(); } public Node(){} }粗粒度链表的实现
1. 链表维护了一个头节点,头节点始终指向最小的HashCode,头节点初始的next指针指向最大的HashCode,表示尾节点,整个链表是依照HashCode从小往大排列
2. 链表维护了一个总体的锁。add, remove, contains都加锁,保证线程安全,简单粗暴。可是效率低下
class CoarseSet<T> implements Set<T>{ private final Node<T> head; private java.util.concurrent.locks.Lock lock = new ReentrantLock(); public CoarseSet(){ head = new Node<T>(); head.key = Integer.MIN_VALUE; Node<T> MAX = new Node<T>(); MAX.key = Integer.MAX_VALUE; head.next = MAX; } public boolean add(T item){ Node<T> pred, curr; int key = item.hashCode(); lock.lock(); try{ pred = head; curr = head.next; while(curr.key < key){ pred = curr; curr = curr.next; } if(curr.key == key){ return false; } Node<T> node = new Node<T>(item); node.next = curr; pred.next = node; return true; }finally{ lock.unlock(); } } public boolean remove(T item){ Node<T> pred, curr; int key = item.hashCode(); lock.lock(); try{ pred = head; curr = head.next; while(curr.key < key){ pred = curr; curr = curr.next; } if(curr.key == key){ pred.next = curr.next; curr.next = null; return true; } return false; }finally{ lock.unlock(); } } public boolean contains(T item){ Node<T> pred, curr; int key = item.hashCode(); lock.lock(); try{ pred = head; curr = head.next; while(curr.key < key){ pred = curr; curr = curr.next; } return curr.key == key; }finally{ lock.unlock(); } } }
对粗粒度链表的优化能够细化锁的粒度。粗粒度锁的问题在于使用了一把锁锁住了整个链表,那么能够使用多把锁,每一个节点维护一把锁,这样单个节点上锁理论上不会影响其它节点。
单链表最简单add操作须要做两步
1. 把新增加节点的next指向当前节点
2. 把前驱节点的next指向新增加的节点
node.next = curr; pred.next = node
单链表的删除操作仅仅须要做一步
pred.next = curr.next
假设使用细粒度锁的话。加入和删除操作要同一时候锁住两个节点才干保证加入和删除的正确性。不然有可能加入进来的节点指向了一个已经被删除的节点。
所以须要同一时候控制两把锁,这也叫锁耦合,须要先获取前驱节点的锁,再获取当前节点的锁。
因为这样的锁的获取是依照从前往后的顺序获取的,是一个方向的,所以不会引起死锁的问题。
死锁有两种:
1. 由获取锁的顺序引起的,顺序形成了环
2. 由资源问题引起的
来看看细粒度锁的实现,首先是带锁的Node
class NodeWithLock<T>{ T item; NodeWithLock<T> next; int key; java.util.concurrent.locks.Lock lock = new ReentrantLock(); public NodeWithLock(T item){ this.item = item; this.key = item.hashCode(); } public NodeWithLock(){} public void lock(){ lock.lock(); } public void unlock(){ lock.unlock(); } }
细粒度链表实现的Set
1. 获取锁时。先获取pred锁。然后获取curr的锁
2. 当pred和curr的指针往后继节点移动时,要先释放pred锁,然后让pred指向curr,然后curr再指向next节点,然后curr再上锁,这样保证了同一时候前后两把锁存在
class FineList<T> implements Set<T>{ private final NodeWithLock<T> head; public FineList(){ head = new NodeWithLock<T>(); head.key = Integer.MIN_VALUE; NodeWithLock<T> MAX = new NodeWithLock<T>(); MAX.key = Integer.MAX_VALUE; head.next = MAX; } public boolean add(T item){ NodeWithLock<T> pred, curr; int key = item.hashCode(); head.lock(); pred = head; try{ curr = pred.next; curr.lock(); try{ while(curr.key < key){ pred.unlock(); pred = curr; curr = curr.next; curr.lock(); } if(curr.key == key){ return false; } NodeWithLock<T> node = new NodeWithLock<T>(item); node.next = curr; pred.next = node; return true; }finally{ curr.unlock(); } }finally{ pred.unlock(); } } public boolean remove(T item){ NodeWithLock<T> pred, curr; int key = item.hashCode(); head.lock(); pred = head; try{ curr = pred.next; curr.lock(); try{ while(curr.key < key){ pred.unlock(); pred = curr; curr = curr.next; curr.lock(); } if(curr.key == key){ pred.next = curr.next; curr.next = null; return true; } return false; }finally{ curr.unlock(); } }finally{ pred.unlock(); } } public boolean contains(T item){ NodeWithLock<T> pred, curr; int key = item.hashCode(); head.lock(); pred = head; try{ curr = pred.next; curr.lock(); try{ while(curr.key < key){ pred.unlock(); pred = curr; curr = curr.next; curr.lock(); } return curr.key == key; }finally{ curr.unlock(); } }finally{ pred.unlock(); } } }
细粒度的锁对粗粒度的锁有一定的优化,可是还存在问题:
当前面的节点被锁住时,后面的节点无法操作。必须等待前面的锁释放
所以一种自然而然的想法就是“仅仅在须要加锁的时候再加锁”,也就是乐观锁
1. 仅仅有在寻找到要加锁位置的时候才加锁。之前不加锁
2. 须要加锁时,先加锁,再进行验证是否现场已经被改动
3. 假设被改动就须要从头開始再次查找。是一个轮询的过程
所以乐观锁也是一个轮询检查的过程。
来看看检查的方法,检查的目的有两个:
1. 确认要操作的现场没有被改动。也就是说pred.next == curr
2. 确认pred和curr都是能够到达的。没有被物理删除,也就是node = node.next能够到达
private boolean validate(NodeWithLock<T> pred, NodeWithLock<T> curr){ NodeWithLock<T> node = head; while(node.key <= pred.key){ if(node.key == pred.key){ return pred.next == curr; } node = node.next; } return false; }
再来看看基于乐观锁的Set实现
1. 先不加锁。直到找到要处理的现场,也就是while(curr.key < key)退出的地方。退出之后有两种情况。curr.key == key和curr.key > key。
2. 找到现场后,要对pred和curr都加锁,加锁顺序也是从前往后的顺序
3. 验证现场未被改动,然后进行操作,假设被改动了,就释放锁。再次从头节点開始轮询操作
class OptimisticSet<T> implements Set<T>{ private final NodeWithLock<T> head; public OptimisticSet(){ head = new NodeWithLock<T>(); head.key = Integer.MIN_VALUE; NodeWithLock<T> MAX = new NodeWithLock<T>(); MAX.key = Integer.MAX_VALUE; head.next = MAX; } @Override public boolean add(T item) { NodeWithLock<T> pred, curr; int key = item.hashCode(); while(true){ pred = head; curr = pred.next; while(curr.key < key){ pred = curr; curr = curr.next; } pred.lock(); curr.lock(); try{ if(validate(pred, curr)){ if(curr.key == key){ return false; } NodeWithLock<T> node = new NodeWithLock<T>(item); node.next = curr; pred.next = node; return true; } }finally{ pred.unlock(); curr.unlock(); } } } @Override public boolean remove(T item) { NodeWithLock<T> pred, curr; int key = item.hashCode(); while(true){ pred = head; curr = pred.next; while(curr.key < key){ pred = curr; curr = curr.next; } pred.lock(); curr.lock(); try{ if(validate(pred, curr)){ if(curr.key == key){ pred.next = curr.next; curr.next = null; return true; } return false; } }finally{ pred.unlock(); curr.unlock(); } } } @Override public boolean contains(T item) { NodeWithLock<T> pred, curr; int key = item.hashCode(); while(true){ pred = head; curr = pred.next; while(curr.key < key){ pred = curr; curr = curr.next; } pred.lock(); curr.lock(); try{ if(validate(pred, curr)){ return curr.key == key; } }finally{ pred.unlock(); curr.unlock(); } } } private boolean validate(NodeWithLock<T> pred, NodeWithLock<T> curr){ NodeWithLock<T> node = head; while(node.key <= pred.key){ if(node.key == pred.key){ return pred.next == curr; } node = node.next; } return false; } }
乐观锁降低了非常大的一部分锁的争用问题,可是它还是存在一定的锁的冲突。尤其是contains操作时也须要加锁。实际上假设contains一个节点要被删除的话,能够先标记成逻辑删除。再进行物理删除。由于要删除时是须要加锁的。所以最后的物理删除肯定会成功。假设先标记成逻辑删除。那么contains操作的时候事实上是能够不须要锁的,假设它看到了节点被标记逻辑删除了。那么肯定就contains失败了,假设没有看到逻辑删除。那么表示在contains操作的时候是还没有被删除的,即使它可能已经被加锁准备删除了。可是contains看到的是一个状态的快照。当contains操作的时候确实是存在的。所以返回true也是正确的。
快照这个思想在并发编程中是非常重要的思想。由于并发的存在,假设不使用同步手段,比方加锁,CAS操作,那么看到的都非常可能是快照。是一瞬间的状态。不能全然依据这一瞬间的状态来决定兴许操作,可是看到快照的那一瞬间的操作确实是成功的
所以能够通过把删除操作分为逻辑删除和物理删除两个节点来把contains操作的锁去掉,由于contains操作也是一个非常频繁的操作。
能够通过给节点加入一个状态来表示是否被删除,最简单的做法就是加入一个volatile字段保证状态的可见性
static class NodeWithLockAndMark<T>{ T item; NodeWithLockAndMark<T> next; int key; java.util.concurrent.locks.Lock lock = new ReentrantLock(); // 标记逻辑删除的状态 volatile boolean marked; public NodeWithLockAndMark(T item){ this.item = item; this.key = item.hashCode(); } public NodeWithLockAndMark(){} public void lock(){ lock.lock(); } public void unlock(){ lock.unlock(); } }
懒惰的Set实现是在乐观锁的基础上实现的。有了逻辑状态,validate方法就简化了。仅仅须要推断现场的节点是否被标记删除了,而且现场未被改动过
private boolean validate(NodeWithLockAndMark<T> pred, NodeWithLockAndMark<T> curr){ return !pred.marked && !curr.marked && pred.next == curr; }看看懒惰Set的实现
1. add和remove操作和乐观锁的过程基本一致,仅仅是在remove时,先标记节点的逻辑删除状态,再物理删除
2. contains方法能够去掉锁了。注意的是它也是保证快照的正确性
class LazySet<T> implements Set<T>{ private final NodeWithLockAndMark<T> head; public LazySet(){ head = new NodeWithLockAndMark<T>(); head.key = Integer.MIN_VALUE; NodeWithLockAndMark<T> MAX = new NodeWithLockAndMark<T>(); MAX.key = Integer.MAX_VALUE; head.next = MAX; } @Override public boolean add(T item) { NodeWithLockAndMark<T> pred, curr; int key = item.hashCode(); while(true){ pred = head; curr = pred.next; while(curr.key < key){ pred = curr; curr = curr.next; } pred.lock(); curr.lock(); try{ if(validate(pred, curr)){ if(curr.key == key){ return false; } NodeWithLockAndMark<T> node = new NodeWithLockAndMark<T>(item); node.next = curr; pred.next = node; return true; } }finally{ pred.unlock(); curr.unlock(); } } } @Override public boolean remove(T item) { NodeWithLockAndMark<T> pred, curr; int key = item.hashCode(); while(true){ pred = head; curr = pred.next; while(curr.key < key){ pred = curr; curr = curr.next; } pred.lock(); curr.lock(); try{ if(validate(pred, curr)){ if(curr.key == key){ // logical remove Node, use volatile to make sure visibility curr.marked = true; pred.next = curr.next; curr.next = null; return true; } return false; } }finally{ pred.unlock(); curr.unlock(); } } } @Override public boolean contains(T item) { NodeWithLockAndMark<T> curr = head; int key = item.hashCode(); while(curr.key < key){ curr = curr.next; } return curr.key == key && !curr.marked; } private boolean validate(NodeWithLockAndMark<T> pred, NodeWithLockAndMark<T> curr){ return !pred.marked && !curr.marked && pred.next == curr; } }
最后来看看真正无锁的Set的实现,在懒惰Set实现中已经把contains方法的锁去了,无锁实现须要把add和remove中的锁也去掉。无锁的Set须要保证的是假设一个节点被标记逻辑删除了,那么它的next字段就不能被使用了 ,也就是不能被改动了。否则的话可能出现一个节点被标记逻辑删除了。可是其它现场没看到,还在继续使用它的next字段,这样就出现了无效的add或remove。
AtomicMarkableReference能够保证这个场景的原子性。它维护了marked状态和next引用的一个二元状态。这个二元状态的改动是原子的。AtomicMarkableReference的很多其它内容请看这篇聊聊高并发(二十)解析java.util.concurrent各个组件(二) 12个原子变量相关类
所以我们对Node再次进行改进。把next字段改成了AtomicMarkableReference
这里隐含了三个理解无锁实现的关键点
1. next的标记状态表示的是当前节点是否被逻辑删除,通过CAS推断这个状态能够知道当前节点是否被逻辑删除了
2. 通过CAS比較next的Reference就能够知道当前节点和next节点的物理关系是否被改动了,也是就是能够知道下一个节点是否被物理删除了
3. 由于next标记的状态是当前节点的状态。所以当前节点是无法知道下一个节点是否被逻辑删除了的。这个点非常重要,由于无锁的加入可能会出现加入的节点的兴许节点是一个已经被逻辑删除,可是还没有物理删除的节点
static class NodeWithAtomicMark<T>{ T item; AtomicMarkableReference<NodeWithAtomicMark<T>> next = new AtomicMarkableReference<NodeWithAtomicMark<T>>(null, false); int key; public NodeWithAtomicMark(T item){ this.item = item; this.key = item.hashCode(); } public NodeWithAtomicMark(){} }
理解上面的3点是理解无锁实现的关键。前两点能保证节点知道自己是否被标记了,而且知道后继节点是否被删除了。
第三点的存在要求无锁实现必须有一个单独的物理删除节点的过程。
所以抽取出了一个单独的类来寻找要操作的现场,而且寻找过程中物理地删除节点。
Position类表示要操作的现场,它返回pred和curr节点的指针。
findPosition方法从头节点往后寻找位置,边寻找边物理删除被标记逻辑删除的节点。
最后返回的时候的Postion的curr.key >= 要操作节点的HashCode
class Position<T>{ NodeWithAtomicMark<T> pred, curr; public Position(NodeWithAtomicMark<T> pred, NodeWithAtomicMark<T> curr){ this.pred = pred; this.curr = curr; } public Position(){} public Position<T> findPosition(NodeWithAtomicMark<T> head, int key){ boolean[] markHolder = new boolean[1]; NodeWithAtomicMark<T> pred, curr, succ; boolean success; retry: while(true){ pred = head; curr = pred.next.getReference(); // 清除被逻辑删除的节点。也就是被标记的节点 while(true){ succ = curr.next.get(markHolder); if(markHolder[0]){ success = pred.next.compareAndSet(curr, succ, false, false); if(!success){ continue retry; } curr = succ; succ = curr.next.get(markHolder); } if(curr.key >= key){ return new Position<T>(pred, curr); } pred = curr; curr = succ; } } } }
来看看无锁Set的实现
1. 无锁加入的时候,先用Position来寻找现场。隐含了一点是寻找的时候同一时候物理删除了节点
2. 推断是否存在的时候也是推断的快照状态。仅仅保证弱一致性
3. 假设找到了要插入的现场。新建一个节点。并把next指向curr。这里须要理解curr可能被标记了逻辑删除,后面pred的next字段的CAS操作会保证curr没有被物理删除。可是如上面第三点说的,无法知道curr是否被逻辑删除了。所以新增加的节点的next可能指向了一个被逻辑删除的节点。可是不影响逻辑。由于下一个Postion.findPosition操作会正确地删除被标记逻辑删除的节点。
contains操作也会推断节点是否被逻辑删除
4. pred.next通过AtomicMarkableReference的CAS操作保证了curr节点没有被物理删除。假设CAS成功,说明加入成功了
5. 删除也一样,先寻找现场,同一时候物理删除被标记的节点
6. 找到现场后,先CAS改动当前节点的状态为被标记逻辑删除,
7. 尝试物理删除一次,假设这里物理删除成功了,那么假设正好有一个add在相同的现场操作。那么add的Pred的CAS操作会失败。假设这里物理删除失败,那么就把逻辑删除的节点保留在链表中,等下一次findPosition操作来真正的物理删除
8. contains操作从头节点開始遍历,推断节点内容,而且没有被标记逻辑删除
9. 全部的CAS操作都配合了轮询,这样保证终于CAS操作的成功
class LockFreeSet<T> implements Set<T>{ private final NodeWithAtomicMark<T> head; public LockFreeSet(){ head = new NodeWithAtomicMark<T>(); head.key = Integer.MIN_VALUE; NodeWithAtomicMark<T> MAX = new NodeWithAtomicMark<T>(); MAX.key = Integer.MAX_VALUE; head.next.set(MAX, false); } @Override public boolean add(T item) { NodeWithAtomicMark<T> pred, curr; int key = item.hashCode(); Position p = new Position(); while(true){ Position position = p.findPosition(head, key); pred = position.pred; curr = position.curr; // 假设已经存在,仅仅能保证弱一致性,这里仅仅是一个当时状态的快照。有可能相等的节点在这之后被其它线程已经被删除了。可是这里不能看到 if(curr.key == key){ return false; } NodeWithAtomicMark<T> node = new NodeWithAtomicMark<T>(item); node.next.set(curr, false); // 二元状态保证: // 1.pred是未被标记的; // 2.curr未被物理删除。还是pred的兴许节点,这时候即使curr已经被逻辑删除了。也不影响加入成功。curr会在下一次find被物理删除 // 二元状态无法知道curr是否被逻辑删除,由于mark表示的是自己节点的状态。
可是curr是否被逻辑删除不影响加入成功,仅仅要不被物理删除即可 if(pred.next.compareAndSet(curr, node, false, false)){ return true; } } } @Override public boolean remove(T item) { NodeWithAtomicMark<T> pred, curr; int key = item.hashCode(); Position p = new Position(); boolean success = false; while(true){ Position position = p.findPosition(head, key); pred = position.pred; curr = position.curr; // 假设已经存在,仅仅能保证弱一致性。这里仅仅是一个当时状态的快照。有可能相等的节点在这之后被其它线程已经被删除了,可是这里不能看到 if(curr.key == key){ NodeWithAtomicMark<T> succ = curr.next.getReference(); success = curr.next.compareAndSet(succ, succ, false, true); if(!success){ continue; } pred.next.compareAndSet(curr, succ, false, false); return true; }else{ return false; } } } @Override public boolean contains(T item) { NodeWithAtomicMark<T> curr = head; int key = item.hashCode(); boolean[] markHolder = new boolean[1]; while(curr.key < key){ curr = curr.next.getReference(); // 检查是否被删除 curr.next.get(markHolder); } return curr.key == key && !markHolder[0]; } }
以上是关于聊聊高并发(三十二)实现一个基于链表的无锁Set集合的主要内容,如果未能解决你的问题,请参考以下文章