java高并发编程--03--线程间通信
Posted shouwangyixin
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了java高并发编程--03--线程间通信相关的知识,希望对你有一定的参考价值。
1.同步阻塞与异步非阻塞
1.1同步阻塞消息处理
服务端监听端口,客户端提交Event,服务端创建线程接收Event,处理Event,返回结果
缺陷: 同步Event提交,客户端等待时间过长(提交Event时间+接收Event时间+处理Event时间+返回结果时间)会陷入阻塞,导致二次提交Event耗时过长
由于客户端提交Event数量有限,导致系统受理业务数量有限,系统吞吐量不高
一个线程处理一个Event,线程频繁创建销毁,从而增加系统额外开销
业务达到峰值时,大量业务处理线程会导致CPU频繁进行上下文切换,降低系统性能
1.2异步非阻塞消息处理
客户端提交Event后会得到一个相应的工单ID并立即返回,Event则会被放置在Event队列中,服务端有若干个工作线程,不断地从Event队列中回去任务并进行异步处理,最后将处理结果保存到另外一个结果集中,如果客户端需要处理结果,可以凭借工单ID再次查询
优势:
客户端不用等待处理结果就能返回,提供类系统吞吐量和并发量
服务线程数量在一个可空范围内,不会导致太多的CPU上下文切换带来的额外开销,服务线程可以复用,减少创建线程和销毁线程的资源浪费
缺陷:
客户端向要结果需要再次调用接口进行查询
2.单线程间通信
wait和notify方法不是Thread特有的方法,而是Object的方法。
wait方法必须拥有该对象的monitor,也就是wait方法必须在该对象为monitor的同步代码中使用。
当线程执行对象的wait方法后,会放弃对该对象的monitor的所有权并进入该对象的wait set中,其他线程将有机会继续争抢该对象的monitor所有权
notify 唤醒单个执行该对象wait方法的线程
notifyAll 唤醒所有执行该对象wait方法的线程
被唤醒的线程需要重新获取该对象所关联monitor的lock才能继续执行
public class EventQueue private final int MAX; static class Event //Event队列 private final LinkedList<Event> EVENTS = new LinkedList<EventQueue.Event>(); private static final int DEFAULT_MAX = 10; EventQueue() this(DEFAULT_MAX); EventQueue(int max) this.MAX = max; //提交Event到队尾 public void offer(Event event) synchronized (EVENTS) if(EVENTS.size() >= MAX) try System.out.println("EVENTS is Full."); EVENTS.wait(); catch (InterruptedException e) e.printStackTrace(); System.out.println("The Event has submitted."); EVENTS.addLast(event); EVENTS.notify(); //从队首取走Event public Event take() synchronized (EVENTS) if(EVENTS.isEmpty()) try System.out.println("EVENTS is Empty."); EVENTS.wait(); catch (InterruptedException e) e.printStackTrace(); Event first = EVENTS.removeFirst(); EVENTS.notify(); System.out.println("Thre Event " + first +" has Hadled."); return first; public static void main(String[] args) EventQueue eq = new EventQueue(); //创建一个线程,假定提交任务没有时间间隔,用循环不断提交任务 new Thread(() -> while (true) eq.offer(new Event()); ,"Producer").start(); //创建一个线程,假定处理一个任务需要一定时间,使用循环处理任务 new Thread(() -> while(true) eq.take(); try TimeUnit.SECONDS.sleep(3); catch (InterruptedException e) // TODO Auto-generated catch block e.printStackTrace(); ,"Consumer").start();
输出结果:
EVENTS is Empty.
The Event has submitted.
The Event has submitted.
The Event has submitted.
The Event has submitted.
The Event has submitted.
The Event has submitted.
The Event has submitted.
The Event has submitted.
The Event has submitted.
The Event has submitted.
EVENTS is Full.
Thre Event cp5.cp2.EventQueue$Event@6a18032 has Hadled.
The Event has submitted.
EVENTS is Full.
Thre Event cp5.cp2.EventQueue$Event@c59eb42 has Hadled.
The Event has submitted.
EVENTS is Full.
Thre Event cp5.cp2.EventQueue$Event@495b74ad has Hadled.
The Event has submitted.
EVENTS is Full.
Thre Event cp5.cp2.EventQueue$Event@43f1dda4 has Hadled.
The Event has submitted.
EVENTS is Full.
Thre Event cp5.cp2.EventQueue$Event@78d89982 has Hadled.
The Event has submitted.
EVENTS is Full.
wait和sleep的主要区别:
wait是Object方法,sleep是Thread方法
wait需要在同步代码中使用,sleep不用
线程在同步代码中执行sleep方法时不会释放monitor的锁,而wait方法则会释放monitor的锁
3.多线程间通信
3.1生产者和消费者
前面的例子若修改成多线程并发则会出现数据不一致的情况:
修改代码如下:
public class EventQueue private final int MAX; static class Event //Event队列 private final LinkedList<Event> EVENTS = new LinkedList<EventQueue.Event>(); private static final int DEFAULT_MAX = 10; EventQueue() this(DEFAULT_MAX); EventQueue(int max) this.MAX = max; //提交Event到队尾 public void offer(Event event) synchronized (EVENTS) if(EVENTS.size() >= MAX) try System.out.println("EVENTS is Full."); EVENTS.wait(); catch (InterruptedException e) e.printStackTrace(); EVENTS.addLast(event); System.out.println("The Event has submitted."+EVENTS.size()); EVENTS.notify(); //从队首取走Event public Event take() synchronized (EVENTS) if(EVENTS.isEmpty()) try System.out.println("EVENTS is Empty."); EVENTS.wait(); catch (InterruptedException e) e.printStackTrace(); Event first = EVENTS.removeFirst(); EVENTS.notify(); System.out.println("Thre Event " + first +" has Hadled."); return first; public static void main(String[] args) EventQueue eq = new EventQueue(); //创建3个提交线程,假定提交任务没有时间间隔,用循环不断提交任务 new Thread(() -> while (true) eq.offer(new Event()); ,"Producer1").start(); new Thread(() -> while (true) eq.offer(new Event()); ,"Producer2").start(); new Thread(() -> while (true) eq.offer(new Event()); ,"Producer3").start(); //创建2个处理线程,假定处理一个任务需要一定时间,使用循环处理任务 new Thread(() -> while(true) eq.take(); try TimeUnit.SECONDS.sleep(3); catch (InterruptedException e) // TODO Auto-generated catch block e.printStackTrace(); ,"Consumer1").start(); new Thread(() -> while(true) eq.take(); try TimeUnit.SECONDS.sleep(3); catch (InterruptedException e) // TODO Auto-generated catch block e.printStackTrace(); ,"Consumer2").start();
输出结果:
The Event has submitted.10511
EVENTS is Full.
The Event has submitted.10512
EVENTS is Full.
The Event has submitted.10513
EVENTS is Full.
The Event has submitted.10514
EVENTS is Full.
。。。
上面出现类Event数量大于最大值的情况,还可能出现队列为空,但Consumer仍然去获取Event的情况。
疑问:代码中已经添加了synchronized数据同步,为何还会出现数据不同步的情况?
原因如下:
1)线程Producer1、Producer2、Producer3均执行到添加Event判断一步,发现数量达到最大,就执行了wait方法放弃锁,被阻塞,Consumer1消耗一个Event后执行notify方法,唤醒一个Producer,这个Producer执行完添加后,唤醒其他线程,如果唤醒的是一个Producer,就会出现添加的数量超过限制的情况
2)线程Consumer1和线程Consumer2执行到Event判断一步,发现队列为空,则会执行wait方法放弃锁,被阻塞,一个Producer添加完后执行notify方法唤醒其他线程,若唤醒一个Consumer,这个Consumer消耗一个Event后继续唤醒其他线程,若唤醒的是一个Consumer,新唤醒的Consumer则会在队列为空的情况下获取Event
可以如下改进代码,使用while循环在线程获取到monitor锁后再一次进行判断,防止进入后面的代码:
public class EventQueue private final int MAX; static class Event //Event队列 private final LinkedList<Event> EVENTS = new LinkedList<EventQueue.Event>(); private static final int DEFAULT_MAX = 10; EventQueue() this(DEFAULT_MAX); EventQueue(int max) this.MAX = max; //提交Event到队尾 public void offer(Event event) synchronized (EVENTS) while(EVENTS.size() >= MAX) try System.out.println("EVENTS is Full."); EVENTS.wait(); catch (InterruptedException e) e.printStackTrace(); EVENTS.addLast(event); System.out.println("The Event has submitted."+EVENTS.size()); EVENTS.notify(); //从队首取走Event public Event take() synchronized (EVENTS) while (EVENTS.isEmpty()) try System.out.println("EVENTS is Empty."); EVENTS.wait(); catch (InterruptedException e) e.printStackTrace(); Event first = EVENTS.removeFirst(); EVENTS.notify(); System.out.println("Thre Event " + first +" has Hadled."); return first; public static void main(String[] args) EventQueue eq = new EventQueue(); //创建3个提交线程,假定提交任务没有时间间隔,用循环不断提交任务 new Thread(() -> while (true) eq.offer(new Event()); ,"Producer1").start(); new Thread(() -> while (true) eq.offer(new Event()); ,"Producer2").start(); new Thread(() -> while (true) eq.offer(new Event()); ,"Producer3").start(); //创建2个处理线程,假定处理一个任务需要一定时间,使用循环处理任务 new Thread(() -> while(true) eq.take(); try TimeUnit.SECONDS.sleep(3); catch (InterruptedException e) // TODO Auto-generated catch block e.printStackTrace(); ,"Consumer1").start(); new Thread(() -> while(true) eq.take(); try TimeUnit.SECONDS.sleep(3); catch (InterruptedException e) // TODO Auto-generated catch block e.printStackTrace(); ,"Consumer2").start();
输出结果:
The Event has submitted.1
The Event has submitted.2
The Event has submitted.3
The Event has submitted.4
The Event has submitted.5
The Event has submitted.6
The Event has submitted.7
The Event has submitted.8
The Event has submitted.9
The Event has submitted.10
EVENTS is Full.
EVENTS is Full.
Thre Event cp5.cp2.EventQueue$Event@39b9b54c has Hadled.
The Event has submitted.10
EVENTS is Full.
Thre Event cp5.cp2.EventQueue$Event@50626525 has Hadled.
The Event has submitted.10
EVENTS is Full.
EVENTS is Full.
EVENTS is Full.
Thre Event cp5.cp2.EventQueue$Event@3341de3c has Hadled.
The Event has submitted.10
EVENTS is Full.
EVENTS is Full.
Thre Event cp5.cp2.EventQueue$Event@5ea58159 has Hadled.
The Event has submitted.10
EVENTS is Full.
EVENTS is Full.
Thre Event cp5.cp2.EventQueue$Event@76435630 has Hadled.
The Event has submitted.10
EVENTS is Full.
EVENTS is Full.
Thre Event cp5.cp2.EventQueue$Event@5a9a0e55 has Hadled.
The Event has submitted.10
EVENTS is Full.
EVENTS is Full.
Thre Event cp5.cp2.EventQueue$Event@2249e61 has Hadled.
The Event has submitted.10
EVENTS is Full.
EVENTS is Full.
Thre Event cp5.cp2.EventQueue$Event@1f988b62 has Hadled.
The Event has submitted.10
EVENTS is Full.
EVENTS is Full.
。。。
3.3线程休息室wait set
在虚拟机规范中存在一个wait set(又被成为线程休息室)的概念,线程调用了某个对象的wait方法后,都会被加入到该对象monitor关联的wait set中,并设法monitor的所有权。
若干线程调用了同一个对象的wait方法后均被加入该对象monitor关联的wait set中,待另外一个线程调用该monitor的notify方法之后,其中一个线程会从wait set弹出,至于是哪一个,则是随机的,虚拟机规范没有要求。
若执行的是notifyAll方法,则所有线程都会被弹出
4自定义显式锁BooleanLock
4.1synchronized关键字的缺陷
1)无法控制阻塞时长
同一monitor的锁被线程A获取后,线程B等待锁的获取,等待的时间由线程A什么时候释放决定。
2)阻塞不可被中断
线程B抢某个monitor的锁而进入阻塞状态,那么这种阻塞状态是无法中断的,虽然可以在线程B上设置interrupt标识,但是synchronized阻塞不像sleep和wait方法一样,并不会捕获中断信号,也就不会中断
4.2显式锁BooleanLock
本节将构造一个显示的BooleanLock,即具备synchronized关键字所有的功能又具备可中断和lock超时功能
1)定义Lock接口
public interface Lock //类似synchronized关键字,lock方法永远阻塞,除非获取到锁,可以被打断,打断抛出InterruptedException void lock() throws InterruptedException; //类似synchronized关键字,lock方法永远阻塞,除非获取到锁,可以被打断或超时,打断抛出InterruptedException,超时抛出TimeoutException void lock(long timeMillis) throws InterruptedException,TimeoutException; //释放锁 void unLock(); //获取哪些线程被阻塞 List<Thread> getBlockedThreads();
2)实现Lock
//通过控制Boolean变量开关决定是否允许当前线程获取到锁 public class BlooeanLock implements Lock //当前拥有锁的线程 private Thread currentThread; //locked为false标识当前该锁没有被任何线程获得或已被释放,true表示该锁以被获得,该线程是currentThread private boolean locked = false; //存储因获取该锁而进入阻塞的线程 private List<Thread> blockedThreadList = new ArrayList<Thread>(); @Override public void lock() throws InterruptedException //使用同步代码块的方式进行同步 synchronized (this) //如果某个线程已经获取到当前锁,将该线程放到blockedThreadList中,并使用wait方法释放该线程对this monitor的所有权 while(locked) blockedThreadList.add(Thread.currentThread()); this.wait(); //如果没有线程在拥有当前锁,从blockedThreadList移除当前线程,当前线程设置为拥有锁的线程并将锁设置为已被拥有的状态 this.blockedThreadList.remove(Thread.currentThread()); this.locked = true; this.currentThread = Thread.currentThread(); @Override public void lock(long timeMillis) throws InterruptedException, TimeoutException //同样使用同步代码块的方式进行同步 synchronized (this) //如果时间不合法,默认使用lock方法,通常处理是抛出异常,抛出异常更好一些 if(timeMillis <= 0) this.lock(); //throw *Exception else long remainMillis = timeMillis; long endMillis = System.currentTimeMillis() + remainMillis; while(locked) //如果remainMillis小于等于0,意为着当前线程被其他线程唤醒或者在指定wait时间内没有获取到锁,这种情况抛出异常 if(remainMillis <= 0) throw new TimeoutException("can not get lock during " + timeMillis); else if(!blockedThreadList.contains(Thread.currentThread())) blockedThreadList.add(Thread.currentThread()); //等待remainMillis时间,remainMillis最初由其他线程传入,但在多次wait过程重新计算 this.wait(remainMillis); //重新计算remainMillis remainMillis = endMillis - System.currentTimeMillis(); //如果没有线程在拥有当前锁,从blockedThreadList移除当前线程,当前线程设置为拥有锁的线程并将锁设置为已被拥有的状态 this.blockedThreadList.remove(Thread.currentThread()); this.locked = true; this.currentThread = Thread.currentThread(); @Override public void unLock() //哪一个线程加的锁只能由哪一个线程来解锁 synchronized (this) //判断是否是当前线程获得的锁 if(currentThread.equals(Thread.currentThread())) //将lock置为false locked = false; //通知其他线程 this.notifyAll(); @Override public List<Thread> getBlockedThreads() // TODO Auto-generated method stub return null;
3)使用BooleanLock
public class BooleanLockTest //定义BooleanLock private static final BlooeanLock lock = new BlooeanLock(); //使用try..finally语句来确保每次锁都被释放 public void synchronizedMethod() try lock.lock(); int random = new Random().nextInt(10); System.out.println("Thread " + Thread.currentThread() + " 获取到锁。"); TimeUnit.SECONDS.sleep(random); catch (InterruptedException e) e.printStackTrace(); finally //释放锁 lock.unLock(); public static void main(String[] args) BooleanLockTest blt = new BooleanLockTest(); IntStream.range(1, 10).mapToObj(i -> new Thread(blt::synchronizedMethod)).forEach(Thread::start);
输出结果:
Thread Thread[Thread-0,5,main] 获取到锁。
Thread Thread[Thread-2,5,main] 获取到锁。
Thread Thread[Thread-5,5,main] 获取到锁。
Thread Thread[Thread-7,5,main] 获取到锁。
Thread Thread[Thread-6,5,main] 获取到锁。
Thread Thread[Thread-3,5,main] 获取到锁。
Thread Thread[Thread-8,5,main] 获取到锁。
Thread Thread[Thread-1,5,main] 获取到锁。
Thread Thread[Thread-4,5,main] 获取到锁。
使用可中断被阻塞的线程
将前面的main方法修改如下
public static void main(String[] args) throws InterruptedException BooleanLockTest blt = new BooleanLockTest(); new Thread(blt::synchronizedMethod,"T1").start(); TimeUnit.MICROSECONDS.sleep(2); Thread t2 = new Thread(blt::synchronizedMethod,"T2"); t2.start(); TimeUnit.MICROSECONDS.sleep(2); t2.interrupt();
输出结果:
Thread Thread[T1,5,main] 获取到锁。
java.lang.InterruptedException
at java.base/java.lang.Object.wait(Native Method)
at java.base/java.lang.Object.wait(Object.java:328)
at cp5.cp4.BlooeanLock.lock(BlooeanLock.java:22)
at cp5.cp4.BooleanLockTest.synchronizedMethod(BooleanLockTest.java:13)
at java.base/java.lang.Thread.run(Thread.java:834)
上面的代码仍然存在问题:如果某个线程的wait被打断,抛出异常后还可能存在于blockedThreadList,可以增强lock方法,使用try-catch语句包围wait,在catch语句中移除这个线程。
使用可超时线程
public class BooleanLockTest //定义BooleanLock private static final BlooeanLock lock = new BlooeanLock(); //使用try..finally语句来确保每次锁都被释放 public void synchronizedMethod() try lock.lock(100);//使用带时间的lock,尝试的时间可能小于下面一个任务执行的时间 int random = new Random().nextInt(10); System.out.println("Thread " + Thread.currentThread() + " 获取到锁。"); TimeUnit.SECONDS.sleep(random); catch (InterruptedException e) e.printStackTrace(); catch (TimeoutException e) e.printStackTrace(); finally //释放锁 lock.unLock(); public static void main(String[] args) throws InterruptedException BooleanLockTest blt = new BooleanLockTest(); new Thread(blt::synchronizedMethod,"T1").start(); TimeUnit.MICROSECONDS.sleep(2); Thread t2 = new Thread(blt::synchronizedMethod,"T2"); t2.start();
多执行几次,应有输出结果:
Thread Thread[T1,5,main] 获取到锁。
java.util.concurrent.TimeoutException: can not get lock during 100
at cp5.cp4.BlooeanLock.lock(BlooeanLock.java:45)
at cp5.cp4.BooleanLockTest.synchronizedMethod(BooleanLockTest.java:14)
at java.base/java.lang.Thread.run(Thread.java:834)
以上是关于java高并发编程--03--线程间通信的主要内容,如果未能解决你的问题,请参考以下文章