《Java8实战》读书笔记10:组合式异步编程 CompletableFuture
Posted 笑虾
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了《Java8实战》读书笔记10:组合式异步编程 CompletableFuture相关的知识,希望对你有一定的参考价值。
《Java8实战》读书笔记10:组合式异步编程 CompletableFuture
- 第11章 CompletableFuture:组合式异步编程
- 11.1 Future 接口 (只是个引子)
- 11.2 实现异步 API
- 11.3 让你的代码免受阻塞之苦
- 11.4 对多个异步任务进行流水线操作 (实现串连并连等)
- 11.5 响应 CompletableFuture 的 completion 事件(回调函数)
- 11.6 小结
- CompletableFuture 部分方法介绍
- 参考资料
第11章 CompletableFuture:组合式异步编程
本章内容
创建异步计算,并获取计算结果
使用非阻塞操作提升吞吐量
设计和实现异步API
如何以异步的方式使用同步的API
如何对两个或多个异步操作进行流水线和合并操作
如何处理异步操作的完成状态
- 看了一下
本章内容
我感觉到有点闪回的效果,怎么似曾相识呢。。。哦!!!
1.1 前面的《Java8实战》读书笔记03:Lambda 表达式的组合用法 就有类似的概念。
1.2. 之前的JS
中也学了类似的东西:JavaScript 学习笔记【Promise】async、await - 除非是使用
消息队列
进行异步解耦。否则大多数情况异步任务
存在的意义
就是别耽误主线程
干活(异步任务是被老大派出去办事的小弟),主线程总会有那么一个时间点,堵在校门口让异步任务
把结果交出来,供主线程享用。
11.1 Future 接口 (只是个引子)
- Future 接口在
Java 5
中被引入。它代表了一个对将来的结果
的引用,并提供了可获取最终结果
的方法
;另一个优点是它比更底层的 Thread 易用。 - 从下面代码中可以看到它需要配合 ExecutorService 线程池使用。
Future
的 get 方法:如果操作已经完成,该方法会立刻返回操作的结果,否则它会阻塞你的线程,直到操作完成,返回相应的结果。它还有一个可以设置超时
重载版本。(除非你正在被迫为敌国写代码,否则,请始终设置超时时间)
public static void main(String[] args)
//System.out.println("1. 主线程开始...");
ExecutorService executor = Executors.newCachedThreadPool();
//System.out.println("2. 提交异步任务!");
Future<Double> future = executor.submit(new Callable<Double>()
public Double call()
return doSomeLongComputation();
);
doSomethingElse();
try
//System.out.println("4. 主线程忙完了,向异步任务要结果,等到它返回为止。");
Double result = future.get(2000, TimeUnit.SECONDS);
//System.out.println("5. 得到异步结果: " + result);
catch (ExecutionException ee) /* 计算抛出一个异常*/
catch (InterruptedException ie) /* 当前线程在等待过程中被中断*/
catch (TimeoutException te) /* 在Future对象完成之前超过已过期*/
//System.out.println("6. 完事...");
//executor.shutdown();
// 某耗时运算
private static Double doSomeLongComputation()
try Thread.sleep(1000); catch (InterruptedException e) e.printStackTrace();
return 0.618D;
// 主线程干其它活
private static void doSomethingElse()
System.out.println("3. 主线程继续忙别的事件");
11.1.1 Future 接口的局限性
看看本章的标题,也能猜出Future
只是个引子,它虽然也提供了一个 boolean isDone() 用来判断异步任务是否完成。但总的来说还是太简陋。
- 比如我们需要下面这些:
- 将两个异步计算合并为一个——这两个异步计算之间相互独立,同时第二个又依赖于第一个的结果。(
串连两个异步任务
) - 等待
Future
集合中的所有
任务都完成
。 - 仅等待
Future
集合中最快结束的任务完成(有可能因为它们试图通过不同的方式计算同一个值),并返回它的结果。 - 通过编程方式完成一个
Future
任务的执行(即以手工设定异步操作结果的方式)。 - 应对
Future
的完成事件
(即当Future的完成事件发生时会收到通知,并能使用Future计算的结果进行下一步的操作,不只是简单地阻塞等待操作的结果)。
接下来,了解新的 CompletableFuture 类(它实现了 Future接口) 如何利用 Java 8 的新特性以更直观的方式将上述需求都变为可能。Stream
和 CompletableFuture
的设计都遵循了类似的模式:它们都使用了 Lambda
表达式以及 流水线
的思想。从这个角度,你可以说 CompletableFuture
和 Future
的关系就跟 Stream
和 Collection
的关系一样。
11.1.2 使用 CompletableFuture 构建异步应用
本节说后续会做一个“最佳价格查询器”,从中学到不拉不拉(直接看下文即可)略。。。
同步API与异步API
同步API
其实只是对传统方法调用的另一种称呼:你调用了某个方法,调用方在被调用方运行的过程中会等待,被调用方运行结束返回,调用方取得被调用方的返回值并继续运行。即使调用方和被调用方在不同的线程中运行,调用方还是需要等待被调用方结束运行,这就是阻塞式调用
这个名词的由来。
与此相反,异步API
会直接返回,或者至少在被调用方计算完成之前,将它剩余的计算任务交给另一个线程去做,该线程和调用方是异步的——这就是非阻塞式调用
的由来。执行剩余计算任务的线程会将它的计算结果返回给调用方。返回的方式:
——要么是通过回调函数,
——要么是由调用方再次执行一个“等待,直到计算完成”的方法调用。
这种方式的计算在I/O系统程序设计中非常常见:你发起了一次磁盘访问,这次访问和你的其他计算操作是异步的,你完成其他的任务时,磁盘块的数据可能还没载入到内存,你只需要等待数据的载入完成。
11.2 实现异步 API
11.2.0 同步版本 Shop 类(用来重构的原料)
先准备一份 同步版本
代码,然后再一步一步重构最终得到一个异步非堵塞
的版本。
代码清单11-2 模拟1秒钟延迟的方法
public static void delay()
try Thread.sleep(1000L);
catch (InterruptedException e) throw new RuntimeException(e);
代码清单11-3 在getPrice
方法中引入一个模拟的延迟
// 获取价格
public double getPrice(String product)
return calculatePrice(product);
// 计算价格
private double calculatePrice(String product)
delay();
return new Random().nextDouble() * product.charAt(0) + product.charAt(1);
11.2.1 将同步方法转换为异步方法
将 getPrice
转换为 getPriceAsync
方法。它无法直接返回结果,但可以给你打个白条
,等他有结果了,你可以拿着白条
找他兑现
。Lannister always pay his debts.
代码清单11-4 getPriceAsync方法的实现
// 异步获取价格
public Future<Double> getPriceAsync(String product)
CompletableFuture<Double> futurePrice = new CompletableFuture<>(); // 创建“白条”
new Thread( () ->
double price = calculatePrice(product); // 在另一个线程中以异步方式执行计算
futurePrice.complete(price); // 耗时任务结束,结果装进“白条”
).start();
return futurePrice; // 返回“白条”
代码清单11-5 使用异步API (测试)
@Test
public void testGetPriceAsync()
Shop shop = new Shop("螺蛳粉店");
long start = System.nanoTime();
// 这里的 futurePrice 不是价格本身,而是一个在将来能取出 “螺蛳粉价格” 的纳戒。
Future<Double> futurePrice = shop.getPriceAsync("螺蛳粉");
long invocationTime = ((System.nanoTime() - start) / 1_000_000);
System.out.println("发起异步调用在 " + invocationTime + " 毫秒后返回主线程。");
// 主线各执行其它任务,比如
System.out.println("猛刷外卖APP看还有别的什么东西吃。");
// 在计算商品价格的同时
try
// 从 Future(纳戒)中取价格,如果价格还没算好,就死等。
double price = futurePrice.get();
System.out.printf("价格是: %.2f%n", price);
catch (Exception e)
throw new RuntimeException(e);
long retrievalTime = ((System.nanoTime() - start) / 1_000_000);
System.out.println( retrievalTime + " 毫秒后返回价格。");
发起异步调用在 39 毫秒后返回主线程。
猛刷外卖APP看还有别的什么东西吃。
价格是: 43386.46
1047 毫秒后返回价格。
11.2.2 错误处理
上面的代码中计算价格
如果发生异常
,它可是在另一个线程
啊,要怎么处理呢?接着看:
- 改造
getPriceAsync
把可能出问题的计算价格
过程try
起来, - 如果有异常,用
futurePrice.completeExceptionally(e)
接住。 - 如此客户端
futurePrice.get();
时就会抛这个异常了。然后就看业务,处理就是了。(本质上就是把异常
通过CompletableFuture
转了一手,带到主线程来了。)
代码清单11-6 抛出CompletableFuture内的异常
// 异步获取价格
public Future<Double> getPriceAsync(String product)
CompletableFuture<Double> futurePrice = new CompletableFuture<>(); // 创建 CompletableFuture 对象,它会包含计算的结果
new Thread( () ->
try
double price = calculatePrice(product); // 在另一个线程中以异步方式执行计算
futurePrice.complete(price); // 耗时任务结束,结果装进“白条”
catch (Exception e)
// 设置异常后,再调用 get() 时就会抛它出来。
futurePrice.completeExceptionally(e);
).start();
return futurePrice; // 返回“白条”
CompletableFuture 静态方法 supplyAsync(最终版)
- 使用工厂方法supplyAsync创建CompletableFuture
CompletableFuture
类自身提供了大量精巧的工厂方法,使用这些方法能更容易地完成整个流程,还不用担心实现的细节。比如,采用supplyAsync
方法后,你可以用一行语句重写代码清单11-4中的getPriceAsync
方法,如下所示。
代码清单11-7 使用工厂方法supplyAsync
创建CompletableFuture
对象
public Future<Double> getPriceAsync(String product)
return CompletableFuture.supplyAsync(() -> calculatePrice(product));
这一句就等价于 代码清单11-6 那一堆。(包括异常处理的部分) 所以后续应用场景,我们肯定是直接上这个版本了。
11.3 让你的代码免受阻塞之苦
11.3.0 顺序流操版
前面实现了单个商店的查询,但是如果有多个商店需要查询,问题就有点不同了。
现在的剧情是:你需要查询 所有商店
,但他们都只提供了同步API
(getPrice方法)
代码清单11-8 采用顺序查询所有商店的方式实现的findPrices
方法 (批量查询)
// 批量查询价格
public static List<String> findPrices(String product)
return shops.stream()
.map(shop -> String.format("%s 价格为: %.2f",
shop.getName(),
shop.getPrice(product)))
.collect(toList());
代码清单11-9 验证正确性和执行性能
// 测试数据【商家的列表】,如下:
private static List<Shop> shops = Arrays.asList(
new Shop("柳州螺蛳粉"), new Shop("津市牛肉粉"),
new Shop("重庆酸辣粉"), new Shop("桂林米粉"));
@Test
public void testFindPrices()
long start = System.nanoTime();
List<String> str = findPrices("酸辣粉");
long duration = (System.nanoTime() - start) / 1_000_000;
System.out.println("耗时 " + duration + " 纳秒");
[【柳州螺蛳粉】家【酸辣粉】价格: 72101.30, 【津市牛肉粉】家【酸辣粉】价格: 60069.58,
【重庆酸辣粉】家【酸辣粉】价格: 72050.75, 【桂林米粉】家【酸辣粉】价格: 61647.51]
耗时 4084 纳秒
11.3.1 使用并行流对请求进行并行操作(并行流版)
代码清单11-8 中 shops.stream()
换 shops.parallelStream()
即可,直接看测试结果:
[【柳州螺蛳粉】家【酸辣粉】价格: 37838.66, 【津市牛肉粉】家【酸辣粉】价格: 65524.66,
【重庆酸辣粉】家【酸辣粉】价格: 52324.70, 【桂林米粉】家【酸辣粉】价格: 60152.23]
耗时 1064 纳秒
效率有明显提升。
11.3.2 使用 CompletableFuture 发起异步请求(过度版)
代码清单11-11 使用 CompletableFuture 实现 findPrices 方法
注意 CompletableFuture 类中的 join 方法和 Future
接口中的 get
有相同的含义,并且也声明在 Future
接口中,它们唯一的不同是 join
不会抛出任何检测到的异常。
public static List<String> findPrices(String product)
List<CompletableFuture<String>> priceFutures =
shops.stream()
.map(shop -> CompletableFuture.supplyAsync(
() -> String.format("【%s】家【%s】价格: %.2f", shop.getName(), product, shop.getPrice(product))
)
)
.collect(Collectors.toList());
return priceFutures.stream()
.map(CompletableFuture::join)
.collect(toList());
这里可以看到,代码中分成了两段
流操作。对应 图11-4 的下半部分(并行执行)。
第一部分
是顺序流,如果把第二部分
直接接着它map
那就都成顺序流了。所以先把结果future1、future2、future3
聚集到一个列表中,再开一个流。这样所有future
的join()
能并行启动,不用排队等前一个结束。
[【柳州螺蛳粉】家【酸辣粉】价格: 37810.56, 【津市牛肉粉】家【酸辣粉】价格: 58265.10,
【重庆酸辣粉】家【酸辣粉】价格: 64577.53, 【桂林米粉】家【酸辣粉】价格: 51406.55]
耗时 1071 纳秒
这里效果还不如直接上面直接用并行流
的方案。然后引出后续剧情。
11.3.3 寻找更好的方案
并行流
与CompletableFuture
内部默认
用的都是通用线程池
,固定数目的线程,具体线程数取决于 Runtime.getRuntime().availableProcessors() (Java虚拟机可用的处理器数量 ) 的返回值。
然而,CompletableFuture
允许你对执行器(Executor)进行配置,尤其是线程池的大小,让它以更适合应用需求的方式进行配置,满足程序的要求,而这是并行流API无法提供的。
11.3.4 使用定制的执行器(最终版)
调整线程池的大小
《Java并发编程实战》一书中,Brian Goetz和合著者们为线程池大小的优化提供了不少中肯的建议。这非常重要,如果线程池中线程的数量过多,最终它们会竞争稀缺的处理器和内存资源,浪费大量的时间在上下文切换上。反之,如果线程的数目过少,正如你的应用所面临的情况,处理器的一些核可能就无法充分利用。Brian Goetz建议,线程池大小与处理器的利用率之比可以使用下面的公式进行估算:
Nthreads
=NCPU
*UCPU
* (1
+W/C
)
其中:
❑NCPU
是处理器的核的数目,可以通过 Runtime.getRuntime().availableProcessors() 得到
❑UCPU
是期望的CPU利用率(该值应该介于0和1之间)
❑W/C
是等待时间与计算时间的比率
简化方案:线程数 = 商店数
,再加个上限保底100:Math.min(shops.size(), 100)
自定义执行器 Executor
private final Executor executor = Executors.newFixedThreadPool(
Math.min(shops.size(), 100),
new ThreadFactory()
public Thread newThread(Runnable r)
Thread t = new Thread(r);
t.setDaemon(true); // 使用守护线程——这种方式不会阻止程序的关停
return t;
);
还记得用 lambda
替换 匿名类
吗? lambda 版:
private static final Executor executor = Executors.newFixedThreadPool(
Math.min(shops.size(), 100),
(r) ->
Thread t = new Thread(r);
t.setDaemon(true); // 使用守护线程——这种方式不会阻止程序的关停
return t;
);
果将线程标记为守护进程,意味着程序退出时它也会被回收。
使用自定义执行器
代码清单11-11 中 CompletableFuture.supplyAsync
加上第二个参数 executor
,即可:
CompletableFuture.supplyAsync(
() -> String.format("【%s】家【%s】价格: %.2f", shop.getName(), product, shop.getPrice(product)),
executor
)
测试性能
[【柳州螺蛳粉0】家【酸辣粉】价格: 44212.36, 【柳州螺蛳粉1】家【酸辣粉】价格: 60541.35,
【柳州螺蛳粉2】家【酸辣粉】价格: 62474.67, 【柳州螺蛳粉3】家【酸辣粉】价格: 56118.18,
【柳州螺蛳粉4】家【酸辣粉】价格: 53683.30, 【柳州螺蛳粉5】家【酸辣粉】价格: 44924.01,
【柳州螺蛳粉6】家【酸辣粉】价格: 67151.76, 【柳州螺蛳粉7】家【酸辣粉】价格: 59344.12,
【柳州螺蛳粉8】家【酸辣粉】价格: 44457.99, 【柳州螺蛳粉9】家【酸辣粉】价格: 67508.70,
【津市牛肉粉1】家【酸辣粉】价格: 54769.93, 【津市牛肉粉2】家【酸辣粉】价格: 63197.59,
【重庆酸辣粉1】家【酸辣粉】价格: 73413.08, 【重庆酸辣粉2】家【酸辣粉】价格: 38340.71,
【桂林米粉】家【酸辣粉】价格: 57547.14]
耗时 1064 纳秒
这里把商店添加到了 15
家。耗时反而最少。性能满意
一般而言,这种状态会一直持续,直到商店的数目达到我们之前计算的阈值(程池大小)。
并行——使用流还是CompletableFutures?
目前为止,你已经知道对集合进行并行计算
有两种
方式:
: 要么将其转化为并行流
,利用map这样的操作开展工作,
: 要么枚举出集合中的每一个元素,创建新的线程,在 CompletableFuture 内对其进行操作。后者提供了更多的灵活性,你可以调整线程池的大小,而这能帮助你确保整体的计算不会因为线程都在等待I/O而发生阻塞。
我们对使用这些API的建议如下
❑ 如果你进行的是
计算密集
型的操作,并且没有I/O,那么推荐使用Stream
接口,因为实现简单,同时效率也可能是最高的(如果所有的线程都是计算密集型的,那就没有必要创建比处理器核数更多的线程)。
❑ 反之,如果你并行的工作单元还涉及等待I/O
的操作(包括网络连接等待),那么使用CompletableFuture
灵活性更好,你可以像前文讨论的那样,依据等待/计算,或者W/C的比率设定需要使用的线程数。这种情况不使用并行流的另一个原因是,处理流的流水线中如果发生I/O等待,流的延迟特性会让我们很难判断到底什么时候触发了等待。
11.4 对多个异步任务进行流水线操作 (实现串连并连等)
剧情:现在除了查价格
还要查折扣
,然后再算出实付
多少。
首先准备一个同步版,然后再慢慢重构。
11.4.0-1 准备工作(同步版)
现在我们需要添加几个类一起来配合,最终:(代码稍微整理了一下, 完全照抄)
Quote
:价格解析类【新增】
Discount
:价格解析类【新增】
Shop
:这就是前面的商店类。
ShopTest
:测试类。
实现价格解析类 Quote
封装一个 Quote
它用静态方法 parse
解析 getPrice
返回的字符串结果:商品名称:价格:折扣码
。得到三个值后,new
一个 Quote
实例返回。
原文较长,我这里顺便用 lombok
简化了一下。
@Getter
<以上是关于《Java8实战》读书笔记10:组合式异步编程 CompletableFuture的主要内容,如果未能解决你的问题,请参考以下文章