CompletableFuture 异步编排
Posted 码出新生活!
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了CompletableFuture 异步编排相关的知识,希望对你有一定的参考价值。
CompletableFuture 异步编排
业务场景
查询商品详情的业务比较复杂,有的数据还需要远程调用
// 获取sku的基本信息 0.5s
// 获取sku的图片信息 0.5s
// 获取sku的促销信息 1s
// 获取所有spu的销售属性 1s
// 获取规格参数组以及组下规格参数 1.5s
// spu详情 1s
假如获取商品详情页的每个查询,都需要如下标注时间来完成,服务器返回数据每次都需要5.5s,显然是不能接受的
如果多线程同时完成这6步操作,也许只需要1.5秒即可响应完成
CompletableFuture介绍
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {}
我们看到继承至Future,可以获取到异步执行结果。
1、创建异步对象
CompletableFuture提供了四个静态方法来创建对象
// 异步执行,无需返回
public static CompletableFuture<Void> runAsync(Runnable runnable)
// 指定线程池,异步执行,无需返回
public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor)
// 异步执行,有返回值
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
// 指定线程池,异步执行,有返回值
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor)
代码示例
package com.test.controller.future;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @author lishanbiao
* @Date 2021/11/23
*/
public class CompletableFutureTest {
static ExecutorService executorService = Executors.newFixedThreadPool(10);
/**
* 如何创建异步对象
* // 异步执行,无需返回
* public static CompletableFuture<Void> runAsync(Runnable runnable)
* // 指定线程池,异步执行,无需返回
* public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor)
* // 异步执行,有返回值
* public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
* // 指定线程池,异步执行,有返回值
* public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor)
*/
public static void main(String[] args) throws Exception {
runAsync(); // 异步执行,无需返回值,主线程无需等待结果返回
runAsyncWithExecutor();
CompletableFuture<Integer> future = supplyAsync(); // 异步执行,有返回值,主线程需等待结果返回
System.out.println("supplyAsync返回结果:" + future.get());
CompletableFuture<Integer> execFuture = supplyAsyncWithExecutor();
System.out.println("supplyAsync返回结果:" + execFuture.get());
}
/**
* 异步执行,无需返回
*/
static void runAsync() {
System.out.println("main……start……");
CompletableFuture.runAsync(() -> {
System.out.println("当前线程:" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("运行结果:" + i);
});
System.out.println("main……end……");
}
/**
* 异步执行,无需返回,用线程池
*/
static void runAsyncWithExecutor() {
System.out.println("main……start……");
CompletableFuture.runAsync(() -> {
System.out.println("当前线程:" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("运行结果:" + i);
}, executorService);
System.out.println("main……end……");
}
/**
* 异步执行,有返回值
*/
static CompletableFuture<Integer> supplyAsync() throws Exception {
return CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程:" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("运行结果:" + i);
return i;
});
}
/**
* 异步执行,有返回值,
*/
static CompletableFuture<Integer> supplyAsyncWithExecutor() throws Exception {
return CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程:" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("运行结果:" + i);
return i;
});
}
}
2、完成时回调
CompletableFuture提供了四个,感知或处理结果和异常的方法
// 处理正常和异常结果,无返回值
public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action)
// 另开启一个线程,处理正常和异常结果,无返回值
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action)
// 另开启一个线程池中的线程,处理正常和异常结果,无返回值
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor)
// 处理异常情况,有返回值
public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)
代码示例
package com.test.controller.future;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @author lishanbiao
* @Date 2021/11/23
*/
public class CompletableFutureTest {
static ExecutorService executorService = Executors.newFixedThreadPool(10);
/**
* 2、结果和异常处理
* // 处理正常和异常结果
* public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action)
* // 另开启一个线程,处理正常和异常结果
* public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action)
* // 另开启一个线程池中的线程,处理正常和异常结果
* public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor)
* // 处理异常情况
* public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)
*/
public static void main(String[] args) throws Exception {
CompletableFuture<Integer> whenCompleteFuture = whenComplete();
System.out.println("whenCompleted返回结果:" + whenCompleteFuture.get());
}
/**
* 异步执行,有返回值,
*/
static CompletableFuture<Integer> supplyAsyncWithExecutor() throws Exception {
return CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程:" + Thread.currentThread().getId());
// 故障制造异常,则返回结果为 10
// int i = 10 / 0;
// 正常,则返回结果为 5
int i = 10 / 2;
System.out.println("运行结果:" + i);
return i;
});
}
/**
* 异步执行,有返回值,
*/
static CompletableFuture<Integer> whenComplete() throws Exception {
return supplyAsyncWithExecutor()
// 感知异常
.whenComplete((resultData, exception) -> {
System.out.println("执行supplyAsync后,调用whenComplete返回的数据:" + resultData + ",异常:" + exception);
// 处理异常情况
}).exceptionally(throwable -> 10);
}
}
3、完成时处理
CompletableFuture提供了handle方法是另一种处理结果的方式
// 处理上一次结果,有返回值
public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)
// 新开线程处理上一次结果,有返回值
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn)
// 新拿取线程池中线程处理上一次结果,有返回值
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor)
代码示例
package com.atguigu.gulimail.test.controller.future;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @author lishanbiao
* @Date 2021/11/23
*/
public class CompletableFutureTest {
static ExecutorService executorService = Executors.newFixedThreadPool(10);
/**
* 3、完成时处理
* // 处理上一次结果,有返回值
* public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)
* // 新开线程处理上一次结果,有返回值
* public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn)
* // 新拿取线程池中线程处理上一次结果,有返回值
* public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor)
*/
public static void main(String[] args) throws Exception {
CompletableFuture<Integer> handleFuture = handle();
System.out.println("handleFuture返回结果:" + handleFuture.get());
}
/**
* 异步执行,有返回值,
*/
static CompletableFuture<Integer> supplyAsyncWithExecutor() throws Exception {
return CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程:" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("运行结果:" + i);
return i;
});
}
/**
* 异步执行,可处理返回值
*/
static CompletableFuture<Integer> handle() throws Exception {
return supplyAsyncWithExecutor().handle((resultData, exception) -> {
System.out.println("执行supplyAsync后,调用whenComplete返回的数据:" + resultData + ",异常:" + exception);
if (resultData == null) {
return resultData * 2;
}
if (exception != null) {
return 0;
}
return resultData;
});
}
}
4、线程串行化方法
CompletableFuture提供了一系列的串行化方法
// 不能获取到上一步执行结果,但需要等待上一个任务执行完成,无返回值
public CompletableFuture<Void> thenRun(Runnable action)
// 不能获取到上一步执行结果,但需要等待上一个任务执行完成,无返回值,新开线程
public CompletableFuture<Void> thenRunAsync(Runnable action)
// 不能获取到上一步执行结果,但需要等待上一个任务执行完成,无返回值,线程池中新开线程
public CompletableFuture<Void> thenRunAsync(Runnable action,Executor executor)
// 消费一个线程结果,不返回信息
public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
// 消费一个线程结果,不返回信息,新开线程
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
// 消费一个线程结果,不返回信息,线程池中新开线程
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor)
// 消费一个线程结果,返回信息
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
// 消费一个线程结果,返回信息,新开线程
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
// 消费一个线程结果,返回信息,线程池中新开线程
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
代码示例
package com.atguigu.gulimail.test.controller.future;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @author lishanbiao
* @Date 2021/11/23
*/
public class CompletableFutureTest {
static ExecutorService executorService = Executors.newFixedThreadPool(10);
/**
* 4、线程串行化方法
*/
public static void main(String[] args) throws Exception {
CompletableFuture<Integer> handleFuture = handle();
// 这些方法请自己尝试测验
thenRun();
thenRunAsync();
thenRunAsyncWithExec();
thenAccept();
thenAcceptAsync();
thenAcceptAsyncWithExec();
CompletableFuture<String> applyFuture = thenApplyAsync();
applyFuture = thenApply();
applyFuture = thenApplyAsyncWithExec();
Thread.sleep(50000);
}
System.out.println("运行结果:" + i);
}, executorService);
System.out.println("main……end……");
}
/**
* 异步执行,有返回值
*/
static CompletableFuture<Integer> supplyAsync() throws Exception {
return CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程:" + Thread.currentThread().getId());
int i = 10 / 2;
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("运行结果:" + i);
return i;
});
}
/**
* 不能获取到上一步执行结果,但需要等待上一个任务执行完成,无返回值(等待线程处理完之后,新开另一个线程执行其他任务)
*/
static void thenRun() throws Exception {
supplyAsync().thenRun(() -> {
System.out.println("我是上一个异步操作执行完后的处理……");
});
}
/**
* 不能获取到上一步执行结果,但需要等待上一个任务执行完成,无返回值,新开线程(等待线程处理完之后,新开另一个线程执行其他任务)
*/
static void thenRunAsync() throws Exception {
supplyAsync().thenRunAsync(() -> {
System.out.println("我是上一个异步操作执行完后的处理……");
});
}
/**
* 不能获取到上一步执行结果,但需要等待上一个任务执行完成,无返回值,线程池中新开线程(等待线程处理完之后,新开另一个线程执行其他任务)
*/
static void thenRunAsyncWithExec() throws Exception {
supplyAsync().thenRunAsync(() -> {
System.out.println("我是上一个异步操作执行完后的处理……");
}, executorService);
}
/**
* 消费一个线程结果,不返回信息
*/
static void thenAccept() throws Exception {
supplyAsync().thenAccept(res -> {
System.out.println("上一个线程返回的结果:" + res);
});
}
/**
* 消费一个线程结果,不返回信息,线程池中新开线程
*/
static void thenAcceptAsync() throws Exception {
supplyAsync().thenAcceptAsync(res -> {
System.out.println("上一个线程返回的结果:" + res);
});
}
/**
* 消费一个线程结果,不返回信息,新开线程
*/
static void thenAcceptAsyncWithExec() throws Exception {
supplyAsync().thenAcceptAsync(res -> {
System.out.println("上一个线程返回的结果:" + res);
}, executorService);
}
/**
* 消费一个线程结果,返回信息
* @return
*/
static CompletableFuture<String> thenApply() throws Exception {
return supplyAsync().thenApply(res -> {
System.out.println("上一个线程返回的结果:" + res);
return "我是一个apply";
});
}
/**
* 消费一个线程结果,返回信息,新开线程
*/
static CompletableFuture<String> thenApplyAsync() throws Exception {
return supplyAsync().thenApplyAsync(res -> {
System.out.println("上一个线程返回的结果:" + res);
return "我是一个apply";
});
}
/**
* 消费一个线程结果,返回信息,线程池中新开线程
* @return
*/
static CompletableFuture<String> thenApplyAsyncWithExec() throws Exception {
return supplyAsync().thenApplyAsync(res -> {
System.out.println("上一个线程返回的结果:" + res);
return "我是一个apply";
}, executorService);
}
}
5、两个任务组合(both)
CompletableFuture提供both组合模式--两个任务必须都完成,触发改任务
// 调用者任务与参数任务执行完成后,触发action任务
public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,Runnable action)
// 调用者任务与参数任务执行完成后,新开一个线程触发action任务
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action)
// 调用者任务与参数任务执行完成后,线程池新开一个线程触发action任务
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action,Executor executor)
// 消费两个父任务执行结果,触发action任务,无返回值
public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action)
// 消费两个父任务执行结果,新开一个线程触发action任务,无返回值
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action)
// 消费两个父任务执行结果,线程池新开一个线程触发action任务,无返回值
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action, Executor executor)
// 处理两个父任务结果,触发子任务并返回结果
public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn)
// 处理两个父任务结果,新开一个线程触发子任务并返回结果
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn)
// 处理两个父任务结果,线程池新开一个线程触发子任务并返回结果
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn, Executor executor)
5.1 ps
值得注意的是,两个任务必然是并行执行的!
关于CompletionStage<?>究竟是什么呢?
我们会发现,CompletableFuture 实现了CompletionStage,也就是说,我们需要在方法里面再传一个任务,与调用者一起组成两个任务,都完成后,执行后续操作
代码示例
public static void main(String[] args) throws Exception {
runAfterBoth();
runAfterBothAsync();
runAfterBothAsyncWithExec();
thenAcceptBoth();
thenAcceptBothAsync();
thenAcceptBothAsyncWithExec();
CompletableFuture<String> thenCombineFuture = thenCombine();
thenCombineFuture = thenCombineAsync();
thenCombineFuture = thenCombineAsyncWithExec();
System.out.println(thenCombineFuture.get());
Thread.sleep(50000);
}
/**
* 异步执行,有返回值
*/
static CompletableFuture<Integer> supplyAsync() throws Exception {
return CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程:" + Thread.currentThread().getId());
int i = 10 / 2;
try {
System.out.println("在此等待中……");
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("运行结果:" + i);
return i;
});
}
/**
* 两个父任务执行完毕,触发action任务
*
* @return
*/
static void runAfterBoth() throws Exception {
supplyAsync().runAfterBoth(supplyAsync(), () -> {
System.out.println("runAfterBoth任务执行完成");
});
}
/**
* 两个父任务执行完毕,新开一个线程触发action任务
*
* @return
*/
static void runAfterBothAsync() throws Exception {
supplyAsync().runAfterBothAsync(supplyAsync(), () -> {
System.out.println("runAfterBoth任务执行完成");
});
}
/**
* 两个父任务执行完毕,线程池新开一个线程触发action任务
*
* @return
*/
static void runAfterBothAsyncWithExec() throws Exception {
supplyAsync().runAfterBothAsync(supplyAsync(), () -> {
System.out.println("runAfterBoth任务执行完成");
}, executorService);
}
/**
* 消费两个父任务执行结果,触发action任务,无返回值
*
* @return
*/
static void thenAcceptBoth() throws Exception {
supplyAsync().thenAcceptBoth(supplyAsync(), (firstResult, secondResult) -> {
System.out.println("thenAcceptBoth-任务执行完成");
});
}
/**
* 消费两个父任务执行结果,新开一个线程触发action任务,无返回值
*
* @return
*/
static void thenAcceptBothAsync() throws Exception {
supplyAsync().thenAcceptBothAsync(supplyAsync(), (firstResult, secondResult) -> {
System.out.println("thenAcceptBoth-任务执行完成");
});
}
/**
* 消费两个父任务执行结果,线程池新开一个线程触发action任务,无返回值
*
* @return
*/
static void thenAcceptBothAsyncWithExec() throws Exception {
supplyAsync().thenAcceptBothAsync(supplyAsync(), (firstResult, secondResult) -> {
System.out.println("thenAcceptBoth-任务执行完成");
}, executorService);
}
/**
* 处理两个父任务结果,触发子任务并返回结果
*
* @return
*/
static CompletableFuture<String> thenCombine() throws Exception {
return supplyAsync().thenCombine(supplyAsync(), (firstResult, secondResult) -> {
System.out.println("thenAcceptBoth-任务执行完成");
return "thenCombine";
});
}
/**
* 处理两个父任务结果,新开一个线程触发子任务并返回结果
*
* @return
*/
static CompletableFuture<String> thenCombineAsync() throws Exception {
return supplyAsync().thenCombineAsync(supplyAsync(), (firstResult, secondResult) -> {
System.out.println("thenAcceptBoth-任务执行完成");
return "thenCombineAsync";
});
}
/**
* 处理两个父任务结果,线程池新开一个线程触发子任务并返回结果
*
* @return
*/
static CompletableFuture<String> thenCombineAsyncWithExec() throws Exception {
return supplyAsync().thenCombineAsync(supplyAsync(), (firstResult, secondResult) -> {
System.out.println("thenAcceptBoth-任务执行完成");
return "thenCombineAsyncWithExec";
}, executorService);
}
6、两个任务组合(either)
CompletableFuture提供either组合模式--两个任务只要完成一个,触发改任务
与both有异曲同工之妙,照葫芦画瓢,这里不过多阐述
// 两个父任务结果只要返回一个,触发子任务,无返回结果
public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,Runnable action)
// 两个父任务结果只要返回一个,新开一个线程触发子任务,无返回结果
public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action)
// 两个父任务结果只要返回一个,线程池新开一个线程触发子任务,无返回结果
public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action,Executor executor)
// 两个父任务结果只要返回一个,消费其结果,触发子任务,无返回结果
public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action)
// 两个父任务结果只要返回一个,消费其结果,新开一个线程触发子任务,无返回结果
public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action)
// 两个父任务结果只要返回一个,消费其结果,线程池新开一个线程触发子任务,无返回结果
public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action,Executor executor)
// 两个父任务结果只要返回一个,触发子任务处理其结果,并返回
public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn)
// 两个父任务结果只要返回一个,新开线程触发子任务处理其结果,并返回
public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn)
// 两个父任务结果只要返回一个,线程池新开线程触发子任务处理其结果,并返回
public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn,Executor executor)
7、多任务组合
CompletableFuture提供了多任务组合模式(allOff、anyOff)
// 执行完任何一个任务后,返回其结果,有返回值
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
// 等待所有任务执行完毕,返回空值
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
代码示例
public static void main(String[] args) throws Exception {
allOf();
CompletableFuture<Object> objectCompletableFuture = anyOf();
System.out.println(objectCompletableFuture.get());
// 主线程等待运行
Thread.sleep(10000);
}
/**
* 异步执行,有返回值
*/
static CompletableFuture<Integer> supplyAsync() throws Exception {
return CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程:" + Thread.currentThread().getId());
int i = 10 / 2;
try {
System.out.println("在此等待中……");
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("运行结果:" + i);
return i;
});
}
/**
* 执行完任何一个任务后,返回其结果,有返回值
*
* @return
*/
static CompletableFuture<Object> anyOf() throws Exception {
return CompletableFuture.anyOf(supplyAsync(), supplyAsync(), supplyAsync());
}
/**
* 等待所有任务执行完毕,返回空值
*
* @return
*/
static CompletableFuture<Void> allOf() throws Exception {
return CompletableFuture.allOf(supplyAsync(), supplyAsync(), supplyAsync());
}
7.1 ps
值得注意的是,无论是anyOf还是allOf,最后所有的线程任务都会执行完毕!
总结篇:
- run相关的方法,通常用来做下一步操作
- accept相关的方法,通常用来消费结果,无返回值
- supply、apply、combine相关的方法是有返回值的
- handle方法用于处理正常和异常结果
相信大家应该都学以致用了吧!
以上是关于CompletableFuture 异步编排的主要内容,如果未能解决你的问题,请参考以下文章
CompletableFuture异步编排(线程串行化代码示例)
CompletableFuture异步编排(线程串行化代码示例)