多线程基础JUC工具包

Posted 烟锁迷城

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了多线程基础JUC工具包相关的知识,希望对你有一定的参考价值。

1、重入锁

在JUC工具包中,有Lock抽象方法,它同样实现了锁。
在示例中的代码,Lock lock = new ReentrantLock();声明锁的存在,lock.lock();开启锁,因为开启锁就一定要关闭锁,所以将解锁语句lock.unlock();放在finally中,加锁保护的对象放在这两段代码之间,这样加锁就完成了。

static Lock lock = new ReentrantLock();
public static int count = 0;

public static void incr() {
    try {
        Thread.sleep(1);
        lock.lock();
        count++;
    } catch (InterruptedException e) {
        e.printStackTrace();
    }finally {
        lock.unlock();
    }
}

public static void main(String[] args) throws InterruptedException {
    for (int i = 0; i < 1000; i++) {
        new Thread(() -> Count.incr()).start();
    }
    Thread.sleep(2000);
    System.out.println("结果:" + count);
}

Lock是一个接口,具有多种实现,包括ReentrantLock(重入锁),ReentrantReadWriteLock(读写锁,适合读多写少的环境),StampedLock(JDK1.8之后出现的读写锁改进版本)
重入锁是指一个持有锁的线程,在释放锁之前,如果再次访问加了该同步锁的其他方法,这个线程不需要再次争抢锁,只需要记录重入次数。
和synchronized不同,在同一把锁中,线程的再次抢占会导致死锁,而重入锁只会记录次数,不会造成死锁。

2、AQS

AbstractQueuedSynchronizer是同步工具的核心,主要实现线程的排队阻塞操作。
在ReentrantLock的lock方法中具有两个实现,非公平锁NonfairSync和公平锁FairSync,在其构造函数中,默认是非公平锁

public ReentrantLock() {
    sync = new NonfairSync();
}

在NonfairSync类中,关键的lock方法,第一行代码就是CAS方法。CAS方法之前有提到过,就是比较并修改,这次的偏移量是state,private volatile int state,锁的状态,属于AQS的一个属性,在这里作为互斥的共享资源来使用,0表示无锁,大于0表示有锁。
因此,CAS方法表示,如果这里state的预期值是0,代表无锁,就将state更新为1,表示有锁,成功更新返回true,否则返回false。第一个线程进入并修改状态后,后续的线程就无法修改成功,这样就保证了只有一个线程可以修改成功。
如果抢占成功,setExclusiveOwnerThread(Thread.currentThread());当前线程会被放入一个变量exclusiveOwnerThread中作为独占线程,exclusiveOwnerThread同样属于AQS。
没有抢占到锁的线程会执行acquire方法

static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;
    
    final void lock() {
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }

    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}

protected final boolean compareAndSetState(int expect, int update) {
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

protected final void setExclusiveOwnerThread(Thread thread) {
    exclusiveOwnerThread = thread;
}

tryAcquire方法用来抢占锁,nonfairTryAcquire方法中,先获取当前线程状态,如果状态为0,证明当前无锁,抢占成功,和之前的代码一样。
若当前线程为独占线程,则表示重入,记录重入次数,更改线程状态。
acquire方法中,无论是重入锁还是抢占锁,都会返回true,没抢到锁返回false,之后取反,只有没抢占到锁才会继续执行acquireQueued(addWaiter(Node.EXCLUSIVE), arg)

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

addwaiter方法创建双向链表,会将抢占失败的线程放入双线链表中,Node就是链表节点,此双向链表的头结点为空。
acquireQueued方法,若传入节点的前一个节点为头结点,且抢占锁成功,就返回失败,终止逻辑。
反之,则让线程阻塞。

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

想要唤醒阻塞线程,就需要unlock()方法。
tryRelease方法中,将当前线程状态减去1,即减少重入次数,如果此时状态为0,变为无锁,则去除独占线程,返回解锁成功。无论是否无锁,都将更新线程状态,返回是否解锁成功。
如果解锁成功,取出头结点,如果头结点不为空且仍在等待,就执行unparkSuccessor,返回解锁成功,反之返回失败。
unparkSuccessor方法,唤醒头结点的下一个节点。
唤醒之后,阻塞方法将返回一个中断标志,因为被唤醒的线程可以重新加入抢占,并且可以抢占到锁,然后去除双向链表头结点,将下一个的线程变为头节点。

public void unlock() {
    sync.release(1);
}
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

3、CountDownLatch

countdownlatch是一个同步工具类,它允许一个或多个线程一直等待,直到其他线程的操作执行完毕再执行。

3.1、基础应用

在示例代码中,CountDownLatch countDownLatch = new CountDownLatch(3);指的是倒计时3,countDownLatch.countDown();意为倒计时启动,会将倒计时的设置数减去1,等到倒计时数等于0时,线程将会启动。

public class CountDownLatchDemo {

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(3);
        new Thread(()->{
            countDownLatch.countDown();
        }).start();
        new Thread(()->{
            countDownLatch.countDown();
        }).start();
        new Thread(()->{
            countDownLatch.countDown();
        }).start();
        countDownLatch.await();
        System.out.println("end");
    }
}

3.2、应用场景

可以当循环的100个线程进入等待,直到主线程计数减1,等待的100个线程将同时启动

public class CountDownLatchDemo {

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        IntStream.range(1,100).forEach(i->{
            new Thread(()->{
                try {
                    countDownLatch.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("end");
            }).start();
        });
        Thread.sleep(5000);
        countDownLatch.countDown();
    }
}

3.3、原理分析

在CountDownLatch的await方法中,如果线程中断就抛出中断异常,未中断就判断倒计时数是否为0,若为0就返回1,反之返回-1。
若小于0,即倒计时还未结束,执行doAcquireSharedInterruptibly方法。

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

addWaiter方法在AQS中见到过,它会将线程组成一个双向链表,同时循环判断节点的前一个节点是否为头结点,若为头结点,则判断倒计时数是否为0,若为0就返回1,反之返回-1。若大于等于0,即倒计时已经结束,等待结束。

private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

在CountDownLatch的countDown方法中,获取当前倒计时数,如果不为0,就减去1,使用CAS更新当前倒计时数,若此时倒计时数为0,返回true,执行doReleaseShared方法。

public void countDown() {
    sync.releaseShared(1);
}

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

protected boolean tryReleaseShared(int releases) {
    for (;;) {
        int c = getState();
        if (c == 0)
            return false;
        int nextc = c-1;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

doReleaseShared方法,将双向链表中的全部线程依次唤醒。逻辑当unparkSuccessor方法唤醒头结点的下一个节点,也就是第一个线程之后,CAS将改变线程的状态,随后根据AQS原理,头结点将被删除,第二个节点将变为头结点。如果h节点为头结点,则代表整个链表都已经被唤醒,完成循环唤醒的操作,这就是共享锁的依次唤醒,上一个线程唤醒下一个线程。

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            } else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

以上是关于多线程基础JUC工具包的主要内容,如果未能解决你的问题,请参考以下文章

java多线程进阶JUC工具集

java多线程进阶JUC工具集

多线程进阶=;JUC编程

JUC高级多线程_01:基础知识回顾

Java---JUC并发篇(多线程详细版)

Java---JUC并发篇(多线程详细版)