《Java8实战》读书笔记10:组合式异步编程 CompletableFuture

Posted 笑虾

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了《Java8实战》读书笔记10:组合式异步编程 CompletableFuture相关的知识,希望对你有一定的参考价值。

《Java8实战》读书笔记10:组合式异步编程 CompletableFuture

第11章 CompletableFuture:组合式异步编程

本章内容
 创建异步计算,并获取计算结果
 使用非阻塞操作提升吞吐量
 设计和实现异步API
 如何以异步的方式使用同步的API
 如何对两个或多个异步操作进行流水线和合并操作
 如何处理异步操作的完成状态

  1. 看了一下 本章内容 我感觉到有点闪回的效果,怎么似曾相识呢。。。哦!!!
    1.1 前面的《Java8实战》读书笔记03:Lambda 表达式的组合用法 就有类似的概念。
    1.2. 之前的JS中也学了类似的东西:JavaScript 学习笔记【Promise】async、await
  2. 除非是使用 消息队列 进行异步解耦。否则大多数情况异步任务存在的意义就是别耽误主线程干活(异步任务是被老大派出去办事的小弟),主线程总会有那么一个时间点,堵在校门口让异步任务把结果交出来,供主线程享用。

11.1 Future 接口 (只是个引子)

  1. Future 接口在Java 5中被引入。它代表了一个对将来的结果的引用,并提供了可获取最终结果方法;另一个优点是它比更底层的 Thread 易用。
  2. 从下面代码中可以看到它需要配合 ExecutorService 线程池使用。
  3. Futureget 方法:如果操作已经完成,该方法会立刻返回操作的结果,否则它会阻塞你的线程,直到操作完成,返回相应的结果。它还有一个可以设置超时重载版本。(除非你正在被迫为敌国写代码,否则,请始终设置超时时间)
public static void main(String[] args) 
    //System.out.println("1. 主线程开始...");
    ExecutorService executor = Executors.newCachedThreadPool();
    //System.out.println("2. 提交异步任务!");
    Future<Double> future = executor.submit(new Callable<Double>() 
        public Double call() 
            return doSomeLongComputation();
        );
    doSomethingElse();
    try 
        //System.out.println("4. 主线程忙完了,向异步任务要结果,等到它返回为止。");
        Double result = future.get(2000, TimeUnit.SECONDS);
        //System.out.println("5. 得到异步结果: " + result);
     catch (ExecutionException ee) /* 计算抛出一个异常*/
    catch (InterruptedException ie) /* 当前线程在等待过程中被中断*/
    catch (TimeoutException te) /* 在Future对象完成之前超过已过期*/
    //System.out.println("6. 完事...");
    //executor.shutdown();

// 某耗时运算
private static Double doSomeLongComputation()
    try  Thread.sleep(1000);  catch (InterruptedException e)  e.printStackTrace(); 
    return 0.618D;

// 主线程干其它活
private static void doSomethingElse()
    System.out.println("3. 主线程继续忙别的事件");

11.1.1 Future 接口的局限性

看看本章的标题,也能猜出Future只是个引子,它虽然也提供了一个 boolean isDone() 用来判断异步任务是否完成。但总的来说还是太简陋。

  • 比如我们需要下面这些:
  1. 将两个异步计算合并为一个——这两个异步计算之间相互独立,同时第二个又依赖于第一个的结果。(串连两个异步任务)
  2. 等待Future集合中的所有任务都完成
  3. 仅等待Future集合中最快结束的任务完成(有可能因为它们试图通过不同的方式计算同一个值),并返回它的结果。
  4. 通过编程方式完成一个Future任务的执行(即以手工设定异步操作结果的方式)。
  5. 应对Future完成事件(即当Future的完成事件发生时会收到通知,并能使用Future计算的结果进行下一步的操作,不只是简单地阻塞等待操作的结果)。

接下来,了解新的 CompletableFuture 类(它实现了 Future接口) 如何利用 Java 8 的新特性以更直观的方式将上述需求都变为可能。StreamCompletableFuture 的设计都遵循了类似的模式:它们都使用了 Lambda 表达式以及 流水线 的思想。从这个角度,你可以说 CompletableFutureFuture 的关系就跟 StreamCollection 的关系一样。

11.1.2 使用 CompletableFuture 构建异步应用

本节说后续会做一个“最佳价格查询器”,从中学到不拉不拉(直接看下文即可)略。。。

同步API与异步API
同步API其实只是对传统方法调用的另一种称呼:你调用了某个方法,调用方在被调用方运行的过程中会等待,被调用方运行结束返回,调用方取得被调用方的返回值并继续运行。即使调用方和被调用方在不同的线程中运行,调用方还是需要等待被调用方结束运行,这就是阻塞式调用这个名词的由来。
与此相反,异步API会直接返回,或者至少在被调用方计算完成之前,将它剩余的计算任务交给另一个线程去做,该线程和调用方是异步的——这就是非阻塞式调用的由来。执行剩余计算任务的线程会将它的计算结果返回给调用方。返回的方式:
——要么是通过回调函数
——要么是由调用方再次执行一个“等待,直到计算完成”的方法调用。
这种方式的计算在I/O系统程序设计中非常常见:你发起了一次磁盘访问,这次访问和你的其他计算操作是异步的,你完成其他的任务时,磁盘块的数据可能还没载入到内存,你只需要等待数据的载入完成。

11.2 实现异步 API

11.2.0 同步版本 Shop 类(用来重构的原料)

先准备一份 同步版本 代码,然后再一步一步重构最终得到一个异步非堵塞的版本。

代码清单11-2 模拟1秒钟延迟的方法

public static void delay() 
    try  Thread.sleep(1000L);  
    catch (InterruptedException e)  throw new RuntimeException(e); 


代码清单11-3getPrice方法中引入一个模拟的延迟

// 获取价格
public double getPrice(String product) 
    return calculatePrice(product);

// 计算价格
private double calculatePrice(String product) 
    delay();
    return new Random().nextDouble() * product.charAt(0) + product.charAt(1);

11.2.1 将同步方法转换为异步方法

getPrice 转换为 getPriceAsync 方法。它无法直接返回结果,但可以给你打个白条,等他有结果了,你可以拿着白条找他兑现。Lannister always pay his debts.

代码清单11-4 getPriceAsync方法的实现

// 异步获取价格
public Future<Double> getPriceAsync(String product) 
    CompletableFuture<Double> futurePrice = new CompletableFuture<>(); // 创建“白条”
    new Thread( () -> 
        double price = calculatePrice(product); // 在另一个线程中以异步方式执行计算
        futurePrice.complete(price); // 耗时任务结束,结果装进“白条”
    ).start();
    return futurePrice; // 返回“白条”

代码清单11-5 使用异步API (测试)

@Test
public void testGetPriceAsync() 
    Shop shop = new Shop("螺蛳粉店");
    long start = System.nanoTime();
    // 这里的 futurePrice 不是价格本身,而是一个在将来能取出 “螺蛳粉价格” 的纳戒。
    Future<Double> futurePrice = shop.getPriceAsync("螺蛳粉");
    long invocationTime = ((System.nanoTime() - start) / 1_000_000);
    System.out.println("发起异步调用在 " + invocationTime + " 毫秒后返回主线程。");
    // 主线各执行其它任务,比如
    System.out.println("猛刷外卖APP看还有别的什么东西吃。");
    // 在计算商品价格的同时
    try 
        // 从 Future(纳戒)中取价格,如果价格还没算好,就死等。
        double price = futurePrice.get();
        System.out.printf("价格是: %.2f%n", price);
     catch (Exception e) 
        throw new RuntimeException(e);
    
    long retrievalTime = ((System.nanoTime() - start) / 1_000_000);
    System.out.println( retrievalTime + " 毫秒后返回价格。");

发起异步调用在 39 毫秒后返回主线程。
猛刷外卖APP看还有别的什么东西吃。
价格是: 43386.46
1047 毫秒后返回价格。

11.2.2 错误处理

上面的代码中计算价格如果发生异常,它可是在另一个线程啊,要怎么处理呢?接着看:

  1. 改造 getPriceAsync 把可能出问题的计算价格过程try起来,
  2. 如果有异常,用 futurePrice.completeExceptionally(e) 接住。
  3. 如此客户端 futurePrice.get(); 时就会抛这个异常了。然后就看业务,处理就是了。(本质上就是把异常通过CompletableFuture 转了一手,带到主线程来了。)

代码清单11-6 抛出CompletableFuture内的异常

// 异步获取价格
public Future<Double> getPriceAsync(String product) 
    CompletableFuture<Double> futurePrice = new CompletableFuture<>(); // 创建 CompletableFuture 对象,它会包含计算的结果
    new Thread( () -> 
        try 
            double price = calculatePrice(product); // 在另一个线程中以异步方式执行计算
            futurePrice.complete(price); // 耗时任务结束,结果装进“白条”
         catch (Exception e) 
        	// 设置异常后,再调用  get() 时就会抛它出来。
            futurePrice.completeExceptionally(e); 
        
    ).start();
    return futurePrice; // 返回“白条”

CompletableFuture 静态方法 supplyAsync(最终版)

  • 使用工厂方法supplyAsync创建CompletableFuture

CompletableFuture类自身提供了大量精巧的工厂方法,使用这些方法能更容易地完成整个流程,还不用担心实现的细节。比如,采用supplyAsync方法后,你可以用一行语句重写代码清单11-4中的getPriceAsync方法,如下所示。

代码清单11-7 使用工厂方法supplyAsync创建CompletableFuture对象

public Future<Double> getPriceAsync(String product)  
	return CompletableFuture.supplyAsync(() -> calculatePrice(product)); 

这一句就等价于 代码清单11-6 那一堆。(包括异常处理的部分) 所以后续应用场景,我们肯定是直接上这个版本了。

11.3 让你的代码免受阻塞之苦

11.3.0 顺序流操版

前面实现了单个商店的查询,但是如果有多个商店需要查询,问题就有点不同了。
现在的剧情是:你需要查询 所有商店,但他们都只提供了同步APIgetPrice方法

代码清单11-8 采用顺序查询所有商店的方式实现的findPrices方法 (批量查询)

// 批量查询价格
public static List<String> findPrices(String product) 
    return shops.stream()
            .map(shop -> String.format("%s 价格为: %.2f", 
				            shop.getName(), 
				            shop.getPrice(product)))
            .collect(toList());

代码清单11-9 验证正确性和执行性能

// 测试数据【商家的列表】,如下:
private static List<Shop> shops = Arrays.asList(
        new Shop("柳州螺蛳粉"), new Shop("津市牛肉粉"),
        new Shop("重庆酸辣粉"), new Shop("桂林米粉"));

@Test
public void testFindPrices() 
    long start = System.nanoTime();
    List<String> str = findPrices("酸辣粉");
    long duration = (System.nanoTime() - start) / 1_000_000;
    System.out.println("耗时 " + duration + " 纳秒");

[【柳州螺蛳粉】家【酸辣粉】价格: 72101.30, 【津市牛肉粉】家【酸辣粉】价格: 60069.58, 
【重庆酸辣粉】家【酸辣粉】价格: 72050.75, 【桂林米粉】家【酸辣粉】价格: 61647.51]
耗时 4084 纳秒

11.3.1 使用并行流对请求进行并行操作(并行流版)

代码清单11-8shops.stream()shops.parallelStream() 即可,直接看测试结果:

[【柳州螺蛳粉】家【酸辣粉】价格: 37838.66, 【津市牛肉粉】家【酸辣粉】价格: 65524.66, 
【重庆酸辣粉】家【酸辣粉】价格: 52324.70, 【桂林米粉】家【酸辣粉】价格: 60152.23]
耗时 1064 纳秒

效率有明显提升。

11.3.2 使用 CompletableFuture 发起异步请求(过度版)


代码清单11-11 使用 CompletableFuture 实现 findPrices 方法
注意 CompletableFuture 类中的 join 方法和 Future 接口中的 get 有相同的含义,并且也声明在 Future 接口中,它们唯一的不同是 join不会抛出任何检测到的异常。

public static List<String> findPrices(String product) 
    List<CompletableFuture<String>> priceFutures =
            shops.stream()
                    .map(shop -> CompletableFuture.supplyAsync(
                            () -> String.format("【%s】家【%s】价格: %.2f", shop.getName(), product, shop.getPrice(product))
                        )
                    )
                    .collect(Collectors.toList());
    return priceFutures.stream()
            .map(CompletableFuture::join)
            .collect(toList());

这里可以看到,代码中分成了两段流操作。对应 图11-4 的下半部分(并行执行)。
第一部分是顺序流,如果把第二部分直接接着它map 那就都成顺序流了。所以先把结果future1、future2、future3聚集到一个列表中,再开一个流。这样所有futurejoin()能并行启动,不用排队等前一个结束。

[【柳州螺蛳粉】家【酸辣粉】价格: 37810.56, 【津市牛肉粉】家【酸辣粉】价格: 58265.10, 
【重庆酸辣粉】家【酸辣粉】价格: 64577.53, 【桂林米粉】家【酸辣粉】价格: 51406.55]
耗时 1071 纳秒

这里效果还不如直接上面直接用并行流的方案。然后引出后续剧情。

11.3.3 寻找更好的方案

并行流CompletableFuture内部默认用的都是通用线程池,固定数目的线程,具体线程数取决于 Runtime.getRuntime().availableProcessors() (Java虚拟机可用的处理器数量 ) 的返回值。

然而,CompletableFuture 允许你对执行器(Executor)进行配置,尤其是线程池的大小,让它以更适合应用需求的方式进行配置,满足程序的要求,而这是并行流API无法提供的。

11.3.4 使用定制的执行器(最终版)

调整线程池的大小
《Java并发编程实战》一书中,Brian Goetz和合著者们为线程池大小的优化提供了不少中肯的建议。这非常重要,如果线程池中线程的数量过多,最终它们会竞争稀缺的处理器和内存资源,浪费大量的时间在上下文切换上。反之,如果线程的数目过少,正如你的应用所面临的情况,处理器的一些核可能就无法充分利用。Brian Goetz建议,线程池大小与处理器的利用率之比可以使用下面的公式进行估算:
Nthreads = NCPU * UCPU * (1 + W/C)
其中:
NCPU是处理器的核的数目,可以通过 Runtime.getRuntime().availableProcessors() 得到
UCPU是期望的CPU利用率(该值应该介于0和1之间)
W/C是等待时间与计算时间的比率

简化方案:线程数 = 商店数,再加个上限保底100:Math.min(shops.size(), 100)

自定义执行器 Executor


代码清单11-12 为“最优价格查询器”应用定制的执行器

private final Executor executor = Executors.newFixedThreadPool(
		Math.min(shops.size(), 100),
       	new ThreadFactory() 
        	public Thread newThread(Runnable r) 
				Thread t = new Thread(r);
				t.setDaemon(true); // 使用守护线程——这种方式不会阻止程序的关停
				return t;
			
       
   );

还记得用 lambda 替换 匿名类 吗? lambda 版:

private static final Executor executor = Executors.newFixedThreadPool(
        Math.min(shops.size(), 100),
        (r) ->
            Thread t = new Thread(r);
            t.setDaemon(true); // 使用守护线程——这种方式不会阻止程序的关停
            return t;
        
);

果将线程标记为守护进程,意味着程序退出时它也会被回收。

使用自定义执行器

代码清单11-11CompletableFuture.supplyAsync 加上第二个参数 executor,即可:

CompletableFuture.supplyAsync(
	() -> String.format("【%s】家【%s】价格: %.2f", shop.getName(), product, shop.getPrice(product)),
	executor
)

测试性能

[【柳州螺蛳粉0】家【酸辣粉】价格: 44212.36, 【柳州螺蛳粉1】家【酸辣粉】价格: 60541.35, 
【柳州螺蛳粉2】家【酸辣粉】价格: 62474.67, 【柳州螺蛳粉3】家【酸辣粉】价格: 56118.18, 
【柳州螺蛳粉4】家【酸辣粉】价格: 53683.30, 【柳州螺蛳粉5】家【酸辣粉】价格: 44924.01, 
【柳州螺蛳粉6】家【酸辣粉】价格: 67151.76, 【柳州螺蛳粉7】家【酸辣粉】价格: 59344.12, 
【柳州螺蛳粉8】家【酸辣粉】价格: 44457.99, 【柳州螺蛳粉9】家【酸辣粉】价格: 67508.70, 
【津市牛肉粉1】家【酸辣粉】价格: 54769.93, 【津市牛肉粉2】家【酸辣粉】价格: 63197.59, 
【重庆酸辣粉1】家【酸辣粉】价格: 73413.08, 【重庆酸辣粉2】家【酸辣粉】价格: 38340.71, 
【桂林米粉】家【酸辣粉】价格: 57547.14]
耗时 1064 纳秒

这里把商店添加到了 15 家。耗时反而最少。性能满意
一般而言,这种状态会一直持续,直到商店的数目达到我们之前计算的阈值(程池大小)。

并行——使用流还是CompletableFutures?
目前为止,你已经知道对集合进行并行计算两种方式:
: 要么将其转化为并行流,利用map这样的操作开展工作,
: 要么枚举出集合中的每一个元素,创建新的线程,在 CompletableFuture 内对其进行操作。后者提供了更多的灵活性,你可以调整线程池的大小,而这能帮助你确保整体的计算不会因为线程都在等待I/O而发生阻塞。

我们对使用这些API的建议如下

❑ 如果你进行的是计算密集型的操作,并且没有I/O,那么推荐使用Stream接口,因为实现简单,同时效率也可能是最高的(如果所有的线程都是计算密集型的,那就没有必要创建比处理器核数更多的线程)。
❑ 反之,如果你并行的工作单元还涉及等待I/O的操作(包括网络连接等待),那么使用 CompletableFuture 灵活性更好,你可以像前文讨论的那样,依据等待/计算,或者W/C的比率设定需要使用的线程数。这种情况不使用并行流的另一个原因是,处理流的流水线中如果发生I/O等待,流的延迟特性会让我们很难判断到底什么时候触发了等待。

11.4 对多个异步任务进行流水线操作 (实现串连并连等)

剧情:现在除了查价格还要查折扣,然后再算出实付多少。
首先准备一个同步版,然后再慢慢重构。

11.4.0-1 准备工作(同步版)

现在我们需要添加几个类一起来配合,最终:(代码稍微整理了一下, 完全照抄)
Quote:价格解析类【新增】
Discount:价格解析类【新增】
Shop:这就是前面的商店类。
ShopTest:测试类。

实现价格解析类 Quote

封装一个 Quote 它用静态方法 parse 解析 getPrice 返回的字符串结果:商品名称:价格:折扣码。得到三个值后,new一个 Quote 实例返回。
原文较长,我这里顺便用 lombok 简化了一下。

@Getter
<

以上是关于《Java8实战》读书笔记10:组合式异步编程 CompletableFuture的主要内容,如果未能解决你的问题,请参考以下文章

《Java8实战》 - 读书笔记 - Lambda 表达式的组合用法

《Java8实战》读书笔记12:函数式编程

《Java8实战》读书笔记12:函数式编程

《Java8实战》读书笔记13:Java8 与 Scala

《Java8实战》读书笔记13:Java8 与 Scala

《Java8实战》读书笔记09:用 Optional 处理值为 null 的情况