java多线程进阶线程通信
Posted 烟锁迷城
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了java多线程进阶线程通信相关的知识,希望对你有一定的参考价值。
目录
1、wait与notify
1.1、基本原理
wait和notify分别起到等待和唤醒的作用,
wait:让一个获取到锁的线程(即获取到监视器对象的线程)释放锁,并进入到等待状态。
notify:让一个进入到等待状态的线程被唤醒,开始抢占锁。
在上述流程图中,可以看出整个流程。
在加锁的情况下,需要抢占资源,如果抢占到,就会成功获取到Monitor,如果失败,线程则会进入到同步队列之中开始等待
wait线路,wait会释放掉锁,所以在此之前必需要获取到锁,如果没有获取到锁就执行wait,则会直接抛出错误,所以线程获取到锁之后,执行wait方法,进入到等待队列,锁被释放。
notify线路,notify需要获取到锁,以此为依据知道需要去唤醒哪个等待队列的线程,在执行完加锁代码块之后,锁会被释放,然后唤醒一个线程,理论上notify会随机唤醒一个,notifyAll会唤醒全部线程。实际上,notify和notifyAll的唤醒顺序是基于JVM的底层实现的,所以在JAVA1.8版本中,notify会顺序唤醒,notifyAll会根据先进后出的原则依次唤醒全部。唤醒的线程会加入到同步队列之中,等待抢占资源。
所以我们可以看到,wait和notify的实现都是基于synchronized关键字的,这个关键字起到的作用有两个
- 保证在多线程的环境下有互斥的执行条件
- 让线程间的通讯有依据,确定是在同一个队列里。
1.2、实现方法
wait线程等待命令,notify线程通知命令,这两个命令本质是线程通信,是两个线程之间进行信息交互的命令,其中最为经典的就是生产者消费者模式。
生产者生产消息,在消息队列中进行添加,如果消息满了,就会执行wait方法,释放锁,让消费者和生产者去抢占,由于是非公平锁,消费者和生产者都可能抢占到,生产者抢到后重复执行上述过程,直到消费者抢到锁,开始消费。
public class Product implements Runnable
private Queue<String> msg;
private int maxSize;
public Product(Queue<String> msg, int maxSize)
this.msg = msg;
this.maxSize = maxSize;
@Override
public void run()
int i = 0;
while (true)
//加锁以确保对同一个对象只能由生产者消费者中的一个操作
synchronized (msg)
i++;
while (maxSize == msg.size())
//如果生产满了
try
//wait将释放锁
msg.wait();
catch (InterruptedException e)
e.printStackTrace();
try
Thread.sleep(1000);
catch (InterruptedException e)
e.printStackTrace();
System.out.println("生产者生产消息:"+"生产消息" + i);
msg.add("生产消息" + i);
//唤醒处于阻塞状态下的线程
msg.notify();
消费者消费消息,原理同生产者
public class Consumer implements Runnable
private Queue<String> msg;
private int maxSize;
public Consumer(Queue<String> msg, int maxSize)
this.msg = msg;
this.maxSize = maxSize;
@Override
public void run()
while (true)
synchronized (msg)
while (msg.isEmpty())
try
msg.wait();
catch (InterruptedException e)
e.printStackTrace();
try
Thread.sleep(1000);
catch (InterruptedException e)
e.printStackTrace();
System.out.println("消费者消费消息:"+msg.remove());
msg.notify();
执行程序,查看最终结果
public class Test
public static void main(String[] args)
Queue<String> queue = new LinkedList<>();
int maxsize = 3;
Product product = new Product(queue, maxsize);
Consumer consumer = new Consumer(queue, maxsize);
new Thread(product).start();
new Thread(consumer).start();
2、join
join的作用是将主线程等待,执行子线程,子线程结束或到达等待时间后,主线程被唤醒,继续执行,这是一个基于wait和notify的典型应用,其源码内部就有wait的使用。
其notify是在JVM层面实现的,在JAVA代码中没有。
public final synchronized void join(long millis)
throws InterruptedException
long base = System.currentTimeMillis();
long now = 0;
if (millis < 0)
throw new IllegalArgumentException("timeout value is negative");
if (millis == 0)
while (isAlive())
wait(0);
else
while (isAlive())
long delay = millis - now;
if (delay <= 0)
break;
wait(delay);
now = System.currentTimeMillis() - base;
3、Condition的线程通信
3.1、await与signal
await和signal的作用同样是等待和唤醒,那么我们不用await和signal只用wait与notify可以吗?显然是不可以的,因为wait与notify是和synchronized配合使用的,await和signal是与JUC的Lock配合使用的。
下图为整体流程图。
3.1.1、await源码
接下来就是喜闻乐见的源码阅读时刻了!
await的源码并不复杂,它的本质依旧是借助AQS原理来实现如同wait一样的队列。
Node node = addConditionWaiter();——将当前线程添加至AQS队列的最后。
int savedState = fullyRelease(node);——解放所有的锁,唤醒队列头结点,因为是重入计数的锁,所以要释放就要释放全部,同时记录下重入次数,保证加锁时回复原状。
int interruptMode = 0;——中断标志位
while (!isOnSyncQueue(node)) ————判断当前节点是否还在AQS队列里,但是之前解锁已经将其从队列里释放出去了。
LockSupport.park(this);——阻塞当前线程,当其他线程调用signal时,会继续从此开始执行(执行上下文切换,使用程序计数器和寄存器,涉及用户态到内核态的转换。)
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)——是否响应,此处为被中断标志响应唤醒
break;——如果是中断标志导致的唤醒,跳出循环。
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)——新的判断,重新竞争锁,将重入次数也加入了,加锁会恢复重入次数。
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled,官方注释,清除掉所有的失效节点。
unlinkCancelledWaiters();
if (interruptMode != 0)——中断标志不为0
reportInterruptAfterWait(interruptMode);——中断唤醒。
可以看到,整个流程可以分为添加节点至等待节点,解除掉所有的锁,唤醒AQS头结点的线程,然后让线程park,进入阻塞。
阻塞结束后,继续把锁加回来,重入计数恢复成未加锁的状态。
public final void await() throws InterruptedException
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node))
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
addConditionWaiter方法。
Node t = lastWaiter;——获取到最后一个节点
if (t != null && t.waitStatus != Node.CONDITION)——这句代码是有官方注释的,If lastWaiter is cancelled, clean out,如果节点是取消状态,就清除掉,可以了解到,接下来的语句一定是用来清除无效节点的。
unlinkCancelledWaiters();——判断内部,清除无效节点
t = lastWaiter;——重新赋予最后一个等待节点
Node node = new Node(Thread.currentThread(), Node.CONDITION);——以当前线程创建等待节点
if (t == null) firstWaiter = node; ——如果尾结点为空,证明链表没有节点,此节点为头结点
else t.nextWaiter = node;——否则,此节点为尾结点的下一节点
lastWaiter = node;——移交尾结点
return node;——返回节点
此方法作用是获取最末尾有效节点,然后将当前线程作为等待节点插入至最后,取代最末节点,依旧是AQS的添加思路。
private Node addConditionWaiter()
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION)
unlinkCancelledWaiters();
t = lastWaiter;
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
unlinkCancelledWaiters方法
Node t = firstWaiter;——获取头结点
Node trail = null;——设置trail 节点
while (t != null)——头结点不为空时,始终循环清除
Node next = t.nextWaiter;——获取头结点的下一个节点,赋予next
if (t.waitStatus != Node.CONDITION)——如果不是condition的节点
t.nextWaiter = null;——赋予下一个节点空
if (trail == null) firstWaiter = next;——如果trail 节点为空,则头结点为下一个节点
else trail.nextWaiter = next;——反之,则将trail 节点的下一节点改为next
if (next == null) lastWaiter = trail;——如果next为空,则最后一个节点就是trail
else trail = t;——如果等于,trail 就是t,trail 就是头结点
t = next;——循环必经之路,t为next
也就是说,整个循环只有t为null才会结束,t会被赋予next的数值,也就是next为null,循环结束,反推之,next为null时,lastWaiter为trail,next还是t.nextWaiter,也就是说,这是一个循环查找下一个符合condition的节点的方法,如果它的下一节点为null,它自己就是最后节点。
private void unlinkCancelledWaiters()
Node t = firstWaiter;
Node trail = null;
while (t != null)
Node next = t.nextWaiter;
if (t.waitStatus != Node.CONDITION)
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
else
trail = t;
t = next;
fullyRelease方法,此方法会释放所有的锁,此处所有指的是所有的计数,无论重入几次,同时唤醒AQS队列的头结点。
int savedState = getState()——获取重入次数
if (release(savedState))——其内做了简单的判断,是否为当前线程的锁,锁的重入次数是否相当,如果一致,则释放成功,将冲入次数改为0,线程从ExclusiveOwnerThread去除
failed = false;——失败标志置为false
return savedState;——返回重入次数
unparkSuccessor(h),这个方法很眼熟,因为这个方法上文中有提到过,是包含unpark方法的,用来唤醒被阻塞的线程,也就是说,此时AQS队列的头结点将会被唤醒。
final int fullyRelease(Node node)
boolean failed = true;
try
int savedState = getState();
if (release(savedState))
failed = false;
return savedState;
else
throw new IllegalMonitorStateException();
finally
if (failed)
node.waitStatus = Node.CANCELLED;
public final boolean release(int arg)
if (tryRelease(arg))
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
return false;
protected final boolean tryRelease(int releases)
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0)
free = true;
setExclusiveOwnerThread(null);
setState(c);
return free;
3.1.2、signal
把被阻塞的线程唤醒,等待队列中被唤醒的线程转移到AQS队列中。
if (!isHeldExclusively())——判断当前线程是否抢占到锁
Node first = firstWaiter;——获得第一个等待节点
if (first != null)——如果当前节点不为null
doSignal(first);——执行signal
public final void signal()
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
protected final boolean isHeldExclusively()
// While we must in general read state before owner,
// we don't need to do so to check if current thread is owner
return getExclusiveOwnerThread() == Thread.currentThread();
这里是一个do while循环,先看while内部
!transferForSignal(first) &&(first = firstWaiter) != null——transferForSignal方法依靠AQS和CAS来将传入的节点状态由CONDITION替换为0,即从等待节点换为无状态节点,然后用enq方法构建AQS队列,将节点状态换为SIGNAL,然后唤醒线程。
循环内部,如果第一等待节点被赋予first参数节点的下一等待节点,且此节点为null,则将上一等待节点置为null,并且无论结果,first参数节点的下一等待节点都将被置为null。
private void doSignal(Node first)
do
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
while (!transferForSignal(first) &&
(first = firstWaiter) != null);
final boolean transferForSignal(Node node)
/*
* If cannot change waitStatus, the node has been cancelled.
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
3.2、实现方法
Condition的使用方法类似于wait于notify,都需要等待和唤醒。
这是等待部分
public class ConditionDemoWait implements Runnable
private Lock lock;
private Condition condition;
public ConditionDemoWait(Lock lock, Condition condition)
this.lock = lock;
this.condition = condition;
@Override
public void run()
System.out.println("ConditionDemoWait - start");
lock.lock();
try
condition.await();
System.out.println("ConditionDemoWait - end");
catch (Exception e)
e.printStackTrace();
finally
lock.unlock();
这是唤醒部分
public class ConditionDemoSignal implements Runnable
private Lock lock;
private Condition condition;
public ConditionDemoSignal(Lock lock, Condition condition)
this.lock = lock;
this.condition = condition;
@Override
public void run()
System.out.println("ConditionDemoSignal - start");
lock.lock();
try
condition.signal();
System.out.println("ConditionDemoSignal - end");
catch (Exception e)
e.printStackTrace();
finally
lock.unlock();
这是执行部分
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
ConditionDemoWait conditionDemoWait = new ConditionDemoWait(lock, condition);
ConditionDemoSignal conditionDemoSignal = new ConditionDemoSignal(lock, condition);
new Thread(conditionDemoWait).start();
new Thread(conditionDemoSignal).start();
3.3、实际应用
Condition可以实现阻塞队列
- 线程池中的阻塞队列
- 生产者消费者
- 流量缓冲
以上是关于java多线程进阶线程通信的主要内容,如果未能解决你的问题,请参考以下文章