Java并发编程的艺术(5-10)学习总结

Posted 月亮的-影子

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java并发编程的艺术(5-10)学习总结相关的知识,希望对你有一定的参考价值。

本文参考学习Java并发编程的艺术

第5章 Java中的锁

5.1 Lock接口

  • synchronized没有的特性
    • 尝试非阻塞获取锁
    • 能够中断获取锁
    • 超时获取锁

5.2 队列同步器

  • 队列同步器AbstractQueuedSynchronizer用来构建锁,或者其它同步组件。用一个int成员变量表示同步状态。通过内置的FIFO队列完成资源获取线程的排队工作。
  • 同步器的实现主要是继承,同步器需要提供(getState()、setState(int newState)和compareAndSetState(int expect,int update))方法来获取同步的状态。
  • 同步器支持独占或者是共享地获取锁。

5.2.1 队列同步器的接口与示例

  • 同步器的实现基于模板方法。继承并重写。

  • 模板方法包括3类,独占式的获取和释放同步状态,共享式的获取和释放同步状态,查询同步队列的等待状态线程情况。

通过独占锁来说明情况

  • 独占锁只能一个线程获取锁。其它线程只能进入到同步队列。
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;

class Mutex implements Lock 
  // 静态内部类,自定义同步器
  private static class Sync extends AbstractQueuedSynchronizer 
    // 是否处于占用状态
    protected boolean isHeldExclusively() 
      return getState() == 1;
    
    // 当状态为0的时候获取锁
    public boolean tryAcquire(int acquires) 
      if (compareAndSetState(0, 1)) 
        setExclusiveOwnerThread(Thread.currentThread());
        return true;
      
      return false;
    
    // 释放锁,将状态设置为0
    protected boolean tryRelease(int releases) 
      if (getState() == 0) throw new
              IllegalMonitorStateException();
      setExclusiveOwnerThread(null);
      setState(0);
      return true;
    
    // 返回一个Condition,每个condition都包含了一个condition队列
    Condition newCondition()  return new ConditionObject(); 
  
  // 仅需要将操作代理到Sync上即可
  private final Sync sync = new Sync();
  public void lock()  sync.acquire(1); 
  public boolean tryLock()  return sync.tryAcquire(1); 
  public void unlock()  sync.release(1); 
  public Condition newCondition()  return sync.newCondition(); 
  public boolean isLocked()  return sync.isHeldExclusively(); 
  public boolean hasQueuedThreads()  return sync.hasQueuedThreads(); 
  public void lockInterruptibly() throws InterruptedException 
    sync.acquireInterruptibly(1);
  
  public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException 
    return sync.tryAcquireNanos(1, unit.toNanos(timeout));
  

  • 上面的Mutex只有在tryAcquire的CAS设置成功才能够说明获取了同步状态。
  • tryRelease把同步状态设置为0。
  • 获取状态失败就会进入到阻塞队列。

5.2.2 队列同步器的实现分析

1.同步队列

  • 同步器依赖内部的同步队列完成同步状态管理。线程获取同步状态失败,同步器就会把当前的线程以及等待状态信息构成节点Node存入到同步队列。

  • 节点是构成队列的基础,有首尾节点。如果线程没有获取同步状态成功就会进入到队列的尾部
  • 加入到尾部的时候一定要是一个线程安全的状态,所以有方法compareAndSetTail(Node expect,Node update)。
  • 每次唤醒都是先从头部开始。

2.独占式同步状态获取与释放

  • 同步器acquire(int arg)可以去获取同步状态。对中断不敏感。也就是线程不会从同步队列中移出去。
  • 首先是调用tryAcquire(int arg)保证线程安全获取同步状态。
  • 如果失败构造同步节点Node.EXCLUSIVE,并且通过addWaiter(Node node)加入到同步队列的尾部。
  • 再通过acquireQueued(Node node,int arg)进入死循环获取同步状态。
  • 只有前驱的节点头才能够获取同步状态
    • 头结点获取同步状态的节点,释放之后会唤醒下一个节点
    • 维护FIFO原则。
public final void acquire(int arg) 
 if (!tryAcquire(arg) &&
 acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
 selfInterrupt();


  • compareAndSetTail(Node expect,Node update)保证了节点线程安全加入。enq通过死循环保证节点被正确添加。
 private Node addWaiter(Node mode) 
        Node node = new Node(Thread.currentThread(), mode);
// 快速尝试在尾部添加
        Node pred = tail;
        if (pred != null) 
            node.prev = pred;
            if (compareAndSetTail(pred, node)) 
                pred.next = node;
                return node;
            
        
        enq(node);
        return node;
    
    private Node enq(final Node node) 
        for (;;) 
            Node t = tail;
            if (t == null)  // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
             else 
                node.prev = t;
                if (compareAndSetTail(t, node)) 
                    t.next = node;
                    return t;
                
            
        
    
  • acquireQueued(final Node node,int arg)死循环获取同步状态。
final boolean acquireQueued(final Node node, int arg) 
        boolean failed = true;
        try 
            boolean interrupted = false;
            for (;;) 
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) 
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                
                if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                    interrupted = true;
            
         finally 
            if (failed)
                cancelAcquire(node);
        
    

  • 接着就是release,唤醒头结点后面的一个节点

3.共享式同步状态获取与释放

  • 共享锁可以多线程获取同步状态。

  • acquireShared(int arg)共享式获取同步状态。
  • 同步器调用tryAcquireShared(int arg)来获取同步状态,返回值大于等于0说明获取成功。
 public final void acquireShared(int arg) 
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    
    private void doAcquireShared(int arg) 
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try 
            boolean interrupted = false;
            for (;;) 
                final Node p = node.predecessor();
                if (p == head) 
                    int r = tryAcquireShared(arg);
                    if (r >= 0) 
                        setHeadAndPropagate(node, r);
                        p.next = null;
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    
                
                if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                    interrupted = true;
            
         finally 
            if (failed)
                cancelAcquire(node);
        
    

4.独占式超时获取同步状态

  • 可以通过调用同步器的doAcquireNanos(int arg,long nanosTimeout)可以超时获取同步状态。
  • 如果是调用了acquireInterruptibly(int arg),那么只要线程被中断就会报InterruptedException。
  • 但是doAcquireNanos(int arg,long nanosTimeout)能够中断,而且可以计算出需要睡眠的时间。nanosTimeout-=now-lastTime如果是大于0说明还没有超时。否则就是超时了。
  • 如果 nanosTimeout小于等于spinForTimeoutThreshold(1000纳秒)的时候,线程就不会进入到超时等待了。而是进入到快速自旋。直到超时。
 private boolean doAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException 
        long lastTime = System.nanoTime();
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try 
            for (;;) 
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) 
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                
                if (nanosTimeout <= 0)
                    return false;
                if (shouldParkAfterFailedAcquire(p, node)
                        && nanosTimeout > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                long now = System.nanoTime();
//计算时间,当前时间now减去睡眠之前的时间lastTime得到已经睡眠
//的时间delta,然后被原有超时时间nanosTimeout减去,得到了
//还应该睡眠的时间
                nanosTimeout -= now - lastTime;
                lastTime = now;
                if (Thread.interrupted())
                    throw new InterruptedException();
            
         finally 
            if (failed)
                cancelAcquire(node);
        
    

5.3 重入锁

  • 支持一个线程多次获取锁。
  • 公平锁效率未必比非公平的高。

1.实现重进入

  • 线程再次获取锁,需要锁去识别当前获取锁的线程是不是和锁的持有线程一样。
  • 锁的释放,要求的就是计数重复获取锁的数量减低为0。
  • 下面的方法就增加了线程的判断。增加了同步状态的值。
    final boolean nonfairTryAcquire(int acquires) 
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) 
            if (compareAndSetState(0, acquires)) 
                setExclusiveOwnerThread(current);
                return true;
            
         else if (current == getExclusiveOwnerThread()) 
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        
        return false;
    
  • 同样要求在释放的时候,减去状态的值。
protected final boolean tryRelease(int releases) 
        int c = getState() - releases;
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) 
            free = true;
            setExclusiveOwnerThread(null);
        
        setState(c);
        return free;
    

2.公平与非公平获取锁的区别

  • 锁是公平那么一定符合FIFO请求的绝对时间顺序。
  • 对于非公平锁来说只要CAS成功,那么就算是同步状态成功。
  • 对于公平锁,每次获取锁的时候还需要判断队列是不是有线程等待,才能够获取。
    protected final boolean tryAcquire(int acquires) 
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) 
            if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) 
                setExclusiveOwnerThread(current);
                return true;
            
         else if (current == getExclusiveOwnerThread()) 
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        
        return false;
    
  • 非公平锁只要CAS状态成功就算是获取锁,所以可能会导致一个线程连续获取锁。
  • 而且公平锁每次获取锁的线程不同每次都要切换,但是非公平锁可以连续一个线程获取锁,减少切换的成本。

5.4 读写锁

  • 读写锁允许同一个时刻多个读线程访问。
  • 读写锁维护了一对锁。
  • ReentrantReadWriteLock的特性
    • 公平性选择:支持公平和非公平获取
    • 可重入
    • 锁降级

5.4.1 读写锁的接口与示例

5.4.2 读写锁的实现分析

1.读写状态的设计

  • 同样是依靠同步器实现同步的功能。
  • 维护读写锁的同步状态有多个状态。所以通过按位切割使用。高16位是读,低16位是写。
  • 当前的同步状态是读锁被同一个线程获取了写锁,重入了两次,而且还获取了两次读锁。

2.写锁的获取与释放

  • 写锁是支持可重入的排它锁。

  • 如果当前线程获取了写锁,那么就增加写状态,如果当前线程在获取写锁时,读锁已经被获取或者该线程不是已经获取写锁的线程,那么线程进入到等待状态。

  • 这里除了判断可重入,还判断是否存在读锁。如果存在读锁,那么写锁就不能被获取。

  • 因为读写锁需要保证写锁的操作对读锁是可见的。因为读锁被获取的状况,去获取写锁,那么当前运行的线程是没有办法感知当前写线程的操作。

 protected final boolean tryAcquire(int acquires) 
        Thread current = Thread.currentThread();
        int c = getState();
        int w = exclusiveCount(c);
        if (c != 0) 
// 存在读锁或者当前获取线程不是已经获取写锁的线程
            if (w == 0 || current != getExclusiveOwnerThread())
                return false;
            if (w + exclusiveCount(acquires) > MAX_COUNT)
                throw new Error("Maximum lock count exceeded");
            setState(c + acquires);
            return true;
        
        if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) 
            return false;
        
        setExclusiveOwnerThread(current);
        return true;
    

3.读锁的获取与释放

  • 支持可重入的共享锁。
  • 能被多个线程获取,在没有别的写线程访问的情况下,读锁会被成功获取。
  • 如果当前线程已经获取了读锁,那么就增加读状态。
  • 如果当前线程获取读锁的时候,发现写锁被获取,那么就会进入到等待状态。
protected final int tryAcquireShared(int unused) 
        for (;;) 
            int c = getState();
            int nextc = c + (1 << 16);
            if (nextc < c)
                throw new Error("Maximum lock count exceeded");
            if (exclusiveCount(c) != 0 && owner != Thread.currentThread())
                return -1;
            if (compareAndSetState(c, nextc))
                return 1;
        
    

4.锁降级

  • 锁降级指的是写锁降级成为读锁。意思是拿到写锁之后再获取读锁
  • 锁降级的获取读锁是否有必要?如果不获取读锁,直接释放写锁的问题就是另一个线程获取写锁并且修改数据,那么当前线程无法感知线程T的数据更新。

5.5 LockSupport工具

  • park(Object blocker)、parkNanos(Object blocker,long nanos) 和parkUntil(Object blocker,long deadline)阻塞当前线程,blocker是标识线程等待的对象。

5.6 Condition接口

5.6.2 Condition的实现分析

1.等待队列

  • 是一个FIFO队列。如果线程调用await就会进入Condition的等待队列。
  • 由于await一定是在获取锁的情况执行,所以不需要CAS保证线程安全性。

2.等待

  • 释放锁,并且线程进入到等待队列。
    public final void await() throws InterruptedException 
        if (Thread.interrupted())
            throw new InterruptedException();
// 当前线程加入等待队列
        Node node = addConditionWaiter();
// 释放同步状态,也就是释放锁
        以上是关于Java并发编程的艺术(5-10)学习总结的主要内容,如果未能解决你的问题,请参考以下文章

分布式架构系列攻无不克并发编程的艺术学习思路总结

读《Java并发编程的艺术》

JAVA并发编程艺术 一(并发编程的挑战)

《java并发编程的艺术》学习小结

《java并发编程的艺术》学习小结

Java并发编程的艺术--一书知秋