多线程总结-JUC中常用的工具类
Posted bug樱樱
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了多线程总结-JUC中常用的工具类相关的知识,希望对你有一定的参考价值。
本文只记录JUC中较常用到的一些工具类,只是列举其常见的使用方法,至于其实现原理,此处不做说明。
CountDownLatch
一个同步工具类,允许一个或多个线程一直等待,直到其他线程运行完成后再执行。所以在必要时,可以利用它来对各线程的执行结果进行汇总。CountDownLatch
主要有两个方法,countDown
和await
。countDown
用来计计数器减1, await
则是让当前调用它的线程处于等待状态。可以看如下示例代码:
CountDownLatch latch = new CountDownLatch(2);
new Thread(() -> {
try {
Thread.sleep((long) (Math.random() * 1000));
System.out.println("子线程:" + Thread.currentThread().getName() + "执行");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
latch.countDown();
}
}).start();
new Thread(() -> {
try {
Thread.sleep((long) (Math.random() * 1000));
System.out.println("子线程:" + Thread.currentThread().getName() + "执行");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
latch.countDown();
}
}).start();
System.out.println("main thread await. ");
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("-----------thread execute finished");
复制代码
在创建CountDownLatch的时候传入了一个整数,在这个整数倒数到0之前,主线程会处于阻塞状态。如上示例,传了一个整数2
, 新起了两个线程,在每个线程执行完毕后都会调用latch.countDown()
方法。子线程启动后,在主线程中调用latch.await()
方法,所以最后的执行结果是会先打印两个子线程中的输出,再打印主线程中的输出。
CyclicBarrier
可循环使用的屏障,可用它来描述一组线程之间的相互阻塞,直到最后一个线程到达屏障时,所有被阻塞的线程才会继续运行。 await
方法是告诉CyclicBarrier
已到达屏障, 然后当前线程被阻塞。 如下代码:
public class CyclicBarrierDemo {
public static void main(String args[]) {
CyclicBarrier barrier = new CyclicBarrier(2);
new Thread() {
@Override
public void run() {
super.run();
try {
barrier.await();
} catch (Exception e) {}
System.out.println(1);
}
}.start();
try {
barrier.await();
} catch (Exception e) {
}
System.out.println(2);
}
}
复制代码
其可以先输出1, 也可能先输出2。
如果在初始化CyclicBarrier时传入一个Runnable, 如下:
CyclicBarrier barrier = new CyclicBarrier(2, new A());
复制代码
static class A implements Runnable {
@Override
public void run() {
System.out.println(3);
}
}
复制代码
则上面的执行结果会是
3
1
2
复制代码
对于构造函数public CyclicBarrier(int parties, Runnable barrierAction)
, 当线程到达屏障时,会优先执行barrierAction
, 所以上面的代码中new A()
会先执行。
FutureTask
FutureTask
实现了Runnable
和Future
接口,所以FutureTask
对象可以直接被Thread执行,也可以作为任务提交给线程池执行。调用其get
方法,可以拿到执行的结果。
交给线程池来执行
FutureTask<Integer> task = new FutureTask<>(()-> {
System.out.println("execute in thread:" + Thread.currentThread().getName());
return 1 + 1;
});
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.submit(task);
int result = task.get();
复制代码
交给Thread
直接执行
FutureTask<Integer> task = new FutureTask<>(()-> {
System.out.println("execute in thread:" + Thread.currentThread().getName());
return 1 + 1;
});
Thread t1 = new Thread(task);
t1.start();
task.get();
复制代码
Fork/Join
Fork/Join框架是java 7中引入进来了,它采用分治的方法来实现并行计算。假如一个复杂问题,如果能够将其分解成两个或多个同类型的子问题进行计算,计算完每个子问题后再对其结果进行合并,那么该框架就非常适合这种场景。对于一个斐波纳契数 , 我们一般的解法是这样子:
private static int fibonacci(int n) {
if (n == 0) return 0;
if (n == 1) return 1;
return fibonacci(n - 1) + fibonacci(n - 2);
}
复制代码
如果我们用Fork/Join框架
public class FibonacciDemo {
public static void main(String args[]) {
ForkJoinPool forkJoinPool = new ForkJoinPool(8);
Fibonacci task = new Fibonacci(10);
Integer result = forkJoinPool.invoke(task);
System.out.println(result);
}
static class Fibonacci extends RecursiveTask<Integer> {
final int n;
public Fibonacci(int n) {
this.n = n;
}
@Override
protected Integer compute() {
if (n <= 1) return n;
Fibonacci task1 = new Fibonacci(n -1 );
task1.fork();
Fibonacci task2 = new Fibonacci( n - 2);
return task2.compute() + task1.join();
}
}
}
复制代码
Fork/Join使用两个类ForkJoinTask
和ForkJoinPool
来完成计算任务。
ForkJoinTask
RecursiveAction
用于没有返回结果的任务RecursiveTask
用于有返回结果的任务
fork
方法
提交一个任务异步执行
join
方法
等待一个任务执行完毕
ForkJoinPool
指定线程池, ForkJoinTask
需要通过ForkJoinPool
来执行
CompletableFuture
CompletableFuture
是java8中新引入进来的, 可以实现汇总的工作。使用它无须手工创建线程,同时语义更加清晰。我们常用的方法:
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor)
runAsync
和supplyAsync
相比,runAsync
是没有返回值的,而supplyAsync
是有返回值。这两个方法都可以指定线程池,如果不指定就会使用默认的线程池,该线程池在CompletableFuture
中有定义:
private static final Executor ASYNC_POOL = USE_COMMON_POOL ?
ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
复制代码
这是一个静态变量,也就是所有的CompletableFuture
都会共享这一个线程池。所以如果一旦有耗时的任务运行,可能会造成线程处于等待状态,从而影响性能。所以建议根据不同的业务类型创建不同的线程池,避免相互干扰。
public class CompletableFutureLearning {
public static void main(String args[]) throws ExecutionException, InterruptedException {
CompletableFuture<Void> f1 = CompletableFuture.runAsync(()-> {
System.out.println("t1 execute in thread " + Thread.currentThread().getName());
});
CompletableFuture<Void> f2 = CompletableFuture.runAsync(()-> {
System.out.println("t2 execute in thread " + Thread.currentThread().getName());
});
CompletableFuture<String> f3 = f1.thenCombine(f2,(__, tf)-> {
System.out.println("t3 execute in thread " + Thread.currentThread().getName());
return "all executed";
});
System.out.println(f3.get();
}
}
复制代码
输出:
t1 execute in thread ForkJoinPool.commonPool-worker-1
t2 execute in thread ForkJoinPool.commonPool-worker-1
t3 execute in thread main
all executed
复制代码
再来看如下demo, 我们用supplyAsync
CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(()-> {
System.out.println("t1 execute in thread " + Thread.currentThread().getName());
return 1;
});
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(()-> {
System.out.println("t2 execute in thread " + Thread.currentThread().getName());
return "abc";
});
CompletableFuture<String> f3 = f1.thenCombine(f2,(f1data, f2data)-> {
System.out.println("t3 execute in thread " + Thread.currentThread().getName());
return "f1data:" + f1data + " f2data:" + f2data;
});
System.out.println(f3.get());
复制代码
输出
t1 execute in thread ForkJoinPool.commonPool-worker-1
t2 execute in thread ForkJoinPool.commonPool-worker-1
t3 execute in thread main
f1data:1 f2data:abc
复制代码
CompletableFuture
实现了CompletionStage
接口,该接口可用来描述任务之间的串行,AND聚合, OR聚合关系及异常的处理。如上面demo中的f1.thenCombine
语句,thenCombine
便是聚合关系中的一种。
重入锁 ReentrantLock
支持一个线程对资源重复加锁, 如下:
class Counter {
private final Lock lock = new ReentrantLock();
private int count;
public int get() {
lock.lock();
try {
return count;
} finally {
lock.unlock();
}
}
public void add(int n) {
lock.lock();
try {
count += n;
} finally {
lock.unlock();
}
}
}
复制代码
公平锁与非公平锁
在使用ReentrantLock时,有一个构造函数可以传参数:
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
复制代码
传true
代表的是公平锁, 反之则表示构造一个非公平锁
读写锁 ReadWriteLock
ReetrantLock
是排他锁,即同一时刻只能允许一个线程访问。对于读写锁:
- 同一时刻可以允许多个读线程访问。
- 同一时刻只能允许一个写线程访问。
- 如果一个写线程正在执行写操作,此时禁止读线程共享变量。
我们可以用读写锁来实现一个通用的缓存工具类
public class Cache<K, V> {
final Map<K, V> m = new HashMap<>();
final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
final Lock r = readWriteLock.readLock();
final Lock w = readWriteLock.writeLock();
V get(K key) {
r.lock();
try {
return m.get(key);
} finally {
r.unlock();
}
}
V put(K key, V value) {
w.lock();
try {
return m.put(key, value);
} finally {
w.unlock();
}
}
}
复制代码
以上是关于多线程总结-JUC中常用的工具类的主要内容,如果未能解决你的问题,请参考以下文章
Java基础学习总结(193)—— JUC 常用并发工具类总结
JUC并发编程 共享模式之工具 JUC CountdownLatch(倒计时锁) -- CountdownLatch应用(等待多个线程准备完毕( 可以覆盖上次的打印内)等待多个远程调用结束)(代码片段