Java并发——多线程并发编程中的同步器
Posted 守夜人爱吃兔子
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java并发——多线程并发编程中的同步器相关的知识,希望对你有一定的参考价值。
1. CyclicBarrier
- 和 CountDownLatch 类似
- 线程会等待,直到足够多线程达到了事先规定的数据。一旦触发条件,就可以进行下一步的操作
- 适用于线程之间相互等待处理结果就绪的场景
- CyclicBarrier 可以构造一个集结点,当某一个线程执行完毕,它就会到集结点等待,直到所有线程都到了集结点,那么该栅栏就会被撤销,所有线程再统一出发,继续执行剩下的任务
用法一:等待所有人到达指定地点,再统一出发
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierDemo {
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(5, new Runnable() {
@Override
public void run() {
System.out.println("所有人都到了,大家统一出发");
}
});
for (int i = 0; i < 5; i++) {
new Thread(new Task(cyclicBarrier)).start();
}
}
static class Task implements Runnable {
private final CyclicBarrier cyclicBarrier;
public Task(CyclicBarrier cyclicBarrier) {
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName() + " 现在前往集合地点");
Thread.sleep((long) (Math.random() * 10000));
System.out.println(Thread.currentThread().getName() + " 已经到了集合地点,等待其他人到达");
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}
}
用法二:CyclicBarrier简单使用示例
2. CountDownLatch
- 和 CyclicBarrier 类似,数量递减到0时,触发动作
- 但是不可重复使用
主要方法:
- CountDownLatch(int count),仅有一个构造函数,参数 count 为需要倒数的数值
- await(),调用 await() 方法的现场会被挂起,它会等待直到 count 值为0才继续执行
- countDown(),讲 count 值减1,直到为0时,等待的现场会被唤起
用法一:一个线程等待多个线程都执行完毕,再继续自己的工作(一等多)
import java.util.concurrent.*;
public class Test {
public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(5);
ExecutorService service = Executors.newFixedThreadPool(5);
for (int i = 0; i < 5; i++) {
final int no = i + 1;
Runnable runnable = new Runnable() {
@Override
public void run() {
try {
Thread.sleep((long) (Math.random() * 10000));
System.out.println("Num:" + no);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
latch.countDown();
}
}
};
service.submit(runnable);
}
System.out.println("等待5个线程执行完....");
latch.await();
System.out.println("所有线程都已经执行完,可以进入下一个环节了。");
}
}
用法二:多个线程等待某一个线程的信号,然后同时开始执行(多等一)
import java.util.concurrent.*;
/**
* 模拟跑步比赛,5名选手等待1名裁判发令,发令后所有人同时开始跑步
*/
public class Test {
public static void main(String[] args) throws InterruptedException {
// 1名裁判
CountDownLatch latch = new CountDownLatch(1);
// 5名选手
ExecutorService service = Executors.newFixedThreadPool(5);
for (int i = 0; i < 5; i++) {
final int no = i + 1;
Runnable runnable = new Runnable() {
@Override
public void run() {
try {
System.out.println("NO." + no + "号选手准备完毕,等待发令");
latch.await();
System.out.println("NO." + no + "开始跑步");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
service.submit(runnable);
}
System.out.println("裁判检查发令枪....");
Thread.sleep(2000);
System.out.println("裁判检查完毕,比赛开始....");
latch.countDown();
}
}
CyclicBarrier 和 CountDownLatch 的区别
- 作用不同:CyclicBarrier要等固定数量的线程都到达了栅栏位置才能继续执行,而CountDownLatch只需等待数字到0,也就是说,CountDownLatch用于事件,但是CyclicBarrier是用于线程的
- 可重用性不同:CountDownLatch在倒数到0并触发门闩打开后,就不能再次使用了,除非新建新的实例;而CyclicBarrier可以重复使用
3. Semaphore
- 信号量,可以通过控制“许可证”的数量,来保证线程之间的配合
- 线程只有在拿到“许可证”后才能继续运行,相比于其他的同步器,更灵活
主要方法:
- new Semaphore(int permits, boolean fair),这里可以设置是否要设置公平策略,如果传入true,那么Semaphore会把之前等待的线程放到FIFO的队列里,以便于当有了新的许可证,可以分发给之前等了最长时间的线程;
- acquire(),获取许可证,没有的话会陷入阻塞状态;
- acquireUninterruptibly(),同上,但是可以响应中断;
- tryAcquire(),看看现在有没有空闲的许可证,如果有的话就获取,没有的话,不用陷入阻塞状态,可以去做其它的事,过一会再来查看许可证的空闲情况
- tryAcquire(long timeout, TimeUnit unit),和 tryAcquire() 一样,但是多了一个超时时间,比如“在3秒内获取不到许可证,就去做别的事情”
- release(),操作完成后,归还许可证
- 可以指定获取和释放的许可证数量,获取和释放的数量必须一致
用法:
import java.util.concurrent.*;
public class Test {
static Semaphore semaphore = new Semaphore(3, true);
public static void main(String[] args) throws InterruptedException {
ExecutorService service = Executors.newFixedThreadPool(50);
for (int i = 0; i < 100; i++) {
service.submit(new Task());
}
service.shutdown();
}
static class Task implements Runnable {
@Override
public void run() {
try {
semaphore.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " 拿到了许可证");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " 释放了许可证");
semaphore.release();
}
}
}
4. Condition
- 控制线程的“等待”和“唤醒”,Object.wait()的升级版
实现生产者消费者
import java.util.PriorityQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ConditionDemo {
private final int queueSize = 10;
private final PriorityQueue<Integer> queue = new PriorityQueue<>(queueSize);
private final Lock lock = new ReentrantLock();
private final Condition notFull = lock.newCondition();
private final Condition notEmpty = lock.newCondition();
public static void main(String[] args) {
ConditionDemo demo = new ConditionDemo();
Produce produce = demo.new Produce();
Consume consume = demo.new Consume();
produce.start();
consume.start();
}
/**
* 消费者
*/
class Consume extends Thread {
@Override
public void run() {
try {
consume();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private void consume() throws InterruptedException {
while (true) {
lock.lock();
try {
while (queue.size() == 0) {
System.out.println("队列空,等待数据");
notEmpty.await();
}
// 从队列中取数据
queue.poll();
// 并且唤醒生产者
notFull.signal();
System.out.println("从队列中取走了一条数据,队列剩余:" + queue.size());
} finally {
lock.unlock();
}
}
}
}
/**
* 生产者
*/
class Produce extends Thread {
@Override
public void run() {
try {
produce();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private void produce() throws InterruptedException {
while (true) {
lock.lock();
try {
while (queue.size() == queueSize) {
System.out.println("队列满,等待消费者进行消费");
notFull.await();
}
// 开始生产数据
queue.offer(1);
// 并且唤醒消费者
notEmpty.signal();
System.out.println("向队列添加了一条数据,队列剩余:" + queue.size());
} finally {
lock.unlock();
}
}
}
}
}
5. Phaser
- 和 CyclicBarrier 类似
示例:4个线程执行完后做一次同步操作
import java.nio.file.Path;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
public class PhaserTest {
Phaser phaser = new Phaser();
ExecutorService executorService = Executors.newCachedThreadPool();
class Worker implements Runnable {
@Override
public void run() {
phaser.register();
while (true) {
try {
Thread.sleep(500);
System.out.println("working:" + phaser.getPhase());
phaser.arriveAndAwaitAdvance();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public void run() throws InterruptedException {
phaser.register();
executorService.execute(new Worker());
executorService.execute(new Worker());
executorService.execute(new Worker());
executorService.execute(new Worker());
while (true) {
phaser.arriveAndAwaitAdvance();
System.out.println("Sync..." + phaser.getPhase());
}
}
public static void main(String[] args) throws InterruptedException {
var test = new PhaserTest();
test.run();
}
}
6. Exchanger
- 让两个线程在合适时交换数据
- 适用场景:当两个线程工作在同一个类的不同实例上时,用于交换数据
- 解决了什么问题:线程间高效交换数据
最后
最近我整理了整套《JAVA核心知识点总结》,说实话 ,作为一名Java程序员,不论你需不需要面试都应该好好看下这份资料。拿到手总是不亏的~我的不少粉丝也因此拿到腾讯字节快手等公司的Offer
进【Java进阶之路群】,找管理员获取哦-!
以上是关于Java并发——多线程并发编程中的同步器的主要内容,如果未能解决你的问题,请参考以下文章