Java——聊聊JUC中的CompletableFuture

Posted 宋子浩

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java——聊聊JUC中的CompletableFuture相关的知识,希望对你有一定的参考价值。

文章目录:

1.承接Future和FutureTask

2.CompletableFuture四大静态方法

2.1 runAsync(Runnable runnable)

2.2 runAsync(Runnable runnable, Executor executor)

2.3 supplyAsync(Supplier supplier)

2.4 supplyAsync(Supplier supplier, Executor executor)

3.CompletableFuture减少阻塞和轮询

4.电商小案例——简单应用CompletableFuture

5.CompletableFuture常用API

5.1 获得结果和触发计算

5.2 对计算结果进行处理

5.3 对计算结果进行消费

5.4 CompletableFuture和线程池(thenRun、thenRunAsync)

5.5 对计算速度进行选用

5.6 对计算结果进行合并


1.承接Future和FutureTask

在上一篇文章中和大家分享了Future和FutureTask,也在文末指明了这二者的缺点,链接:Java——聊聊JUC中的Future和FutureTask

由此引出了异步编程界的大佬 CompletableFuture。

 

由图可知,二者在都实现了 Future 接口,而 CompletableFuture 又实现了 CompletionStage 这个接口,强大之处就在这里。 

CompletableFuture 提供了非常强大的针对Future的扩展功能,可以帮助我们简化异步编程的复杂性,并且提供了函数式编程的能力,可以通过回调方式处理计算结果,也提供了转换和组合 CompletableFuture 的一系列方法,它可以代表一个明确完成的 Future,也可以代表一个完成阶段 CompletionStage。


2.CompletableFuture四大静态方法

我们现在都知道了 CompletableFuture 是一个类,学Java的最清楚了,给我个类,老子就 new,你别说别的,老子就是new,new就完事了,但是翻了翻jdk官方文档,打脸了。。。

他告诉我们对于 CompletableFuture 的无参构造,其实是创建了一个不完整的 CompletableFuture,那还new个屁啊。     所以这就需要它提供的四大静态方法来获取 CompletableFuture 对象了。(实际开发中常用的是第二组),对于这两组,我们都可以单独的传入 Runnable 接口或者 Supplier 供给型接口,也可以结合线程池传入。如果没有传入线程池参数,那么将使用默认的  ForkJoinPool.commonPool()  ;如果指定了线程池参数,将以我们自定义的为主。

2.1 runAsync(Runnable runnable)

package com.szh.demo;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class CompletableFutureDemo1 
    public static void main(String[] args) throws ExecutionException, InterruptedException 
        CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> 
            System.out.println(Thread.currentThread().getName());
            try 
                TimeUnit.SECONDS.sleep(1);
             catch (InterruptedException e) 
                e.printStackTrace();
            
        );
        System.out.println(completableFuture.get());
    

可以看到,这是没有返回值、没有指定线程池的静态方法,所以使用默认的 ForkJoinPool.commonPool,返回值因为没有所以就是 null。 

2.2 runAsync(Runnable runnable, Executor executor)

package com.szh.demo;

import java.util.concurrent.*;

public class CompletableFutureDemo2 
    public static void main(String[] args) throws ExecutionException, InterruptedException 
        ExecutorService threadPool = Executors.newFixedThreadPool(3);

        CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> 
            System.out.println(Thread.currentThread().getName());
            try 
                TimeUnit.SECONDS.sleep(1);
             catch (InterruptedException e) 
                e.printStackTrace();
            
        , threadPool);
        System.out.println(completableFuture.get());

        threadPool.shutdown();
    

这里,我们自定义了线程池,所以就使用我们自己定义的了。 

2.3 supplyAsync(Supplier<U> supplier)

package com.szh.demo;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class CompletableFutureDemo3 
    public static void main(String[] args) throws ExecutionException, InterruptedException 
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> 
            System.out.println(Thread.currentThread().getName());
            try 
                TimeUnit.SECONDS.sleep(1);
             catch (InterruptedException e) 
                e.printStackTrace();
            
            return "hello supplyAsync";
        );
        System.out.println(completableFuture.get());
    

这是有返回值的一组,不指定线程池就使用默认的,get可以正常获取到异步线程的执行结果。

2.4 supplyAsync(Supplier<U> supplier, Executor executor)

package com.szh.demo;

import java.util.concurrent.*;

public class CompletableFutureDemo4 
    public static void main(String[] args) throws ExecutionException, InterruptedException 
        ExecutorService threadPool = Executors.newFixedThreadPool(3);

        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> 
            System.out.println(Thread.currentThread().getName());
            try 
                TimeUnit.SECONDS.sleep(1);
             catch (InterruptedException e) 
                e.printStackTrace();
            
            return "hello supplyAsync";
        , threadPool);
        System.out.println(completableFuture.get());

        threadPool.shutdown();
    

这里,我们自定义了线程池,所以就使用我们自己定义的了。  get可以正常获取到异步线程的执行结果。


3.CompletableFuture减少阻塞和轮询

package com.szh.demo;

import java.util.Objects;
import java.util.concurrent.*;

public class CompletableFutureDemo5 
    public static void main(String[] args) 
        ExecutorService threadPool = Executors.newFixedThreadPool(1);
        try 
            CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> 
                System.out.println(Thread.currentThread().getName() + " ---- come in");
                int result = ThreadLocalRandom.current().nextInt(10);
                try 
                    TimeUnit.SECONDS.sleep(3);
                 catch (InterruptedException e) 
                    e.printStackTrace();
                
//                if (result > 2) 
//                    int ans = 10 / 0;
//                
                System.out.println("出结果了:" + result);
                return result;
            , threadPool).whenComplete((v, e) -> 
                if (Objects.isNull(e)) 
                    System.out.println("计算完成:" + v);
                
            ).exceptionally(e -> 
                e.printStackTrace();
                System.out.println("异常情况:" + e.getMessage());
                return -1;
            );
            System.out.println(Thread.currentThread().getName() + " 线程在忙其他事情....");
         catch (Exception e) 
            e.printStackTrace();
         finally 
            threadPool.shutdown();
        
    

这里的异步线程计算结果需要3秒,而main线程在忙自己的业务,当异步线程计算完毕之后,会自动回调 whenComplete 方法。 

将代码中注释的部分打开,出现异常之后,当异步线程计算结果超过2之后,就会发生异常。

总结:

  1. 异步线程执行任务结束时,会自动回调某个对象的方法。(上面的案例是 whenComplete )
  2. 主线程设置好回调之后,不再关心异步线程的任务执行的究竟怎样,异步任务之间可以顺序执行。
  3. 异步线程执行任务出异常时,会自动回调某个对象的方法。(上面的案例是 exceptionally )

4.电商小案例——简单应用CompletableFuture

对于同一款产品,同时搜索出本产品在各大电商平台的售价,案例中的产品就拿 mysql 书籍为例。

解决方案:

  1. step by step:一步一步执行,按部就班,先查京东,再查当当,最后查淘宝。
  2. all in:异步线程执行,万箭齐发,京东、当当、淘宝同时多任务同时查询。
package com.szh.demo;

import lombok.Getter;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

@Getter
class NetMail 
    private String netMailName;

    public NetMail(String netMailName) 
        this.netMailName = netMailName;
    

    //根据商品名称模拟一个随机商品价格,睡了一秒模拟在电商网站中搜索的耗时
    //这里偷懒,价格这块就使用double简单点,不再使用BigDecimal了
    public double calculatePrice(String productName) 
        try 
            TimeUnit.SECONDS.sleep(1);
         catch (InterruptedException e) 
            e.printStackTrace();
        
        return ThreadLocalRandom.current().nextDouble() * 2 + productName.charAt(0);
    


public class CompletableFutureDemo6 
    static List<NetMail> netMailList = Arrays.asList(
            new NetMail("jd"),
            new NetMail("dangdang"),
            new NetMail("taobao"),
            new NetMail("tmail"),
            new NetMail("pdd")
    );

    //step by step,一家一家搜索
    public static List<String> getPrice(List<NetMail> netMailList, String productName) 
        return netMailList.stream()
                .map(netMail -> String.format(productName + " in %s price is %.2f", netMail.getNetMailName(), netMail.calculatePrice(productName)))
                .collect(Collectors.toList());
    

    //CompletableFuture,万箭齐发同时搜索
    //List<NetMail> ---> List<CompletableFuture<String>> ---> List<String>
    public static List<String> getPriceByCompletableFuture(List<NetMail> netMailList, String productName) 
        return netMailList.stream()
                //开启异步多线程模式同时搜索,将每一个搜索任务都映射成一个CompletableFuture异步任务
                .map(netMail -> CompletableFuture.supplyAsync(() -> String.format(productName + " in %s price is %.2f", netMail.getNetMailName(), netMail.calculatePrice(productName))))
                .collect(Collectors.toList())
                .stream()
                //将异步多线程的执行结果再次映射成新的List流
                //join方法和get方法是一样的,获取到异步线程的执行结果,区别是join方法不会产生异常
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    

    public static void main(String[] args) 
        //方式一耗时
        long startTime1 = System.currentTimeMillis();
        List<String> list1 = getPrice(netMailList, "Redis");
        list1.forEach(System.out::println);
        long endTime1 = System.currentTimeMillis();
        System.out.println("step by step原始方式耗时:" + (endTime1 - startTime1) + " ms");

        System.out.println("---------------------------------------");

        //方式二耗时
        long startTime2 = System.currentTimeMillis();
        List<String> list2 = getPriceByCompletableFuture(netMailList, "Redis");
        list2.forEach(System.out::println);
        long endTime2 = System.currentTimeMillis();
        System.out.println("CompletableFuture方式耗时:" + (endTime2 - startTime2) + " ms");
    

step by step 就是一个一个搜索,代码中模拟搜索一次就是1秒,所以五家电商平台就搜索了5次,大概5秒。

all in 就是开启异步多线程,在五家电商平台同时搜索,所以整体就是这1秒完事。

 


5.CompletableFuture常用API

5.1 获得结果和触发计算

package com.szh.demo;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class CompletableFutureDemo7 
    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException 
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> 
            try 
                TimeUnit.SECONDS.sleep(2);
             catch (InterruptedException e) 
                e.printStackTrace();
            
            return "abc";
        );
        //等待拿到结果之后再走人
        System.out.println(completableFuture.get());

        //异步线程需要2秒才可以计算完成,主线程这边最多等待1秒,过期不候
        //System.out.println(completableFuture.get(1, TimeUnit.SECONDS));

        //和get()方法一样,只是join不会抛出异常
        //System.out.println(completableFuture.join());

        //如果主线程这边立刻获取的时候,异步线程还没有计算完成,则返回getNow中设定的备胎值
        //打开下面的try-catch语句,主线程会等待3秒之后再去获取异步线程的计算结果,而异步线程只需要2秒就可以计算完成,所以主线程可以拿到异步线程的计算结果
//        try 
//            TimeUnit.SECONDS.sleep(3);
//         catch (InterruptedException e) 
//            e.printStackTrace();
//        
//        System.out.println(completableFuture.getNow("xyz"));

         //根据主线程、异步线程的执行时间来决定是否打断异步线程的计算过程,直接返回complete设定的值
         //异步线程计算耗时2秒,主线程等待3秒,则不会打断,正常拿到异步线程的计算结果,false
         //异步线程计算耗时2秒,主线程等待1秒,则会打断,此时直接输出主线程complete设定的值,true
         try 
             TimeUnit.SECONDS.sleep(1);
          catch (InterruptedException e) 
             e.printStackTrace();
         
         //System.out.println(completableFuture.complete("rst") + " " + completableFuture.join());
    

public T get() 方法执行结果如下:↓↓↓

public T get(long timeout, TimeUnit unit) 打开注释,执行结果如下:↓↓↓

public T join()  打开注释,执行结果如下:↓↓↓

public T getNow(T valueIfAbsent)  打开注释,执行结果如下:↓↓↓ (详情参考上面代码中的注释,写的很详细了)

public boolean complete(T value)  打开注释,执行结果如下:↓↓↓(详情参考上面代码中的注释,写的很详细了)

5.2 对计算结果进行处理

这几个API的计算结果存在依赖关系,多个线程串行化。第一个任务执行完成后,执行第二个回调方法任务,会将第一个任务的执行结果,作为第二个任务的入参,传递到回调方法中。

下面的代码是针对 handle 方法走的,它和 thenApply 的区别主要在遇到异常的时候。

handle如果遇到异常,那么仅仅是异常所在的这一步出错,而之后的会继续正常执行。(有点类似于 try-catch-finally)

thenApply如果遇到异常,后续的就都不会再执行了。(有点类似于 try-catch)

package com.szh.demo;

import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class CompletableFutureDemo8 
    public static void main(String[] args) 
        ExecutorService threadPool = Executors.newFixedThreadPool(2);

        CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> 
            try 
                TimeUnit.SECONDS.sleep(1);
             catch (InterruptedException e) 
                e.printStackTrace();
            
            System.out.println("111");
            return 1;
        , threadPool).handle((f, e) -> 
            //int x = 10 / 0;
            System.out.println("222");
            return f + 2;
        ).handle((f, e) -> 
            System.out.println("333");
            return f + 3;
        ).whenComplete((v, e) -> 
            if (Objects.isNull(e)) 
                System.out.println("计算结果:" + v);
            
        ).exceptionally(e -> 
            e.printStackTrace();
            System.out.println(e.getMessage());
            return -1;
        );

        System.out.println(Thread.currentThread().getName() + " 在忙其他任务");

        threadPool.shutdown();
    

出现异常之后

将上述代码中的 handle 方法换成 thenApply之后(二者的函数式接口不一致,所以代码做如下修改),在没有异常情况下,和 handle 执行结果是一样的,就不再截图了。  如果出现异常了,thenApply是下图这种情况:↓↓↓ 

        .thenApply(f -> 
            int x = 10 / 0;
            System.out.println("222");
            return f + 2;
        ).thenApply(f -> 
            System.out.println("333");
            return f + 3;
        )

5.3 对计算结果进行消费

package com.szh.demo;

import java.util.concurrent.CompletableFuture;

/**
 * thenRun: 任务A执行完再执行任务B,并且B不需要A的结果
 * thenAccept: 任务A执行完再执行任务B,并且B需要A的结果,但是B无返回值
 * thenApply: 任务A执行完再执行任务B,并且B需要A的结果,同时B有返回值
 */
public class CompletableFutureDemo9 
    public static void main(String[] args) 
        System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenRun(() -> ).join());
        System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenAccept(r -> System.out.println(r)).join());
        System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenApply(r -> r + "resultB").join());
    

5.4 CompletableFuture和线程池(thenRunthenRunAsync

如果没有传入自定义线程池,那么大家都用默认的 ForkJoinPool。

如果你执行第一个任务的时候,传入了一个自定义线程池:

  • 调用thenRun方法执行第二个任务时,则第二个任务和第一个任务是共用同一个线程池。
  • 调用thenRunAsync方法执行第二个任务时,则第一个任务使用的是你自己传入的线程池,第二个任务使用的是ForkJoin线程池。

这样的方法有好多组,thenAccept和thenAcceptAsync,thenApply和thenApplyAsync等,它们之间的区别也是这个。

package com.szh.demo;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class CompletableFutureDemo10 
    public static void main(String[] args) 
        ExecutorService threadPool = Executors.newFixedThreadPool(3);

        try 
            CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> 
                try 
                    TimeUnit.MILLISECONDS.sleep(20);
                 catch (InterruptedException e) 
                    e.printStackTrace();
                
                System.out.println("1号任务" + "\\t" + Thread.currentThread().getName());
                return "abcd";
            , threadPool).thenRun(() -> 
                try 
                    TimeUnit.MILLISECONDS.sleep(20);
                 catch (InterruptedException e) 
                    e.printStackTrace();
                
                System.out.println("2号任务" + "\\t" + Thread.currentThread().getName());
            ).thenRun(() -> 
                try 
                    TimeUnit.MILLISECONDS.sleep(10);
                 catch (InterruptedException e) 
                    e.printStackTrace();
                
                System.out.println("3号任务" + "\\t" + Thread.currentThread().getName());
            ).thenRun(() -> 
                try 
                    TimeUnit.MILLISECONDS.sleep(10);
                 catch (InterruptedException e) 
                    e.printStackTrace();
                
                System.out.println("4号任务" + "\\t" + Thread.currentThread().getName());
            );
            System.out.println(completableFuture.get(2L, TimeUnit.SECONDS));
         catch (Exception e) 
            e.printStackTrace();
         finally 
            threadPool.shutdown();
        
    

全部都是  thenRun,则会使用我们自定义的线程池。当然了如果全部都是thenRunAsync,也一样使用的是我们自定义的线程池。

将代码中的第一个 thenRun 改成 thenRunAsync,就不一样了。

在源码中是可以看到的,如果你调用的 thenRunAsync,那么他就会给你用默认的线程池。

下面那个布尔方法表示CPU的内核数是否大于1,现在的电脑基本都满足了,所以可以理解为恒定的true。

5.5 对计算速度进行选用

public <U> CompletableFuture<U> applyToEither(
         CompletionStage<? extends T> other, Function<? super T, U> fn)
package com.szh.demo;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class CompletableFutureDemo11 
    public static void main(String[] args) 
        CompletableFuture<String> playA = CompletableFuture.supplyAsync(() -> 
            System.out.println("A come in");
            try 
                TimeUnit.SECONDS.sleep(2);
             catch (InterruptedException e) 
                e.printStackTrace();
            
            return "playA";
        );

        CompletableFuture<String> playB = CompletableFuture.supplyAsync(() -> 
            System.out.println("B come in");
            try 
                TimeUnit.SECONDS.sleep(3);
             catch (InterruptedException e) 
                e.printStackTrace();
            
            return "playB";
        );

        CompletableFuture<String> completableFuture = playA.applyToEither(playB, f -> f + " is winner....");
        System.out.println(completableFuture.join());
    

5.6 对计算结果进行合并

public <U,V> CompletableFuture<V> thenCombine(
    CompletionStage<? extends U> other,
    BiFunction<? super T,? super U,? extends V> fn)

package com.szh.demo;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class CompletableFutureDemo12 
    public static void main(String[] args) 
        CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> 
            System.out.println(Thread.currentThread().getName() + " 启动....");
            try 
                TimeUnit.SECONDS.sleep(2);
             catch (InterruptedException e) 
                e.printStackTrace();
            
            return 10;
        );

        CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> 
            System.out.println(Thread.currentThread().getName() + " 启动....");
            try 
                TimeUnit.SECONDS.sleep(1);
             catch (InterruptedException e) 
                e.printStackTrace();
            
            return 20;
        );

        CompletableFuture<Integer> result = completableFuture1.thenCombine(completableFuture2, (x, y) -> 
            System.out.println("开始对两个结果进行合并....");
            return x * y;
        );
        System.out.println(result.join());
    

第二种写法如下

package com.szh.demo;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class CompletableFutureDemo12 
    public static void main(String[] args) 
        CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> 
            System.out.println(Thread.currentThread().getName() + " 启动....");
            try 
                TimeUnit.SECONDS.sleep(2);
             catch (InterruptedException e) 
                e.printStackTrace();
            
            return 10;
        ).thenCombine(CompletableFuture.supplyAsync(() -> 
            System.out.println(Thread.currentThread().getName() + " 启动....");
            try 
                TimeUnit.SECONDS.sleep(1);
             catch (InterruptedException e) 
                e.printStackTrace();
            
            return 20;
        ), (x, y) -> 
            System.out.println("开始对两个结果进行合并....");
            return x * y;
        );
        System.out.println(completableFuture1.join());
    

以上是关于Java——聊聊JUC中的CompletableFuture的主要内容,如果未能解决你的问题,请参考以下文章

Java——聊聊JUC中的CompletableFuture

Java——聊聊JUC中的原子变量类

Java——聊聊JUC中的原子变量类

Java——聊聊JUC中的ThreadLocal

Java——聊聊JUC中的ThreadLocal

Java——聊聊JUC中的Java内存模型(JMM)