JUC并发编程之CompletableFuture基础用法
Posted 知道什么是码怪吗?
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了JUC并发编程之CompletableFuture基础用法相关的知识,希望对你有一定的参考价值。
目录
实现多线程的四种方式
方式一:继承Thread类
public static class Thread01 extends Thread
@Override
public void run()
执行任务方式:
Thread thread = new Thread01();
thread.start();
方式二:实现Runnable接口
public static class Runable01 implements Runnable
@Override
public void run()
执行任务方式:
Runable01 runable01 = new Runable01();
new Thread(runable01).start();
方式三:实现Callable接口
public static class Callable01 implements Callable<Object>
@Override
public Integer call() throws Exception
return 0;
执行任务方式:
FutureTask<Integer> futureTask = new FutureTask<>(new Callable01());
new Thread(futureTask).start();// 执行任务
System.out.println(futureTask.get());// 得到返回值
方式四:线程池
前三种方式执行的过程相当于每次新任务都会新增一个线程,这是十分消耗内存的,为了统一管理线程,引入了线程池。执行任务的线程从线程池中拿。
查看线程池构造方法的源码,其七个构造参数分别表示的含义如下:
参数名 | 参数类型 | 参数含义 |
corePoolSize | int | 核心线程数,即线程池中一直保持的线程数量。 |
maximumPoolSize | int | 允许的最大线程数 |
keepAliveTime | long | 线程数大于核心线程数时,线程在该时间下没有收到任务就会自动释放。 |
unit | TimeUnit | 时间单位 |
workQueue | BlockingQueue<Runnable> | 阻塞队列,存储等待执行的任务 |
threadFactory | ThreadFactory | 线程工厂,创造线程 |
handler | RejectedExecutionHandler | 拒绝策略 |
ThreadPoolExecutor executor = new ThreadPoolExecutor(
20,// 核心线程数
200,// 最大线程数
10,// 线程关闭时间
TimeUnit.SECONDS,// 时间:秒
new LinkedBlockingDeque<>(100000),// 存储的异步任务最大数
Executors.defaultThreadFactory(),// 线程工厂
new ThreadPoolExecutor.AbortPolicy());//拒绝策略
接下来为了方便测试,我们开启一个固定10个线程的线程池。
ExecutorService executor = Executors.newFixedThreadPool(10);// 开启一个固定10个线程的线程池
创建异步对象
CompletableFuture 提供了四个静态方法来创建一个异步操作。分别为:
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
这四种方法的区别在于:
方法名 | 区别 |
supplyAsync(Supplier<U> supplier) | 可以返回结果 |
runAsync(Runnable runnable) | 不能返回结果 |
supplyAsync(Supplier<U> supplier, Executor executor) | 可以返回结果,可指定线程池 |
runAsync(Runnable runnable, Executor executor) | 不能返回结果,可指定线程池 |
CompletableFuture.runAsync(() ->
System.out.println("runAsync方法,当前线程:" + Thread.currentThread().getId());
, executor);
CompletableFuture.supplyAsync(() ->
System.out.println("supplyAsync方法,当前线程:" + Thread.currentThread().getId());
return "这是结果";
, executor);
回调方法
public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor)
public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)
方法以Async结尾,表示此任务可能被其他线程执行,并且也可以指定线程池。如果不以Async结尾,则是继续用当前的线程执行任务。
方法名 | 区别 |
whenComplete | 继续以当前线程执行本次任务 |
whenCompleteAsync | 将当前任务提交给线程池处理 |
whenCompleteAsync方法
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() ->
System.out.println("supplyAsync方法,当前线程:" + Thread.currentThread().getId());
int i = 10 / 0;// 制造数学异常
return i;
, executor).whenCompleteAsync((res, excption) -> // 只能感知异常,无法处理异常
System.out.println("res: " + res + "excption: " + excption);
, executor).exceptionally(throwable -> // 处理异常
return 10;// 返回默认值
);
System.out.println("结果为:" + future.get());// 获取返回值
whenComplete方法
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() ->
System.out.println("supplyAsync方法,当前线程:" + Thread.currentThread().getId());
int i = 10 / 0;// 制造数学异常
return i;
, executor).whenComplete((res, excption) -> // 只能感知异常,无法处理异常
System.out.println("res: " + res + "excption: " + excption);
).exceptionally(throwable -> // 处理异常
return 10;// 返回默认值
);
System.out.println("结果为:" + future.get());
执行结果为:
handle方法
public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn)
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor)
方法以Async结尾,表示此任务可能被其他线程执行,并且也可以指定线程池。如果不以Async结尾,则是继续用当前的线程执行任务。
该方法可对结果做最后的处理,可以处理异常并且有返回值。
下面这段代码,如果无异常,返回500,有异常返回0。
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() ->
System.out.println("supplyAsync方法,当前线程:" + Thread.currentThread().getId());
int i = 10 / 2 ;
return i;
, executor).handle((res, thr) -> // 感知异常并处理异常
System.out.println("res: " + res + " thr: " + thr);
if (res != null) return res * 100;// 如果结果不为空
if (thr != null) return 0;// 如果异常不为空
return 100;
);
System.out.println(future.get());
正常执行结果:
异常执行结果:
线程串行化
主要就是以下9个方法,以Async结尾的表示可以被其他线程执行,而参数中有Executor的表示可以指定线程池。
三种方式的区别:
方法名 | 区别 |
thenApply | 获取上一个任务的结果,并且返回当前任务的结果 |
thenAccept | 获取上一个任务的结果,无返回值 |
thenRun | 无法获取上一个任务的结果,无返回值 |
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() ->
System.out.println("supplyAsync方法,当前线程:" + Thread.currentThread().getId());
int i = 10 / 2;
return i;
, executor).thenApplyAsync((res) -> // 前一个任务处理完成之后,后续处理
System.out.println("任务二启动");
return res * 10;
, executor);
System.out.println(future.get());
执行结果:
任务组合
其意思就是将两个任务组合在一起,当两个任务都执行完成之后在执行给定的任务。
其方法名和参数值至此以及无需赘述了。
方法名 | 区别 |
thenCombine | 获取两个任务的返回结果,并且返回当前任务的返回值 |
thenAcceptBoth | 获取两个任务的返回结果,并且无返回值 |
runAfterBoth | 不获取两个任务的结果,并且无返回值 |
下面编写了两个任务
CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() ->
System.out.println("任务一开始:" + Thread.currentThread().getId());
System.out.println("任务一结束");
return 10;
, executor);
CompletableFuture<Integer> future02 = CompletableFuture.supplyAsync(() ->
System.out.println("任务二开始:" + Thread.currentThread().getId());
System.out.println("任务二结束");
return 123;
, executor);
runAfterAsync方法
future01.runAfterBothAsync(future02, () -> // 无法获取任务的返回值,也不返回结果
System.out.println("开始执行,无返回值也无参数");
, executor);
执行结果
thenAcceptBothAsync方法
future01.thenAcceptBothAsync(future02, (f1, f2) -> // 获取两个任务的返回值,不返回结果
System.out.println("两个任务结果之和为:" + f1 * f2);
, executor);
执行结果
thenCombineAsync方法
CompletableFuture<String> future = future01.thenCombineAsync(future02, (f1, f2) -> // 获取两个任务的返回值,并返回结果
System.out.println("两个任务结果之和为:" + f1 * f2);
return "hello world";
, executor);
System.out.println("thenCombineAsync返回值为: "+future.get());
执行结果
组合任务单任务完成及执行
其意思就是将两个任务组合在一起,只要有一个任务完成了就执行给定的任务。
方法名 | 区别 |
applyToEither | 获取先完成任务的结果,有返回值 |
acceptEither | 获取先完成任务的结果,无返回值 |
runAfterEither | 不获取任务的结果,无返回值 |
下面编写了两个任务,其中任务一睡眠500ms
CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() ->
System.out.println("任务一开始:" + Thread.currentThread().getId());
try
Thread.sleep(500);// 线程睡眠500ms
catch (InterruptedException e)
e.printStackTrace();
System.out.println("任务一结束");
return 10;
, executor);
CompletableFuture<Integer> future02 = CompletableFuture.supplyAsync(() ->
System.out.println("任务二开始:" + Thread.currentThread().getId());
System.out.println("任务二结束");
return 123;
, executor);
runAfterEither方法
future01.runAfterEitherAsync(future02, () ->
System.out.println("runAfterEitherAsync开始执行");
, executor);
执行结果
acceptEitherAsync方法
future01.acceptEitherAsync(future02, (res) ->
System.out.println("acceptEitherAsync方法开始执行 " + res);
, executor);
执行结果
applyToEitherAsync方法
CompletableFuture<String> result = future01.applyToEitherAsync(future02, (res) ->
System.out.println("applyToEitherAsync方法开始执行 " + res);
return "res: " + res.toString();
, executor);
System.out.println(result.get());
执行结果
多任务组合
将多个任务进行组合。
allOf:等待所有任务完成。
anyOf:只要有一个任务完成。
下面代码编写了3个任务
CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() ->
System.out.println("任务一开始:" + Thread.currentThread().getId());
try
Thread.sleep(500);// 线程睡眠500ms
catch (InterruptedException e)
e.printStackTrace();
System.out.println("任务一结束");
return 10;
, executor);
CompletableFuture<Integer> future02 = CompletableFuture.supplyAsync(() ->
System.out.println("任务二开始:" + Thread.currentThread().getId());
System.out.println("任务二结束");
return 20;
, executor);
CompletableFuture<Integer> future03 = CompletableFuture.supplyAsync(() ->
System.out.println("任务三开始:" + Thread.currentThread().getId());
System.out.println("任务三结束");
return 30;
, executor);
allOf & anyOf
CompletableFuture<Void> allOf = future01.allOf(future02, future03);
CompletableFuture<Object> anyOf = future01.anyOf(future02, future03);
以上是关于JUC并发编程之CompletableFuture基础用法的主要内容,如果未能解决你的问题,请参考以下文章
JUC - 多线程之ForkJoin;异步调用CompletableFuture