高并发多线程基础之线程间通信与数据共享及其应用
Posted 踩踩踩从踩
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了高并发多线程基础之线程间通信与数据共享及其应用相关的知识,希望对你有一定的参考价值。
前言
本篇文章主要介绍的是java多线程之间如何通信,协同处理任务,以及数据共享,定时任务处理等操作。
多线程之间通信的方式
在实际开发过程中多个线程同时操作,有两种情况的,数据共享和线程间协作
数据共享的方式
-
文件共享数据
通过文件达到多线程间数据共享
public static void main(String[] args) throws Exception {
Path path = Paths.get("file.log");
// 线程1 - 写入数据
new Thread(() -> {
while (true) {
String content = "当前时间" + String.valueOf(System.currentTimeMillis());
try {
Files.write(path, content.getBytes());
Thread.sleep(1000L);
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}).start();
// 线程2 - 读取数据
new Thread(() -> {
while (true) {
try {
byte[] allBytes = Files.readAllBytes(path);
System.out.println(new String(allBytes));
Thread.sleep(1000L);
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}).start();
}
-
网络共享,常见利用redis等分布式中间键,或者自己搭建的数据中间仓库
-
共享变量 ,全局变量,堆内存中的变量
数据共享会带来很多问题,涉及到协作,这个是分不开的,这个在后面继续描述
线程间协作
jdk
提供的线程协调
API
,例如:
suspend/resume
、
wait/notify
、
park/unpark
多线程协作的典型场景:生产者-消费者 模型。(线程阻塞、线程唤醒)
suspend和resume
作用:调用suspend挂起目标线程,通过resume可以恢复线程执行。
public void test1_normal() throws Exception {
Thread consumerThread = new Thread(new Runnable() {
@Override
public void run() {
if (iceCream == null) {
System.out.println("等待...");
Thread.currentThread().suspend();
}
System.out.println("完成");
}
});
consumerThread.start();
Thread.sleep(3000L);
iceCream = new Object();
System.out.println("通知");
consumerThread.resume(); //通知结束等待
}
suspend和resume 在jdk中被抛弃,主要原因
- 如果在suspend线程等待的时候,添加了synchnized关键字对象给锁住,就没办法进行解锁了。
synchronized (Demo7_SuspendResume.class){
Thread.currentThread().suspend();
}
- resume在suspend之前 ,唤醒在等待之前也会导致死锁的情况
public void test1_normal() throws Exception {
Thread consumerThread = new Thread(new Runnable() {
@Override
public void run() {
if (iceCream == null) {
Thread.sleep(7000L);
System.out.println("等待...");
Thread.currentThread().suspend();
}
System.out.println("完成");
}
});
consumerThread.start();
Thread.sleep(3000L);
iceCream = new Object();
System.out.println("通知");
consumerThread.resume(); //通知结束等待
}
就会造成死锁的情况
wait和notify
wait方法导致当前线程等待,加入该对象的等待集合中,并且放弃当前持有的对象锁。
notify/notifyAll方法唤醒一个或所有正在等待这个对象锁的线程。
wait的时候会自动释放锁,也就是说不会造成死锁的情况
注意1:虽然会wait自动解锁,但是对顺序有要求, 如果在notify被调用之后,才开始wait方法
的调用,线程会永远处于WAITING状态。
注意2:这些方法只能由同一对象锁的持有者线程调用,也就是写在同步块里面,否则会抛出
IllegalMonitorStateException异常。
public void test1_normal() throws Exception {
// 开启一个线程
new Thread(new Runnable() {
@Override
public void run() {
while (iceCream == null) { //
synchronized (this) {
System.out.println("拿到锁。。。");
try {
System.out.println("等待...");
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
System.out.println("完成");
}
}).start();
Thread.sleep(3000L); // 3秒之后
iceCream = new Object(); //
synchronized (this) {
System.out.println("拿到锁。。。");
this.notifyAll();
System.out.println("通知");
}
}
wait和notify是对象的属性,当然也包括 线程对象
synchronized ("synchronized") {
System.out.println("拿到锁。。。");
this.notifyAll();
System.out.println("通知");
}
如果把锁改一下,不是一把锁,执行notifyAll时,则会抛出异常,因为释放不掉锁
拿到锁。。。
等待...
拿到锁。。。
Exception in thread "main" java.lang.IllegalMonitorStateException
at java.lang.Object.notifyAll(Native Method)
at org.cao.thread.wait.Demo8_WaitNotify.test1_normal(Demo8_WaitNotify.java:62)
at org.cao.thread.wait.Demo8_WaitNotify.main(Demo8_WaitNotify.java:8)
会出现死锁的情况,由于如果先唤醒,在锁住,就会出现死锁的情况;
public void test1_normal() throws Exception {
// 开启一个线程
new Thread(new Runnable() {
@Override
public void run() {
while (iceCream == null) { //
synchronized (this) {
System.out.println("拿到锁。。。");
try {
Thread.sleep(3000L); //
System.out.println("等待...");
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
System.out.println("完成");
}
}).start();
Thread.sleep(7000L); //
iceCream = new Object(); //
synchronized (this) {
System.out.println("拿到锁。。。");
this.notifyAll();
System.out.println("通知");
}
}
park和unpark机制
线程调用
park
则等待“许可” ,
unpark
方法为指定线程提供“许可
(permit)”
。
- 调用unpark之后,再调用park,线程会直接运行
- 提前调用的unpark不叠加,连续多次调用unpark后,第一次调用park后会拿到“许可”直接运行,后续调用会进入等待。
- 一次许可,只能解决一次等待,在一次等待,则需要再一次许可
public void test1_normal() throws Exception {
Thread consumerThread = new Thread(new Runnable() {
@Override
public void run() {
while (iceCream == null) {
System.out.println("等待...");
LockSupport.park();
}
System.out.println("完成");
}
});
consumerThread.start();
Thread.sleep(3000L); // 3秒之后
iceCream = new Object(); //
LockSupport.unpark(consumerThread); //
System.out.println("通知");
}
这里一定要将线程对象传给unpark,只能唤醒一个线程。
会存在死锁的情况
-
在同步代码块中使用park/unpark,容易出现死锁
public void test2_DeadLock() throws Exception {
Thread consumerThread = new Thread(new Runnable() {
@Override
public void run() {
if (iceCream == null) { //
System.out.println("等待...");
synchronized (this) { // 若拿到锁
LockSupport.park(); // 执行park
}
}
System.out.println("完成");
}
});
consumerThread.start();
Thread.sleep(3000L); // 3秒之后
iceCream = new Object(); //
synchronized (this) { // 争取到锁以后,才能恢复consumerThread
LockSupport.unpark(consumerThread);
}
System.out.println("通知");
}
主要看一下 park和unpark的源代码里面都是 使用的unsafe cas 自旋锁保证线程数据的安全。
总的来说
suspend/resume:加锁会出现死锁的情况,先唤醒后挂起也会出现死锁的情况,因此被jdk弃用
wait/notify:必须加锁,只用于
synchronized
关键字,如果是锁对象不同,则释放锁时,会抛异常,先唤醒后挂起也会出现死锁
park/unpark:添加synchronized也会出现死锁,先唤醒后挂起不会出现死锁,使用的unsafe cas 自旋锁保证线程数据的安全
伪唤醒
应该在循环中检查等待条件,原因是处于等待状态的线程可能会收到错误警报和伪唤醒,如果不在循环中检查等待条件,程序就会在没有满足结束条件的情况下退出。
伪唤醒是指线程并非因为notify、notifyall、unpark等api调用而意外唤醒,是更底层原因
导致的。
while(iceCream == null) { //
System.out.println("等待...");
synchronized (this) { // 若拿到锁
LockSupport.park(); // 执行park
}
}
也就是cpu自己断了,底层异常情况,只能通过while来保证数据的安全性
线程通信机制的应用
可以利用通信机制实现一个简单的锁
首先实现lock初始化的方法
public class MyLock implements Lock {
@Override
public void lock() {
// TODO Auto-generated method stub
}
@Override
public void lockInterruptibly() throws InterruptedException {
// TODO Auto-generated method stub
}
@Override
public boolean tryLock() {
// TODO Auto-generated method stub
return false;
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
// TODO Auto-generated method stub
return false;
}
@Override
public void unlock() {
// TODO Auto-generated method stub
}
@Override
public Condition newCondition() {
// TODO Auto-generated method stub
return null;
}
}
- 用全局变量存储获取的线程 ,首先这个是线程安全的全局变量;
private AtomicReference<Thread> owner = new AtomicReference<Thread>();
- 并且锁占用是,线程被挂起,挂起的线程的引用被放到waiters队列 这里要达到多个线程拿锁
private BlockingQueue<Thread> waiters = new LinkedBlockingQueue<>();
- 判断当前线程是否可以加锁,实现trylock方法
@Override
public boolean tryLock() {
return owner.compareAndSet(null, Thread.currentThread());
}
- 去抢锁,然后用到线程通信机制,就使用park,实现lock方法
@Override
public void lock() {
while (!tryLock()) {
Thread curTh = Thread.currentThread(); // 获取当前线程引用
waiters.offer(curTh);
LockSupport.park();
}
}
- 然后实现释放锁的方法,实现unlock方法
@Override
public void unlock() {
if (owner.get() == Thread.currentThread()) {
owner.set(null); // 将onwer置为null,释放锁
Thread th = waiters.poll(); // 取出队列头部的元素,并移除该元素
LockSupport.unpark(th); // 唤醒队列头部的元素
}
}
这里是不可重入的锁
多线程应用场景
场景一:批量处理任务
- 向大量(100w以上)的用户发送邮件
-
处理大批量文件
-
例如我在工作中需要按地区 或者 按设备类型 或者按用户等的维度去统计数据,而一个地区的网关设备可能有几百万个,并且维度很多,所以一定要多线程,多任务去处理
场景2:实现异步
- 快速响应用户,入浏览器请求网页、图片时
-
自动作业处理
场景3:增大吞吐量
,例如
tomcat
、数据库 等服务
结语
整篇文章主要围绕着线程间通信,和数据共享的,依据线程间通信实现了一个简单的锁,在实际开发中直接用park和unpark,很少用到;至少我在开发很少用到,都是直接jdk提供的工具,但了解线程间通信是很有必要的
以上是关于高并发多线程基础之线程间通信与数据共享及其应用的主要内容,如果未能解决你的问题,请参考以下文章