Java核心技术读书笔记10-2 阻塞队列线程安全集合类Callable与Future线程池与任务组同步框架
Posted 芝芝与梅梅
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java核心技术读书笔记10-2 阻塞队列线程安全集合类Callable与Future线程池与任务组同步框架相关的知识,希望对你有一定的参考价值。
3. 阻塞队列
阻塞队列是一种队列,当线程想满队中添加元素,或对空队取元素时会阻塞该线程。这种结构可以解决一些同步问题,如生产者与消费者。
对阻塞队列BlockingQueue一些操作方法和结果
当使用阻塞队列管理线程时,常用的就是put和take方法。
BlockingQueue的一些实现类:
LinkedBlockingQueue使用链表实现,因此容量没有上边界,但可以选择指定最大容量(若不指定,则容量为Integer.MAX_VALUE)。
LinkedBlockingDeque是一个双端队列
ArrayBlockingQueue在构造时需要指定容量,且可以用一个可选参数来指定是否需要公平性。若设置了公平性则等待了最长时间的线程将会优先得到处理,但这样会降低性能。
PriorityBlockingQueue是一个使用堆实现的优先级队列。元素会按照优先级的顺序被取出。
DelayQueue该队列存放实现了Delayed接口的元素,每个元素存在一个延迟,只有当延迟为负时才能从该队列中移除,该队列有序,队头元素的延迟最长,该队列可用于对超时元素做出某些处理的场景,如订单超时自动取消:
interface Delayed extends Comparable<Delayed>{
long getDelay(TimeUnit unit);
}
在Java 7中,增加了一个TransferQueue接口(继承了BlockingQueue),其主要作用是生产者通过transfer方法将一个元素放置到队列中后将被阻塞,直到消费者从队列中取出这个元素。
void transfer(E e) throws InterruptedException;
4.线程安全的集合
由于对某些集合进行存取时并不是一个简单地一步操作,如对散列表添加元素后,散列表会需要内部调整各个桶的关系,这时假如存在其他线程抢占到了处理机对没有调整完的散列表进行操作,就会破坏集合的内部结构产生混乱导致抛出异常或陷入死循环。因此面对这种场景可以选择一些java.util.concurrent包中线程安全的集合类如:concurrentHashMap、ConcurrentSkipListMap、ConcurrentSkipListSet和ConcurrentLinkedQueue。
并发的散列映射可高效的支持大量的读者和一定数量的写者。
对于含有映射比较庞大的集合可以使用mappingCount方法返回其long类型的大小。
集合可以返回弱一致性的迭代器。迭代器在多线程环境下工作,不一定能返回所有对集合进行的修改,但同一个值不会出现两次,也不会抛出ConcurrentModificationException异常。
java.util中的迭代器构造之后集合发生改变会抛出上述异常。
线程安全的集合可以保证集合内部的操作具有原子性,但仍然不能保证使用集合时的线程安全问题,如使用一个映射完成转账操作。对于这种场景需要自行编写原子性的代码,如果对集合的值更新比较简单也可以使用集合提供的一些方法比如compute与merge方法。
4.1 同步包装器
Vector与HashTable是较早提供了线程安全的动态数组与散列表实现。现在对于其替代者ArrayList和HashMap类也可以使用同步包装器变为线程安全的,实际上任何集合类都可以通过同步包装器变为线程安全的。实现原理为为集合的方法使用锁加以保护,并返回包装后的集合。
但是在包装后的多线程环境下,应确保没有任何线程通过原始的非同步方法访问数据结构,也就是说,作为一种引用类型,不要有原始变量引用集合并对集合进行操作。对集合的操作也必须要使用客户端锁定,包括使用迭代器遍历集合时(构造迭代器后修改集合会抛出ConcurrentModificationException异常)。
实际上最好还是使用线程安全的集合而不是同步包装器返回的集合。
5.Callable与Future
Runnable封装一个异步运行的任务,可以将其想象为一个没有参数和返回值的异步方法。Callable与Runnable类似,但是有返回值。
public interface Callable<V>{
V call() throws Exception; //call与run方法类似,都是封装一个任务。类型参数V即返回值的类型
}
获取结果使用Future对象,通过将Callable转换成Future,然后使用Future对象传入Thread构造器中进行计算。之后使用Future的方法返回结果。
转换类型使用FutureTask包装器,它可以将Callable转换成Future与Runnable。
public interface Future
V get() throws..; //获取结果,获取时被阻塞,计算完成后返回结果。
V get(long timeout, TimeUnit unit) throws..; //时间内未返回结果抛出InterruptedException异常,同时运行计算的线程被中断,两个方法都会抛出InterruptedException异常。
void cancel(boolean mayInterrupt); //取消计算,计算未开始进行取消且不会再开始,否则对其进行中断
boolean isCancelled(); //若在计算完成前被取消返回true,否则返回false
boolean isDone(); //计算未完成返回false,计算完成返回true
}
6.执行器与线程池
频繁开辟新线程涉及和操作系统的交互,如果程序中创建了大量生命周期很短的线程应该使用线程池。将Runnable交给线程池,就会有一个线程调用run方法,run方法退出时方法不会死亡,而是在池中准备为下一个请求提供服务。
此外,一个系统采用线程池可以控制并发线程的数目,创建大量线程会大大降低性能甚至使JVM崩溃。
执行器(Executor)有许多静态工厂方法用来构建线程池:
6.1 常用线程池
下面为常用线程池的用法:
public static void main(String[] args) throws ExecutionException, InterruptedException {
//线程池返回一个ExecutorService可以接受任务并为其分配线程
ExecutorService executorService = Executors.newCachedThreadPool(); //对于每个任务,有空闲线程立即让他执行任务,否则创建线程,空闲线程会保留60秒
// Executors.newFixedThreadPool(20); //创建一个固定大小线程池,若任务数多于线程数则进入队列中等待空闲线程
// Executors.newSingleThreadExecutor(); //相当于容量为1的固定大小线程池
//若使用submit提交对象为Runnable则会自动包装为FutureTask对象
Future<?> f1 = executorService.submit(() -> System.out.println("执行任务"));//提交一个Runnable对象返回一个Future对象
f1.get(); //可以用该返回的Future调用isDone、cancel或isCancelled但调用get方法会返回null
Future<Object> f2 = executorService.submit(() -> System.out.println("执行任务"), new Employee(100, "测试")); //返回的Future对象调用get方法会得到传入的Employee对象
System.out.println(f2.get()); //Employee{name=测试 salary=100.0}
Future<Integer> f3 = executorService.submit(() -> {
System.out.println("这是一个call()方法");
return 100;
}); //传入一个Callable对象
System.out.println(f3.get()); //call方法的返回结果 100
//确定不会再使用线程池时需要对其进行关闭
executorService.shutdown(); //不在接收新任务,所有任务都完成后,线程池中的线程死亡。
// executorService.shutdownNow(); //取消尚未开始的所有任务并试图中断正在运行的任务。
}
6.2 预定执行
使用newScheduledThreadPool和newSingleThreadScheduledThreadPool将返回实现了ScheduledExecutorService接口的对象。它是一种允许使用线程池机制的java.util.Timer的泛化,即对预定的Runnable或Callable在初始的延迟之后只运行一次或对Runnable对象周期性的运行。
6.3 控制任务组
批量提交Callable
如果有多个带有返回结果的任务想要批量提交给线程池该怎么做?ExecutorService提供了两个方法invokeAll和invokeAny两个方法,可以接受存储Callable元素的集合,两个方法的不同之处在于:
invokeAny:返回一个最先完成计算的任务结果,例如你利用了多线程寻找迷宫的出口,其实只要最先计算完成的线程告知出口位置就可以。
invokeAll:返回的是全部任务对应的Future组成的List。
ExecutorCompletionService
对于invokeAll方法返回的集合,当遍历它时,可能会因为该Future对应的任务还没有计算完毕而发生阻塞,这样,可能有其它任务计算完了我们也不能获取。这个问题可以用ExecutorCompletionService解决,使用ExecutorCompletionService类包装一下ExecutorService,返回一个ExecutorCompletionService对象。然后使用该对象多次提交任务后,这个类会按结果可获得的顺序保存起来,这样就不用怕存在多个已经计算完毕的结果,而被当前计算时间较长的Future阻塞的情况了。之后使用take方法获取结果。
示例代码如下:
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService executorService = Executors.newCachedThreadPool();
Callable<Integer> c1 = () -> {
System.out.println("执行任务1");
Thread.sleep(1000); //模拟计算时间最长的任务
return 1;
};
Callable<Integer> c2 = () -> {
System.out.println("执行任务2");
return 2;
};
Callable<Integer> c3 = () -> {
System.out.println("执行任务3");
return 3;
};
List<Callable<Integer>> list = Arrays.asList(c1, c2, c3);
//invokeAny
// Integer integer = executorService.invokeAny(list);
// System.out.println(integer); //输出:2 得到计算最快的结果时就停止
//invokeAll
List<Future<Integer>> futures = executorService.invokeAll(list);
futures.iterator().forEachRemaining((Future f) -> {
try {
System.out.println(f.get()); //输出:1 2 3 会顺序获取任务的结果,但任务1执行最慢,因此会被任务1阻塞才能获得其余结果
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
});
//ExecutorCompletionService
ExecutorCompletionService ecs = new ExecutorCompletionService(executorService);
ecs.submit(c1);
ecs.submit(c2);
ecs.submit(c3);
for (int i = 0; i < 3; i++) {
System.out.println(ecs.take().get()); //2 3 1 不会被任务1阻塞
}
executorService.shutdown();
}
6.4 Fork-Join框架
Fork-Join框架是Java 7引入的一个能够并行执行任务的框架,一般出现一个大问题能划分为小问题,小问题和大问题的解决方式一样且小问题的解可以应用回大问题这样的递归式的问题结构时就可以使用该框架并行解决子问题,比如典型的图像处理问题。采用该框架需要扩展RecursiveTask
class Counter extends RecursiveTask<Integer>{
设定阈值 = ..;
参数 = ...;
public Counter(问题参数){
设定当前问题的参数;
}
...
protected Integer compute(){ //必须覆盖该方法,该方法为实际处理问题的逻辑
if(当前子问题规模 < 阈值){
直接解决子问题
}
else{
分解子问题
Counter first = new Counter(子问题1); //递归分解子问题
Counter second = new Counter(子问题2);
...
invokeAll(first, second,..) //处理子问题
return first.join() + second.join() ...; //join方法返回子问题结果
}
}
}
6.5 CompletableFurture
如果你有一个多线程异步处理的逻辑需求,如希望使用多线程的方式从url中获取网页文本,在对这个文本使用多线程的方式获取其中的url,那么就可以使用java 8提供的CompletableFurture类。该类以函数式编程的方式提供了提供了Future处理链,或者组合多个Future完成更复杂的任务,如:
CompletableFurture<String> contents = readPage(url); //CompletableFurture
CompletableFurture<List<URL>> links = contents.thenApply(Parser::getLinks); //使用thenApply,传入一个方法处理contents,然后再返回一个CompletableFurture
6.6 同步器
java.util.concurrent包中包含了几个能够帮助人们管理相互合作的线程集的类,这些机制具有为线程之间的共用集结点模式提供的“预置功能”。如果一个相互合作的线程集满足这些行为模式之一,那么应该直接重用合适的类库而不要手动编写锁与条件的集合代码。
以上是关于Java核心技术读书笔记10-2 阻塞队列线程安全集合类Callable与Future线程池与任务组同步框架的主要内容,如果未能解决你的问题,请参考以下文章
C++并发编程----无锁实现线程安全队列(《C++ Concurrency in Action》 读书笔记)