java多线程进阶线程通信

Posted 烟锁迷城

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了java多线程进阶线程通信相关的知识,希望对你有一定的参考价值。

1、wait与notify

1.1、基本原理

wait和notify分别起到等待和唤醒的作用,

wait:让一个获取到锁的线程(即获取到监视器对象的线程)释放锁,并进入到等待状态。

notify:让一个进入到等待状态的线程被唤醒,开始抢占锁。

在上述流程图中,可以看出整个流程。

在加锁的情况下,需要抢占资源,如果抢占到,就会成功获取到Monitor,如果失败,线程则会进入到同步队列之中开始等待

wait线路,wait会释放掉锁,所以在此之前必需要获取到锁,如果没有获取到锁就执行wait,则会直接抛出错误,所以线程获取到锁之后,执行wait方法,进入到等待队列,锁被释放。

notify线路,notify需要获取到锁,以此为依据知道需要去唤醒哪个等待队列的线程,在执行完加锁代码块之后,锁会被释放,然后唤醒一个线程,理论上notify会随机唤醒一个,notifyAll会唤醒全部线程。实际上,notify和notifyAll的唤醒顺序是基于JVM的底层实现的,所以在JAVA1.8版本中,notify会顺序唤醒,notifyAll会根据先进后出的原则依次唤醒全部。唤醒的线程会加入到同步队列之中,等待抢占资源。

所以我们可以看到,wait和notify的实现都是基于synchronized关键字的,这个关键字起到的作用有两个

  1. 保证在多线程的环境下有互斥的执行条件
  2. 让线程间的通讯有依据,确定是在同一个队列里。

1.2、实现方法

wait线程等待命令,notify线程通知命令,这两个命令本质是线程通信,是两个线程之间进行信息交互的命令,其中最为经典的就是生产者消费者模式。
生产者生产消息,在消息队列中进行添加,如果消息满了,就会执行wait方法,释放锁,让消费者和生产者去抢占,由于是非公平锁,消费者和生产者都可能抢占到,生产者抢到后重复执行上述过程,直到消费者抢到锁,开始消费。

public class Product implements Runnable 

    private Queue<String> msg;
    private int maxSize;

    public Product(Queue<String> msg, int maxSize) 
        this.msg = msg;
        this.maxSize = maxSize;
    

    @Override
    public void run() 
        int i = 0;
        while (true) 
            //加锁以确保对同一个对象只能由生产者消费者中的一个操作
            synchronized (msg) 
                i++;
                while (maxSize == msg.size()) 
                    //如果生产满了
                    try 
                        //wait将释放锁
                        msg.wait();
                     catch (InterruptedException e) 
                        e.printStackTrace();
                    
                
                try 
                    Thread.sleep(1000);
                 catch (InterruptedException e) 
                    e.printStackTrace();
                
                System.out.println("生产者生产消息:"+"生产消息" + i);
                msg.add("生产消息" + i);
                //唤醒处于阻塞状态下的线程
                msg.notify();
            
        
    

消费者消费消息,原理同生产者

public class Consumer implements Runnable 
    private Queue<String> msg;
    private int maxSize;

    public Consumer(Queue<String> msg, int maxSize) 
        this.msg = msg;
        this.maxSize = maxSize;
    

    @Override
    public void run() 
        while (true) 
            synchronized (msg)
                while (msg.isEmpty())
                    try 
                        msg.wait();
                     catch (InterruptedException e) 
                        e.printStackTrace();
                    
                
                try 
                    Thread.sleep(1000);
                 catch (InterruptedException e) 
                    e.printStackTrace();
                
                System.out.println("消费者消费消息:"+msg.remove());
                msg.notify();
            
        
    

执行程序,查看最终结果

public class Test 

    public static void main(String[] args) 
        Queue<String> queue = new LinkedList<>();
        int maxsize = 3;
        Product product = new Product(queue, maxsize);
        Consumer consumer = new Consumer(queue, maxsize);
        new Thread(product).start();
        new Thread(consumer).start();
    

2、join

join的作用是将主线程等待,执行子线程,子线程结束或到达等待时间后,主线程被唤醒,继续执行,这是一个基于wait和notify的典型应用,其源码内部就有wait的使用。

其notify是在JVM层面实现的,在JAVA代码中没有。

public final synchronized void join(long millis)
throws InterruptedException 
    long base = System.currentTimeMillis();
    long now = 0;

    if (millis < 0) 
        throw new IllegalArgumentException("timeout value is negative");
    

    if (millis == 0) 
        while (isAlive()) 
            wait(0);
        
     else 
        while (isAlive()) 
            long delay = millis - now;
            if (delay <= 0) 
                break;
            
            wait(delay);
            now = System.currentTimeMillis() - base;
        
    

3、Condition的线程通信

3.1、await与signal

await和signal的作用同样是等待和唤醒,那么我们不用await和signal只用wait与notify可以吗?显然是不可以的,因为wait与notify是和synchronized配合使用的,await和signal是与JUC的Lock配合使用的。

下图为整体流程图。

3.1.1、await源码

接下来就是喜闻乐见的源码阅读时刻了!

await的源码并不复杂,它的本质依旧是借助AQS原理来实现如同wait一样的队列。

Node node = addConditionWaiter();——将当前线程添加至AQS队列的最后。

int savedState = fullyRelease(node);——解放所有的锁,唤醒队列头结点,因为是重入计数的锁,所以要释放就要释放全部,同时记录下重入次数,保证加锁时回复原状。

int interruptMode = 0;——中断标志位

while (!isOnSyncQueue(node)) ————判断当前节点是否还在AQS队列里,但是之前解锁已经将其从队列里释放出去了。

LockSupport.park(this);——阻塞当前线程,当其他线程调用signal时,会继续从此开始执行(执行上下文切换,使用程序计数器和寄存器,涉及用户态到内核态的转换。)

if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)——是否响应,此处为被中断标志响应唤醒

break;——如果是中断标志导致的唤醒,跳出循环。

if (acquireQueued(node, savedState) && interruptMode != THROW_IE)——新的判断,重新竞争锁,将重入次数也加入了,加锁会恢复重入次数。

interruptMode = REINTERRUPT;

if (node.nextWaiter != null) // clean up if cancelled,官方注释,清除掉所有的失效节点。

unlinkCancelledWaiters();

if (interruptMode != 0)——中断标志不为0

reportInterruptAfterWait(interruptMode);——中断唤醒。

可以看到,整个流程可以分为添加节点至等待节点,解除掉所有的锁,唤醒AQS头结点的线程,然后让线程park,进入阻塞。

阻塞结束后,继续把锁加回来,重入计数恢复成未加锁的状态。

public final void await() throws InterruptedException 
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) 
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);

addConditionWaiter方法。

Node t = lastWaiter;——获取到最后一个节点

if (t != null && t.waitStatus != Node.CONDITION)——这句代码是有官方注释的,If lastWaiter is cancelled, clean out,如果节点是取消状态,就清除掉,可以了解到,接下来的语句一定是用来清除无效节点的。

unlinkCancelledWaiters();——判断内部,清除无效节点

t = lastWaiter;——重新赋予最后一个等待节点

Node node = new Node(Thread.currentThread(), Node.CONDITION);——以当前线程创建等待节点

if (t == null) firstWaiter = node; ——如果尾结点为空,证明链表没有节点,此节点为头结点

else t.nextWaiter = node;——否则,此节点为尾结点的下一节点

lastWaiter = node;——移交尾结点

return node;——返回节点

此方法作用是获取最末尾有效节点,然后将当前线程作为等待节点插入至最后,取代最末节点,依旧是AQS的添加思路。

private Node addConditionWaiter() 
    Node t = lastWaiter;
    // If lastWaiter is cancelled, clean out.
    if (t != null && t.waitStatus != Node.CONDITION) 
        unlinkCancelledWaiters();
        t = lastWaiter;
    
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;

unlinkCancelledWaiters方法

Node t = firstWaiter;——获取头结点

Node trail = null;——设置trail 节点

while (t != null)——头结点不为空时,始终循环清除

Node next = t.nextWaiter;——获取头结点的下一个节点,赋予next

if (t.waitStatus != Node.CONDITION)——如果不是condition的节点

t.nextWaiter = null;——赋予下一个节点空

if (trail == null) firstWaiter = next;——如果trail 节点为空,则头结点为下一个节点

else trail.nextWaiter = next;——反之,则将trail 节点的下一节点改为next

if (next == null) lastWaiter = trail;——如果next为空,则最后一个节点就是trail 

else trail = t;——如果等于,trail 就是t,trail 就是头结点

t = next;——循环必经之路,t为next

也就是说,整个循环只有t为null才会结束,t会被赋予next的数值,也就是next为null,循环结束,反推之,next为null时,lastWaiter为trail,next还是t.nextWaiter,也就是说,这是一个循环查找下一个符合condition的节点的方法,如果它的下一节点为null,它自己就是最后节点。

private void unlinkCancelledWaiters() 
    Node t = firstWaiter;
    Node trail = null;
    while (t != null) 
        Node next = t.nextWaiter;
        if (t.waitStatus != Node.CONDITION) 
            t.nextWaiter = null;
            if (trail == null)
                firstWaiter = next;
            else
                trail.nextWaiter = next;
            if (next == null)
                lastWaiter = trail;
        
        else
            trail = t;
        t = next;
    

fullyRelease方法,此方法会释放所有的锁,此处所有指的是所有的计数,无论重入几次,同时唤醒AQS队列的头结点。

int savedState = getState()——获取重入次数

if (release(savedState))——其内做了简单的判断,是否为当前线程的锁,锁的重入次数是否相当,如果一致,则释放成功,将冲入次数改为0,线程从ExclusiveOwnerThread去除

failed = false;——失败标志置为false

return savedState;——返回重入次数

unparkSuccessor(h),这个方法很眼熟,因为这个方法上文中有提到过,是包含unpark方法的,用来唤醒被阻塞的线程,也就是说,此时AQS队列的头结点将会被唤醒

final int fullyRelease(Node node) 
    boolean failed = true;
    try 
        int savedState = getState();
        if (release(savedState)) 
            failed = false;
            return savedState;
         else 
            throw new IllegalMonitorStateException();
        
     finally 
        if (failed)
            node.waitStatus = Node.CANCELLED;
    

public final boolean release(int arg) 
    if (tryRelease(arg)) 
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    
    return false;

protected final boolean tryRelease(int releases) 
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) 
        free = true;
        setExclusiveOwnerThread(null);
    
    setState(c);
    return free;

3.1.2、signal

把被阻塞的线程唤醒,等待队列中被唤醒的线程转移到AQS队列中。

if (!isHeldExclusively())——判断当前线程是否抢占到锁

Node first = firstWaiter;——获得第一个等待节点

if (first != null)——如果当前节点不为null

doSignal(first);——执行signal

public final void signal() 
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);

protected final boolean isHeldExclusively() 
    // While we must in general read state before owner,
    // we don't need to do so to check if current thread is owner
    return getExclusiveOwnerThread() == Thread.currentThread();

 这里是一个do while循环,先看while内部

!transferForSignal(first) &&(first = firstWaiter) != null——transferForSignal方法依靠AQS和CAS来将传入的节点状态由CONDITION替换为0,即从等待节点换为无状态节点,然后用enq方法构建AQS队列,将节点状态换为SIGNAL,然后唤醒线程。

循环内部,如果第一等待节点被赋予first参数节点的下一等待节点,且此节点为null,则将上一等待节点置为null,并且无论结果,first参数节点的下一等待节点都将被置为null。

private void doSignal(Node first) 
    do 
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
     while (!transferForSignal(first) &&
             (first = firstWaiter) != null);

final boolean transferForSignal(Node node) 
    /*
     * If cannot change waitStatus, the node has been cancelled.
     */
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    /*
     * Splice onto queue and try to set waitStatus of predecessor to
     * indicate that thread is (probably) waiting. If cancelled or
     * attempt to set waitStatus fails, wake up to resync (in which
     * case the waitStatus can be transiently and harmlessly wrong).
     */
    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;

3.2、实现方法

Condition的使用方法类似于wait于notify,都需要等待和唤醒。

这是等待部分

public class ConditionDemoWait implements Runnable

    private Lock lock;
    private Condition condition;

    public ConditionDemoWait(Lock lock, Condition condition) 
        this.lock = lock;
        this.condition = condition;
    

    @Override
    public void run() 
        System.out.println("ConditionDemoWait - start");
        lock.lock();
        try
            condition.await();
            System.out.println("ConditionDemoWait - end");
        catch (Exception e)
            e.printStackTrace();
        finally 
            lock.unlock();
        
    

这是唤醒部分

public class ConditionDemoSignal implements Runnable

    private Lock lock;
    private Condition condition;

    public ConditionDemoSignal(Lock lock, Condition condition) 
        this.lock = lock;
        this.condition = condition;
    

    @Override
    public void run() 
        System.out.println("ConditionDemoSignal - start");
        lock.lock();
        try
            condition.signal();
            System.out.println("ConditionDemoSignal - end");
        catch (Exception e)
            e.printStackTrace();
        finally 
            lock.unlock();
        
    

这是执行部分

Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
ConditionDemoWait conditionDemoWait = new ConditionDemoWait(lock, condition);
ConditionDemoSignal conditionDemoSignal = new ConditionDemoSignal(lock, condition);
new Thread(conditionDemoWait).start();
new Thread(conditionDemoSignal).start();

3.3、实际应用

Condition可以实现阻塞队列

  1. 线程池中的阻塞队列
  2. 生产者消费者
  3. 流量缓冲

4、阻塞队列

队列是一种线程表,可以一端插入,一端删除,是先进先出的基础结构。

阻塞队列,需要支持两种情况

  1. 在队列满溢的时候,添加线程需要被阻塞,删除线程需要被唤醒
  2. 在队列为空的时候,添加线程需要被唤醒,删除线程需要被阻塞

由此可见,其实这就是一个生产者消费者模型。

在JUC之中,有很多的阻塞队列实现,以下是一些通用方法

添加方法

  • add:添加元素,如果队列满了,则抛出异常
  • offer:添加元素,返回true/false,添加成功,返回true,否则返回false
  • put:添加元素,如果队列满了,则一直阻塞线程
  • offer(timeout):添加元素,附带超时时间,如果队列满了,则阻塞线程timeout的时长,如果超时还没有,则返回false。

移除方法

  • element:移除元素,如果队列为空,抛出异常
  • peek:移除元素,返回true/false,移除成功,返回true,否则返回false
  • take:移除元素,如果队列为空,则一直阻塞线程
  • poll(timeout):移除元素,附带超时时间,如果队列为空,则阻塞线程timeout的时长,如果超时还没有,则返回null。

JUC实现队列举例:

  • ArrayBlockQueue:基于数组实现的队列
  • LinkedBlockQueue:基于链表实现的队列
  • PriorityBlockingQueue:具有优先级的队列
  • DelayQueue:允许延时执行的队列
  • SynchronousQueue:没有存储结构的队列(用在线程池newCachedThreadPool中)

开发者涨薪指南 48位大咖的思考法则、工作方式、逻辑体系

以上是关于java多线程进阶线程通信的主要内容,如果未能解决你的问题,请参考以下文章

Java程序设计多线程进阶

Java程序设计多线程进阶

Java-进阶:多线程2

大数据进阶26-Lock死锁线程间通信线程组线程池,以及定时器

一份针对于新手的多线程实践--进阶篇

『Java练习生的自我修养』java-se进阶³ • 线程的等待与唤醒