Java——聊聊JUC中的CompletableFuture
Posted 宋宋_浩浩_Java工程师
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java——聊聊JUC中的CompletableFuture相关的知识,希望对你有一定的参考价值。
文章目录:
2.1 runAsync(Runnable runnable)
2.2 runAsync(Runnable runnable, Executor executor)
2.3 supplyAsync(Supplier supplier)
2.4 supplyAsync(Supplier supplier, Executor executor)
4.电商小案例——简单应用CompletableFuture
5.4 CompletableFuture和线程池(thenRun、thenRunAsync)
1.承接Future和FutureTask
在上一篇文章中和大家分享了Future和FutureTask,也在文末指明了这二者的缺点,链接:Java——聊聊JUC中的Future和FutureTask
由此引出了异步编程界的大佬 CompletableFuture。
由图可知,二者在都实现了 Future 接口,而 CompletableFuture 又实现了 CompletionStage 这个接口,强大之处就在这里。
CompletableFuture 提供了非常强大的针对Future的扩展功能,可以帮助我们简化异步编程的复杂性,并且提供了函数式编程的能力,可以通过回调方式处理计算结果,也提供了转换和组合 CompletableFuture 的一系列方法,它可以代表一个明确完成的 Future,也可以代表一个完成阶段 CompletionStage。
2.CompletableFuture四大静态方法
我们现在都知道了 CompletableFuture 是一个类,学Java的最清楚了,给我个类,老子就 new,你别说别的,老子就是new,new就完事了,但是翻了翻jdk官方文档,打脸了。。。
他告诉我们对于 CompletableFuture 的无参构造,其实是创建了一个不完整的 CompletableFuture,那还new个屁啊。 所以这就需要它提供的四大静态方法来获取 CompletableFuture 对象了。(实际开发中常用的是第二组),对于这两组,我们都可以单独的传入 Runnable 接口或者 Supplier 供给型接口,也可以结合线程池传入。如果没有传入线程池参数,那么将使用默认的 ForkJoinPool.commonPool() ;如果指定了线程池参数,将以我们自定义的为主。
2.1 runAsync(Runnable runnable)
package com.szh.demo;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
public class CompletableFutureDemo1
public static void main(String[] args) throws ExecutionException, InterruptedException
CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() ->
System.out.println(Thread.currentThread().getName());
try
TimeUnit.SECONDS.sleep(1);
catch (InterruptedException e)
e.printStackTrace();
);
System.out.println(completableFuture.get());
可以看到,这是没有返回值、没有指定线程池的静态方法,所以使用默认的 ForkJoinPool.commonPool,返回值因为没有所以就是 null。
2.2 runAsync(Runnable runnable, Executor executor)
package com.szh.demo;
import java.util.concurrent.*;
public class CompletableFutureDemo2
public static void main(String[] args) throws ExecutionException, InterruptedException
ExecutorService threadPool = Executors.newFixedThreadPool(3);
CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() ->
System.out.println(Thread.currentThread().getName());
try
TimeUnit.SECONDS.sleep(1);
catch (InterruptedException e)
e.printStackTrace();
, threadPool);
System.out.println(completableFuture.get());
threadPool.shutdown();
这里,我们自定义了线程池,所以就使用我们自己定义的了。
2.3 supplyAsync(Supplier<U> supplier)
package com.szh.demo;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
public class CompletableFutureDemo3
public static void main(String[] args) throws ExecutionException, InterruptedException
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() ->
System.out.println(Thread.currentThread().getName());
try
TimeUnit.SECONDS.sleep(1);
catch (InterruptedException e)
e.printStackTrace();
return "hello supplyAsync";
);
System.out.println(completableFuture.get());
这是有返回值的一组,不指定线程池就使用默认的,get可以正常获取到异步线程的执行结果。
2.4 supplyAsync(Supplier<U> supplier, Executor executor)
package com.szh.demo;
import java.util.concurrent.*;
public class CompletableFutureDemo4
public static void main(String[] args) throws ExecutionException, InterruptedException
ExecutorService threadPool = Executors.newFixedThreadPool(3);
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() ->
System.out.println(Thread.currentThread().getName());
try
TimeUnit.SECONDS.sleep(1);
catch (InterruptedException e)
e.printStackTrace();
return "hello supplyAsync";
, threadPool);
System.out.println(completableFuture.get());
threadPool.shutdown();
这里,我们自定义了线程池,所以就使用我们自己定义的了。 get可以正常获取到异步线程的执行结果。
3.CompletableFuture减少阻塞和轮询
package com.szh.demo;
import java.util.Objects;
import java.util.concurrent.*;
public class CompletableFutureDemo5
public static void main(String[] args)
ExecutorService threadPool = Executors.newFixedThreadPool(1);
try
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() ->
System.out.println(Thread.currentThread().getName() + " ---- come in");
int result = ThreadLocalRandom.current().nextInt(10);
try
TimeUnit.SECONDS.sleep(3);
catch (InterruptedException e)
e.printStackTrace();
// if (result > 2)
// int ans = 10 / 0;
//
System.out.println("出结果了:" + result);
return result;
, threadPool).whenComplete((v, e) ->
if (Objects.isNull(e))
System.out.println("计算完成:" + v);
).exceptionally(e ->
e.printStackTrace();
System.out.println("异常情况:" + e.getMessage());
return -1;
);
System.out.println(Thread.currentThread().getName() + " 线程在忙其他事情....");
catch (Exception e)
e.printStackTrace();
finally
threadPool.shutdown();
这里的异步线程计算结果需要3秒,而main线程在忙自己的业务,当异步线程计算完毕之后,会自动回调 whenComplete 方法。
将代码中注释的部分打开,出现异常之后,当异步线程计算结果超过2之后,就会发生异常。
总结:
- 异步线程执行任务结束时,会自动回调某个对象的方法。(上面的案例是 whenComplete )
- 主线程设置好回调之后,不再关心异步线程的任务执行的究竟怎样,异步任务之间可以顺序执行。
- 异步线程执行任务出异常时,会自动回调某个对象的方法。(上面的案例是 exceptionally )
4.电商小案例——简单应用CompletableFuture
对于同一款产品,同时搜索出本产品在各大电商平台的售价,案例中的产品就拿 mysql 书籍为例。
解决方案:
- step by step:一步一步执行,按部就班,先查京东,再查当当,最后查淘宝。
- all in:异步线程执行,万箭齐发,京东、当当、淘宝同时多任务同时查询。
package com.szh.demo;
import lombok.Getter;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@Getter
class NetMail
private String netMailName;
public NetMail(String netMailName)
this.netMailName = netMailName;
//根据商品名称模拟一个随机商品价格,睡了一秒模拟在电商网站中搜索的耗时
//这里偷懒,价格这块就使用double简单点,不再使用BigDecimal了
public double calculatePrice(String productName)
try
TimeUnit.SECONDS.sleep(1);
catch (InterruptedException e)
e.printStackTrace();
return ThreadLocalRandom.current().nextDouble() * 2 + productName.charAt(0);
public class CompletableFutureDemo6
static List<NetMail> netMailList = Arrays.asList(
new NetMail("jd"),
new NetMail("dangdang"),
new NetMail("taobao"),
new NetMail("tmail"),
new NetMail("pdd")
);
//step by step,一家一家搜索
public static List<String> getPrice(List<NetMail> netMailList, String productName)
return netMailList.stream()
.map(netMail -> String.format(productName + " in %s price is %.2f", netMail.getNetMailName(), netMail.calculatePrice(productName)))
.collect(Collectors.toList());
//CompletableFuture,万箭齐发同时搜索
//List<NetMail> ---> List<CompletableFuture<String>> ---> List<String>
public static List<String> getPriceByCompletableFuture(List<NetMail> netMailList, String productName)
return netMailList.stream()
//开启异步多线程模式同时搜索,将每一个搜索任务都映射成一个CompletableFuture异步任务
.map(netMail -> CompletableFuture.supplyAsync(() -> String.format(productName + " in %s price is %.2f", netMail.getNetMailName(), netMail.calculatePrice(productName))))
.collect(Collectors.toList())
.stream()
//将异步多线程的执行结果再次映射成新的List流
//join方法和get方法是一样的,获取到异步线程的执行结果,区别是join方法不会产生异常
.map(CompletableFuture::join)
.collect(Collectors.toList());
public static void main(String[] args)
//方式一耗时
long startTime1 = System.currentTimeMillis();
List<String> list1 = getPrice(netMailList, "Redis");
list1.forEach(System.out::println);
long endTime1 = System.currentTimeMillis();
System.out.println("step by step原始方式耗时:" + (endTime1 - startTime1) + " ms");
System.out.println("---------------------------------------");
//方式二耗时
long startTime2 = System.currentTimeMillis();
List<String> list2 = getPriceByCompletableFuture(netMailList, "Redis");
list2.forEach(System.out::println);
long endTime2 = System.currentTimeMillis();
System.out.println("CompletableFuture方式耗时:" + (endTime2 - startTime2) + " ms");
step by step 就是一个一个搜索,代码中模拟搜索一次就是1秒,所以五家电商平台就搜索了5次,大概5秒。
all in 就是开启异步多线程,在五家电商平台同时搜索,所以整体就是这1秒完事。
5.CompletableFuture常用API
5.1 获得结果和触发计算
package com.szh.demo;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class CompletableFutureDemo7
public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() ->
try
TimeUnit.SECONDS.sleep(2);
catch (InterruptedException e)
e.printStackTrace();
return "abc";
);
//等待拿到结果之后再走人
System.out.println(completableFuture.get());
//异步线程需要2秒才可以计算完成,主线程这边最多等待1秒,过期不候
//System.out.println(completableFuture.get(1, TimeUnit.SECONDS));
//和get()方法一样,只是join不会抛出异常
//System.out.println(completableFuture.join());
//如果主线程这边立刻获取的时候,异步线程还没有计算完成,则返回getNow中设定的备胎值
//打开下面的try-catch语句,主线程会等待3秒之后再去获取异步线程的计算结果,而异步线程只需要2秒就可以计算完成,所以主线程可以拿到异步线程的计算结果
// try
// TimeUnit.SECONDS.sleep(3);
// catch (InterruptedException e)
// e.printStackTrace();
//
// System.out.println(completableFuture.getNow("xyz"));
//根据主线程、异步线程的执行时间来决定是否打断异步线程的计算过程,直接返回complete设定的值
//异步线程计算耗时2秒,主线程等待3秒,则不会打断,正常拿到异步线程的计算结果,false
//异步线程计算耗时2秒,主线程等待1秒,则会打断,此时直接输出主线程complete设定的值,true
try
TimeUnit.SECONDS.sleep(1);
catch (InterruptedException e)
e.printStackTrace();
//System.out.println(completableFuture.complete("rst") + " " + completableFuture.join());
public T get() 方法执行结果如下:↓↓↓
public T get(long timeout, TimeUnit unit) 打开注释,执行结果如下:↓↓↓
public T join() 打开注释,执行结果如下:↓↓↓
public T getNow(T valueIfAbsent) 打开注释,执行结果如下:↓↓↓ (详情参考上面代码中的注释,写的很详细了)
public boolean complete(T value) 打开注释,执行结果如下:↓↓↓(详情参考上面代码中的注释,写的很详细了)
5.2 对计算结果进行处理
这几个API的计算结果存在依赖关系,多个线程串行化。第一个任务执行完成后,执行第二个回调方法任务,会将第一个任务的执行结果,作为第二个任务的入参,传递到回调方法中。
下面的代码是针对 handle 方法走的,它和 thenApply 的区别主要在遇到异常的时候。
handle如果遇到异常,那么仅仅是异常所在的这一步出错,而之后的会继续正常执行。(有点类似于 try-catch-finally)
thenApply如果遇到异常,后续的就都不会再执行了。(有点类似于 try-catch)
package com.szh.demo;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class CompletableFutureDemo8
public static void main(String[] args)
ExecutorService threadPool = Executors.newFixedThreadPool(2);
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() ->
try
TimeUnit.SECONDS.sleep(1);
catch (InterruptedException e)
e.printStackTrace();
System.out.println("111");
return 1;
, threadPool).handle((f, e) ->
//int x = 10 / 0;
System.out.println("222");
return f + 2;
).handle((f, e) ->
System.out.println("333");
return f + 3;
).whenComplete((v, e) ->
if (Objects.isNull(e))
System.out.println("计算结果:" + v);
).exceptionally(e ->
e.printStackTrace();
System.out.println(e.getMessage());
return -1;
);
System.out.println(Thread.currentThread().getName() + " 在忙其他任务");
threadPool.shutdown();
出现异常之后
将上述代码中的 handle 方法换成 thenApply之后(二者的函数式接口不一致,所以代码做如下修改),在没有异常情况下,和 handle 执行结果是一样的,就不再截图了。 如果出现异常了,thenApply是下图这种情况:↓↓↓
.thenApply(f ->
int x = 10 / 0;
System.out.println("222");
return f + 2;
).thenApply(f ->
System.out.println("333");
return f + 3;
)
5.3 对计算结果进行消费
package com.szh.demo;
import java.util.concurrent.CompletableFuture;
/**
* thenRun: 任务A执行完再执行任务B,并且B不需要A的结果
* thenAccept: 任务A执行完再执行任务B,并且B需要A的结果,但是B无返回值
* thenApply: 任务A执行完再执行任务B,并且B需要A的结果,同时B有返回值
*/
public class CompletableFutureDemo9
public static void main(String[] args)
System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenRun(() -> ).join());
System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenAccept(r -> System.out.println(r)).join());
System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenApply(r -> r + "resultB").join());
5.4 CompletableFuture和线程池(thenRun、thenRunAsync)
如果没有传入自定义线程池,那么大家都用默认的 ForkJoinPool。
如果你执行第一个任务的时候,传入了一个自定义线程池:
- 调用thenRun方法执行第二个任务时,则第二个任务和第一个任务是共用同一个线程池。
- 调用thenRunAsync方法执行第二个任务时,则第一个任务使用的是你自己传入的线程池,第二个任务使用的是ForkJoin线程池。
这样的方法有好多组,thenAccept和thenAcceptAsync,thenApply和thenApplyAsync等,它们之间的区别也是这个。
package com.szh.demo;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class CompletableFutureDemo10
public static void main(String[] args)
ExecutorService threadPool = Executors.newFixedThreadPool(3);
try
CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() ->
try
TimeUnit.MILLISECONDS.sleep(20);
catch (InterruptedException e)
e.printStackTrace();
System.out.println("1号任务" + "\\t" + Thread.currentThread().getName());
return "abcd";
, threadPool).thenRun(() ->
try
TimeUnit.MILLISECONDS.sleep(20);
catch (InterruptedException e)
e.printStackTrace();
System.out.println("2号任务" + "\\t" + Thread.currentThread().getName());
).thenRun(() ->
try
TimeUnit.MILLISECONDS.sleep(10);
catch (InterruptedException e)
e.printStackTrace();
System.out.println("3号任务" + "\\t" + Thread.currentThread().getName());
).thenRun(() ->
try
TimeUnit.MILLISECONDS.sleep(10);
catch (InterruptedException e)
e.printStackTrace();
System.out.println("4号任务" + "\\t" + Thread.currentThread().getName());
);
System.out.println(completableFuture.get(2L, TimeUnit.SECONDS));
catch (Exception e)
e.printStackTrace();
finally
threadPool.shutdown();
全部都是 thenRun,则会使用我们自定义的线程池。当然了如果全部都是thenRunAsync,也一样使用的是我们自定义的线程池。
将代码中的第一个 thenRun 改成 thenRunAsync,就不一样了。
在源码中是可以看到的,如果你调用的 thenRunAsync,那么他就会给你用默认的线程池。
下面那个布尔方法表示CPU的内核数是否大于1,现在的电脑基本都满足了,所以可以理解为恒定的true。
5.5 对计算速度进行选用
public <U> CompletableFuture<U> applyToEither( CompletionStage<? extends T> other, Function<? super T, U> fn)
package com.szh.demo;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
public class CompletableFutureDemo11
public static void main(String[] args)
CompletableFuture<String> playA = CompletableFuture.supplyAsync(() ->
System.out.println("A come in");
try
TimeUnit.SECONDS.sleep(2);
catch (InterruptedException e)
e.printStackTrace();
return "playA";
);
CompletableFuture<String> playB = CompletableFuture.supplyAsync(() ->
System.out.println("B come in");
try
TimeUnit.SECONDS.sleep(3);
catch (InterruptedException e)
e.printStackTrace();
return "playB";
);
CompletableFuture<String> completableFuture = playA.applyToEither(playB, f -> f + " is winner....");
System.out.println(completableFuture.join());
5.6 对计算结果进行合并
public <U,V> CompletableFuture<V> thenCombine( CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
package com.szh.demo;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
public class CompletableFutureDemo12
public static void main(String[] args)
CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() ->
System.out.println(Thread.currentThread().getName() + " 启动....");
try
TimeUnit.SECONDS.sleep(2);
catch (InterruptedException e)
e.printStackTrace();
return 10;
);
CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() ->
System.out.println(Thread.currentThread().getName() + " 启动....");
try
TimeUnit.SECONDS.sleep(1);
catch (InterruptedException e)
e.printStackTrace();
return 20;
);
CompletableFuture<Integer> result = completableFuture1.thenCombine(completableFuture2, (x, y) ->
System.out.println("开始对两个结果进行合并....");
return x * y;
);
System.out.println(result.join());
第二种写法如下
package com.szh.demo;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
public class CompletableFutureDemo12
public static void main(String[] args)
CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() ->
System.out.println(Thread.currentThread().getName() + " 启动....");
try
TimeUnit.SECONDS.sleep(2);
catch (InterruptedException e)
e.printStackTrace();
return 10;
).thenCombine(CompletableFuture.supplyAsync(() ->
System.out.println(Thread.currentThread().getName() + " 启动....");
try
TimeUnit.SECONDS.sleep(1);
catch (InterruptedException e)
e.printStackTrace();
return 20;
), (x, y) ->
System.out.println("开始对两个结果进行合并....");
return x * y;
);
System.out.println(completableFuture1.join());
以上是关于Java——聊聊JUC中的CompletableFuture的主要内容,如果未能解决你的问题,请参考以下文章