CompletableFuture入门
Posted 我也有梦想呀
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了CompletableFuture入门相关的知识,希望对你有一定的参考价值。
CompletableFuture入门
1、Future vs CompletableFuture
1.1 准备工作
先定义一个工具类
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.StringJoiner;
import java.util.concurrent.TimeUnit;
public class CommonUtils
public static String readFile(String pathToFile)
try
return Files.readString(Paths.get(pathToFile));
catch(Exception e)
e.printStackTrace();
return "";
public static void sleepMillis(long millis)
try
TimeUnit.MILLISECONDS.sleep(millis);
catch (InterruptedException e)
e.printStackTrace();
public static void sleepSecond(int seconds)
try
TimeUnit.SECONDS.sleep(seconds);
catch (InterruptedException e)
e.printStackTrace();
public static void printThreadLog(String message)
String result = new StringJoiner(" | ")
.add(String.valueOf(System.currentTimeMillis()))
.add(String.format("%2d",Thread.currentThread().getId()))
.add(String.valueOf(Thread.currentThread().getName()))
.add(message)
.toString();
System.out.println(result);
1.2 Future 的局限性
需求:替换新闻稿 ( news.txt ) 中敏感词汇 ,把敏感词汇替换成*,敏感词存储在 filter_words.txt 中
public class FutureDemo
public static void main(String[] args) throws ExecutionException, InterruptedException
ExecutorService executor = Executors.newFixedThreadPool(5);
// step 1: 读取敏感词汇
Future<String[]> filterWordFuture = executor.submit(() ->
String str = CommonUtils.readFile("filter_words.txt");
String[] filterWords = str.split(",");
return filterWords;
);
// step 2: 读取新闻稿文件内容
Future<String> newsFuture = executor.submit(() ->
return CommonUtils.readFile("news.txt");
);
// step 3: 替换操作(当敏感词汇很多,文件很多,替换也会是个耗时的任务)
Future<String> replaceFuture = executor.submit(() ->
String[] words = filterWordFuture.get();
String news = newsFuture.get();
// 替换敏感词汇
for (String word : words)
if (news.indexOf(word) >= 0)
news = news.replace(word, "**");
return news;
);
String filteredNews = replaceFuture.get();
System.out.println("过滤后的新闻稿:" + filteredNews);
executor.shutdown();
通过上面的代码,我们会发现,Future相比于所有任务都直接在主线程处理,有很多优势,但同时也存在不足,至少表现如下:
- 在没有阻塞的情况下,无法对Future的结果执行进一步的操作。Future不会告知你它什么时候完成,你如果想要得到结果,必须通过一个get()方法,该方法会阻塞直到结果可用为止。 它不具备将回调函数附加到Future后并在Future的结果可用时自动调用回调的能力。
- 无法解决任务相互依赖的问题。filterWordFuture和newsFuture的结果不能自动发送给replaceFuture,需要在replaceFuture中手动获取,所以使用Future不能轻而易举地创建异步工作流。
- 不能将多个Future合并在一起。假设你有多种不同的Future,你想在它们全部并行完成后然后再运行某个函数,Future很难独立完成这一需要。
- 没有异常处理。Future提供的方法中没有专门的API应对异常处理,还是需要开发者自己手动异常处理。
1.3 CompletableFuture 的优势
CompletableFuture 实现了Future和CompletionStage接口
CompletableFuture 相对于 Future 具有以下优势:
- 为快速创建、链接依赖和组合多个Future提供了大量的便利方法。
- 提供了适用于各种开发场景的回调函数,它还提供了非常全面的异常处理支持。
- 无缝衔接和亲和 lambda 表达式 和 Stream - API 。
- 我见过的真正意义上的异步编程,把异步编程和函数式编程、响应式编程多种高阶编程思维集于一身,设计上更优雅。
2、创建异步任务
2.1 runAsync
如果你要异步运行某些耗时的后台任务,并且不想从任务中返回任何内容,则可以使用CompletableFuture.runAsync()
方法。它接受一个Runnable接口的实现类对象,方法返回CompletableFuture<Void>
对象
static CompletableFuture<Void> runAsync(Runnable runnable);
演示案例:开启一个不从任务中返回任何内容的CompletableFuture异步任务
public class RunAsyncDemo
public static void main(String[] args)
// runAsync 创建异步任务
CommonUtils.printThreadLog("main start");
// 使用Runnable匿名内部类
CompletableFuture.runAsync(new Runnable()
@Override
public void run()
CommonUtils.printThreadLog("读取文件开始");
// 使用睡眠来模拟一个长时间的工作任务(例如读取文件,网络请求等)
CommonUtils.sleepSecond(3);
CommonUtils.printThreadLog("读取文件结束");
);
CommonUtils.printThreadLog("here are not blocked,main continue");
CommonUtils.sleepSecond(4); // 此处休眠为的是等待CompletableFuture背后的线程池执行完成。
CommonUtils.printThreadLog("main end");
我们也可以以Lambda表达式的形式传递Runnable接口实现类对象
public class RunAsyncDemo2
public static void main(String[] args)
// runAsync 创建异步任务
CommonUtils.printThreadLog("main start");
// 使用Lambda表达式
CompletableFuture.runAsync(() ->
CommonUtils.printThreadLog("读取文件开始");
CommonUtils.sleepSecond(3);
CommonUtils.printThreadLog("读取文件结束");
);
CommonUtils.printThreadLog("here are not blocked,main continue");
CommonUtils.sleepSecond(4);
CommonUtils.printThreadLog("main end");
需求:使用CompletableFuture开启异步任务读取 news.txt 文件中的新闻稿,并打印输出。
public class RunAsyncDemo3
public static void main(String[] args)
// 需求:使用多线程异步读取 words.txt 中的敏感词汇,并打印输出。
CommonUtils.printThreadLog("main start");
CompletableFuture.runAsync(()->
String news = CommonUtils.readFile("news.txt");
CommonUtils.printThreadLog(news);
);
CommonUtils.printThreadLog("here are not blocked,main continue");
CommonUtils.sleepSecond(4);
CommonUtils.printThreadLog("main end");
在后续的章节中,我们会经常使用Lambda表达式。
2.2 supplyAsync
CompletableFuture.runAsync()
开启不带返回结果异步任务。但是,如果您想从后台的异步任务中返回一个结果怎么办?此时,CompletableFuture.supplyAsync()
是你最好的选择了。
static CompletableFuture<U> supplyAsync(Supplier<U> supplier)
它入参一个 Supplier<U> 供给者,用于供给带返回值的异步任务
并返回CompletableFuture<U>,其中U是供给者给程序供给值的类型。
需求:开启异步任务读取 news.txt 文件中的新闻稿,返回文件中内容并在主线程打印输出
public class SupplyAsyncDemo
public static void main(String[] args) throws ExecutionException, InterruptedException
CommonUtils.printThreadLog("main start");
CompletableFuture<String> newsFuture = CompletableFuture.supplyAsync(new Supplier<String>()
@Override
public String get()
String news = CommonUtils.readFile("news.txt");
return news;
);
CommonUtils.printThreadLog("here are not blocked,main continue");
// 阻塞并等待newsFuture完成
String news = newsFuture.get();
CommonUtils.printThreadLog("news = " + news);
CommonUtils.printThreadLog("main end");
如果想要获取newsFuture结果,可以调用completableFuture.get()方法,get()方法将阻塞,直到newsFuture完成。
我们依然可以使用Java 8的Lambda表达式使上面的代码更简洁。
CompletableFuture<String> newsFuture = CompletableFuture.supplyAsync(() ->
String news = CommonUtils.readFile("news.txt");
return news;
);
2.3 异步任务中的线程池
大家已经知道,runAsync()
和supplyAsync()
方法都是开启单独的线程中执行异步任务。但是,我们从未创建线程对吗? 不是吗!
CompletableFuture 会从全局的ForkJoinPool.commonPool()
线程池获取线程来执行这些任务
当然,你也可以创建一个线程池,并将其传递给runAsync()
和supplyAsync()
方法,以使它们在从您指定的线程池获得的线程中执行任务。
CompletableFuture API中的所有方法都有两种变体,一种是接受传入的Executor
参数作为指定的线程池,而另一种则使用默认的线程池 (ForkJoinPool.commonPool()
) 。
// runAsync() 的重载方法
static CompletableFuture<Void> runAsync(Runnable runnable)
static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
// supplyAsync() 的重载方法
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
需求:指定线程池,开启异步任务读取 news.txt 中的新闻稿,返回文件中内容并在主线程打印输出
ExecutorService executor = Executors.newFixedThreadPool(4);
CompletableFuture<String> newsFuture = CompletableFuture.supplyAsync(() ->
CommonUtils.printThreadLog("异步读取文件开始");
String news = CommonUtils.readFile("news.txt");
CommonUtils.printThreadLog("异步读取文件完成");
return news;
,executor);
最佳实践:创建属于自己的业务线程池
如果所有
CompletableFuture
共享一个线程池,那么一旦有异步任务执行一些很慢的 I/O 操作,就会导致线程池中所有线程都阻塞在 I/O 操作上,从而造成线程饥饿,进而影响整个系统的性能。所以,强烈建议你要根据不同的业务类型创建不同的线程池,以避免互相干扰。
2.4 异步编程思想
综合上述,看到了吧,我们没有显式地创建线程,更没有涉及线程通信的概念,整个过程根本就没涉及线程知识吧,以上专业的说法是:线程的创建和线程负责的任务进行解耦,它给我们带来的好处线程的创建和启动全部交给线程池负责,具体任务的编写就交给程序员,专人专事。
异步编程是可以让程序并行( 也可能是并发 )运行的一种手段,其可以让程序中的一个工作单元作为异步任务与主线程分开独立运行,并且在异步任务运行结束后,会通知主线程它的运行结果或者失败原因,毫无疑问,一个异步任务其实就是开启一个线程来完成的,使用异步编程可以提高应用程序的性能和响应能力等。
作为开发者,只需要有一个意识:
开发者只需要把耗时的操作交给CompletableFuture开启一个异步任务,然后继续关注主线程业务,当异步任务运行完成时会通知主线程它的运行结果。我们把具备了这种编程思想的开发称为异步编程思想。
3、异步任务回调
CompletableFuture.get()
方法是阻塞的。调用时它会阻塞等待 直到这个Future完成,并在完成后返回结果。 但是,很多时候这不是我们想要的。
对于构建异步系统,我们应该能够将回调附加到CompletableFuture上,当这个Future完成时,该回调应自动被调用。 这样,我们就不必等待结果了,然后在Future的回调函数内编写完成Future之后需要执行的逻辑。 您可以使用thenApply()
,thenAccept()
和thenRun()
方法,它们可以把回调函数附加到CompletableFuture
3.1 thenApply
使用 thenApply()
方法可以处理和转换CompletableFuture的结果。 它以Function<T,R>作为参数。 Function<T,R>是一个函数式接口,表示一个转换操作,它接受类型T的参数并产生类型R的结果
CompletableFuture<R> thenApply(Function<T,R> fn)
需求:异步读取 filter_words.txt 文件中的内容,读取完成后,把内容转换成数组( 敏感词数组 ),异步任务返回敏感词数组
public class ThenApplyDemo
public static void main(String[] args) throws ExecutionException, InterruptedException
CommonUtils.printThreadLog("main start");
CompletableFuture<String> readFileFuture = CompletableFuture.supplyAsync(() ->
CommonUtils.printThreadLog("读取filter_words文件");
String filterWordsContent = CommonUtils.readFile("filter_words.txt");
return filterWordsContent;
);
CompletableFuture<String[]> filterWordsFuture = readFileFuture.thenApply((content) ->
CommonUtils.printThreadLog("文件内容转换成敏感词数组");
String[] filterWords = content.split(",");
return filterWords;
);
CommonUtils.printThreadLog("main continue");
String[] filterWords = filterWordsFuture.get();
CommonUtils.printThreadLog("filterWords = " + Arrays.toString(filterWords));
CommonUtils.printThreadLog("main end");
你还可以通过附加一系列thenApply()
回调方法,在CompletableFuture上编写一系列转换序列。一个thenApply()
方法的结果可以传递给序列中的下一个,如果你对链式操作很了解,你会发现结果可以在链式操作上传递。
CompletableFuture<String[]> filterWordsFuture = CompletableFuture.supplyAsync(() ->
CommonUtils.printThreadLog("读取filter_words文件");
String filterWordsContent = CommonUtils.readFile("filter_words.txt");
return filterWordsContent;
).thenApply((content) ->
CommonUtils.printThreadLog("转换成敏感词数组");
String[] filterWords = content.split(",");
return filterWords;
);
3.2 thenAccept
如果你不想从回调函数返回结果,而只想在Future完成后运行一些代码,则可以使用thenAccept()
这些方法是入参一个 Consumer
CompletableFuture<Void> thenAccept(Consumer<T> action)
通常用作回调链中的最后一个回调。
需求:异步读取 filter_words.txt 文件中的内容,读取完成后,转换成敏感词数组,然后打印敏感词数组
public class ThenAcceptDemo
public static void main(String[] args)
CommonUtils.printThreadLog("main start");
CompletableFuture.supplyAsync(() ->
CommonUtils.printThreadLog("读取filter_words文件");
String filterWordsContent = CommonUtils.readFile("filter_words.txt");
return filterWordsContent;
).thenApply((content) ->
CommonUtils.printThreadLog("转换成敏感词数组");
String[] filterWords = content.split(",");
return filterWords;
).thenAccept((filterWords) ->
CommonUtils.printThreadLog("filterWords = " + Arrays.toString(filterWords));
);
CommonUtils.printThreadLog("main continue");
CommonUtils.sleepSecond(4);
CommonUtils.printThreadLog("main end");
3.3 thenRun
前面我们已经知道,通过thenApply( Function<T,R> ) 对链式操作中的上一个异步任务的结果进行转换,返回一个新的结果;
通过thenAccept( Consumer
如果我们只是想从CompletableFuture的链式操作得到一个完成的通知,甚至都不使用上一步链式操作的结果,那么 CompletableFuture.thenRun() 会是你最佳的选择,它需要一个Runnable并返回CompletableFuture<Void>
。
CompletableFuture<Void> thenRun(Runnable action);
演示案例:我们仅仅想知道 filter_words.txt 的文件是否读取完成
public class ThenRunDemo
public static void main(String[] args)
CommonUtils.printThreadLog("main start");
CompletableFuture.supplyAsync(() ->
CommonUtils.printThreadLog("读取filter_words文件");
String filterWordsContent = CommonUtils.readFile("filter_words.txt");
return filterWordsContent;
).thenRun(() ->
CommonUtils.printThreadLog("读取filter_words文件读取完成");
);
CommonUtils.printThreadLog("main continue");
CommonUtils.sleepSecond(4);
CommonUtils.printThreadLog("main end");
3.4 更进一步提升并行化
CompletableFuture 提供的所有回调方法都有两个异步变体
CompletableFuture<U> thenApply(Function<T,U> fn)
// 回调方法的异步变体(异步回调)
CompletableFuture<U> thenApplyAsync(Function<T,U> fn)
CompletableFuture<U> thenApplyAsync(Function<T,U> fn, Executor executor)
注意:这些带了Async的异步回调 通过在单独的线程中执行回调任务 来帮助您进一步促进并行化计算。
回顾需求:异步读取 filter_words.txt 文件中的内容,读取完成后,转换成敏感词数组,主线程获取结果打印输出这个数组
public class ThenApplyAsyncDemo
public static void main(String[] args) throws ExecutionException, InterruptedException
CommonUtils.printThreadLog("main start");
CompletableFuture<String[]> filterWordFuture = CompletableFuture.supplyAsync(() ->
/*
CommonUtils.printThreadLog("读取filter_words文件");
String filterWordsContent = CommonUtils.readFile("filter_words.txt");
return filterWordsContent;
*/
// 此时,立即返回结果
return "尼玛, NB, tmd";
).thenApply((content) ->
/**
* 一般而言,thenApply任务的执行和supplyAsync()任务执行可以使用同一线程执行
* 如果supplyAsync()任务立即返回结果,则thenApply的任务在主线程中执行
*/
CommonUtils.printThreadLog("把内容转换成敏感词数组");
String[] filterWords = content.split(",");
return filterWords;
);
CommonUtils.printThreadLog("main continue");
String[] filterWords = filterWordFuture.get();
CommonUtils.printThreadLog("filterWords = " + Arrays.toString(filterWords));
CommonUtils.printThreadLog("main end");
要更好地控制执行回调任务的线程,可以使用异步回调。如果使用thenApplyAsync()
回调,那么它将在从ForkJoinPool.commonPool()
获得的另一个线程中执行
public class ThenApplyAsyncDemo2
public static void main(String[] args) throws ExecutionException, InterruptedException
CommonUtils.printThreadLog("main start");
CompletableFuture<String[]> filterWordFuture = CompletableFuture.supplyAsync(() ->
CommonUtils.printThreadLog("读取filter_words文件");
String filterWordsContent = CommonUtils.readFile("filter_words.txt");
return filterWordsContent;
).thenApplyAsync((content) ->
CommonUtils.printThreadLog("把内容转换成敏感词数组");
String[] filterWords = content.split(",");
return filterWords;
);
CommonUtils.printThreadLog("main continue");
String[] filterWords = filterWordFuture.get();
CommonUtils.printThreadLog("filterWords = " + Arrays.toString(filterWords));
CommonUtils.printThreadLog("main end");
以上程序一种可能的运行结果(需要多运行几次):
1672885914481 | 1 | main | main start
1672885914511 | 16 | ForkJoinPool.commonPool-worker-1 | 读取filter_words.txt文件
1672885914511 | 1 | main | main continue
1672885914521 | 17 | ForkJoinPool.commonPool-worker-2 | 把内容转换成敏感词数组
1672885914521 | 1 | main | filterWords = [尼玛, NB, tmd]
1672885914521 | 1 | main | main end
此外,如果将Executor传递给thenApplyAsync()
回调,则该回调的异步任务将在从Executor的线程池中获取的线程中执行;
ExecutorService executor = Executors.newFixedThreadPool(5);
CompletableFuture<String[]> filterWordFuture = CompletableFuture.supplyAsync(() ->
CommonUtils.printThreadLog("读取filter_words文件");
String filterWordsContent = CommonUtils.readFile("filter_words.txt");
return filterWordsContent;
).thenApplyAsync((content) ->
CommonUtils.printThreadLog("把内容转换成敏感词数组");
String[] filterWords = content.split(",");
return filterWords;
,executor);
executor.shutdown();
其他两个回调的变体版本如下:
// thenAccept和其异步回调
CompletableFuture<Void> thenAccept(Consumer<T> action)
CompletableFuture<Void> thenAcceptAsync(Consumer<T> action)
CompletableFuture<Void> thenAcceptAsync(Consumer<T> action, Executor executor)
// thenRun和其异步回调
CompletableFuture<Void> thenRun(Runnable action)
CompletableFuture<Void> thenRunAsync(Runnable action)
CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor)
4、异步任务编排
4.1 编排2个依赖关系的异步任务 thenCompose()
回顾需求:异步读取 filter_words.txt 文件中的内容,读取完成后,转换成敏感词数组让主线程待用。
关于读取和解析内容,假设使用以下的 readFileFuture(String) 和 splitFuture(String) 方法完成。
public static CompletableFuture<String> readFileFuture(String fileName)
return CompletableFuture.supplyAsync(() ->
String filterWordsContent = CommonUtils.readFile(fileName);
return filterWordsContent;
);
public static CompletableFuture<String[]> splitFuture(String context)
return CompletableFuture.supplyAsync(() ->
String[] filterWords = context.split(",");
return filterWords;
);
现在,让我们先了解如果使用thenApply()
结果会发生什么
CompletableFuture<CompletableFuture<String[]>> future = readFileFuture("filter_words.txt")
.thenApply((context) ->
return splitFuture(context);
);
回顾在之前的案例中,thenApply(Function<T,R>)
中Function回调会对上一步任务结果转换后得到一个简单值 ,但现在这种情况下,最终结果是嵌套的CompletableFuture,所以这是不符合预期的,那怎么办呢?
我们想要的是:把上一步异步任务的结果,转成一个CompletableFuture对象,这个CompletableFuture对象中包含本次异步任务处理后的结果。也就是说,我们想组合上一步异步任务的结果到下一个新的异步任务中, 结果由这个新的异步任务返回
此时,你需要使用thenCompose()
方法代替,我们可以把它理解为 异步任务的组合
CompletableFuture<R> thenCompose(Function<T,CompletableFuture<R>> func)
所以,thenCompose()
用来连接两个有依赖关系的异步任务,结果由第二个任务返回
CompletableFuture<String[]> future = readFileFuture("filter_words.txt")
.thenCompose((context) ->
return splitFuture(context);
);
因此,这里积累了一个经验:
如果我们想连接( 编排 ) 两个依赖关系的异步任务( CompletableFuture 对象 ) ,请使用 thenCompose() 方法
当然,thenCompose 也存在异步回调变体版本:
CompletableFuture<R> thenCompose(Function<T,CompletableFuture<R>> fn)
CompletableFuture<R> thenComposeAsync(Function<T,CompletableFuture<R>> fn)
CompletableFuture<R> thenComposeAsync(Function<T,CompletableFuture<R>> fn, Executor executor)
4.2 编排2个非依赖关系的异步任务 thenCombine()
我们已经知道,当其中一个Future依赖于另一个Future,使用thenCompose()
用于组合两个Future。如果两个Future之间没有依赖关系,你希望两个Future独立运行并在两者都完成之后执行回调操作时,则使用thenCombine()
;
// T是第一个任务的结果 U是第二个任务的结果 V是经BiFunction应用转换后的结果
CompletableFuture<V> thenCombine(CompletableFuture<U> other, BiFunction<T,U,V> func)
需求:替换新闻稿 ( news.txt ) 中敏感词汇 ,把敏感词汇替换成*,敏感词存储在 filter_words.txt 中
public class ThenCombineDemo
public static void main(String[] args) throws Exception
// 读取敏感词汇的文件并解析到数组中
CompletableFuture<String[]> future1 = CompletableFuture.supplyAsync(() ->
CommonUtils.printThreadLog("读取敏感词汇并解析");
String context = CommonUtils.readFile("filter_words.txt");
String[] words = context.split(",");
return words;
);
// 读取news文件内容
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() ->
CommonUtils.printThreadLog("读取news文件内容");
String context = CommonUtils.readFile("news.txt");
return context;
);
CompletableFuture<String> combinedFuture = future1.thenCombine(future2, (words, context) ->
// 替换操作
CommonUtils.printThreadLog("替换操作");
for (String word : words)
if(context.indexOf(word) > -1)
context = context.replace(word, "**");
return context;
);
String filteredContext = combinedFuture.get();
System.out.println("filteredContext = " + filteredContext);
注意:当两个Future
都完成时,才将两个异步任务的结果传递给thenCombine()
的回调函数做进一步处理。
和以往一样,thenCombine 也存在异步回调变体版本
CompletableFuture<V> thenCombine(CompletableFuture<U> other, BiFunction<T,U,V> func)
CompletableFuture<V> thenCombineAsync(CompletableFuture<U> other, BiFunction<T,U,V> func)
CompletableFuture<V> thenCombineAsync(CompletableFuture<U> other, BiFunction<T,U,V> func,Executor executor)
4.3 合并多个异步任务 allOf / anyOf
我们使用thenCompose()
和thenCombine()
将两个CompletableFuture组合和合并在一起。
如果要编排任意数量的CompletableFuture怎么办?可以使用以下方法来组合任意数量的CompletableFuture
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
CompletableFuture.allOf()
用于以下情形中:有多个需要独立并行运行的Future
,并在所有这些Future
都完成后执行一些操作。
需求:统计news1.txt、new2.txt、new3.txt 文件中包含CompletableFuture关键字的文件的个数
public class AllOfDemo
public static CompletableFuture<String> readFileFuture(String fileName)
return CompletableFuture.supplyAsync(() ->
String content = CommonUtils.readFile(fileName);
return content;
);
public static void main(String[] args)
// step 1: 创建List集合存储文件名
List<String> fileList = Arrays.asList("news1.txt", "news2.txt", "news3.txt");
// step 2: 根据文件名调用readFileFuture创建多个CompletableFuture,并存入List集合中
List<CompletableFuture<String>> readFileFutureList = fileList.stream().map(fileName ->
return readFileFuture(fileName);
).collect(Collectors.toList());
// step 3: 把List集合转换成数组待用,以便传入allOf方法中
int len = readFileFutureList.size();
CompletableFuture[] readFileFutureArr = readFileFutureList.toArray(new CompletableFuture[len]);
// step 4: 使用allOf方法合并多个异步任务
CompletableFuture<Void> allOfFuture = CompletableFuture.allOf(readFileFutureArr);
// step 5: 当多个异步任务都完成后,使用回调操作文件结果,统计符合条件的文件个数
CompletableFuture<Long> countFuture = allOfFuture.thenApply(v ->
return readFileFutureList.stream()
.map(future -> future.join())
.filter(content -> content.contains("CompletableFuture"))
.count();
);
// step 6: 主线程打印输出文件个数
Long count = countFuture.join();
System.out.println("count = " + count);
顾名思义,当给定的多个异步任务中的有任意Future一个完成时,需要执行一些操作,可以使用 anyOf 方法
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
anyOf()
返回一个新的CompletableFuture,新的CompletableFuture的结果和 cfs中已完成的那个异步任务结果相同。
演示案例:anyOf 执行过程
public class AnyOfDemo
public static void main(String[] args) throws ExecutionException, InterruptedException
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() ->
Tools.sleepMillis(2);
return "Future1的结果";
);
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() ->
Tools.sleepMillis(1);
return "Future2的结果";
);
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() ->
Tools.sleepMillis(3);
return "Future3的结果";
);
CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(future1, future2, future3);
// 输出Future2的结果
System.out.println(anyOfFuture.get());
在上面的示例中,当三个CompletableFuture中的任意一个完成时,anyOfFuture就完成了。 由于future2的睡眠时间最少,因此它将首先完成,最终结果将是"Future2的结果"。
注意:
anyOf()
方法返回类型必须是CompletableFuture <Object>
。anyOf()
的问题在于,如果您拥有返回不同类型结果的CompletableFuture,那么您将不知道最终CompletableFuture的类型。
5、异步任务的异常处理
在前面的章节中,我们并没有更多地关心异常处理的问题,其实,CompletableFuture 提供了优化处理异常的方式。
首先,让我们了解异常如何在回调链中传播。
public static void main(String[] args) throws ExecutionException, InterruptedException
CompletableFuture<Void> cf = CompletableFuture.supplyAsync(() ->
int r = 1 / 0;
return "result1";
).thenApply(result ->
return result + " result2";
).thenApply(result ->
return result + " result3";
).thenAccept((result)->
System.out.println(result);
);
如果在 supplyAsync 任务中出现异常,后续的 thenApply 和 thenAccept 回调都不会执行,CompletableFuture 将转入异常处理
如果在第一个 thenApply 任务中出现异常,第二个 thenApply 和 最后的 thenAccept 回调不会被执行,CompletableFuture 将转入异常处理,依次类推。
5.1 exceptionally()
exceptionally 用于处理回调链上的异常,回调链上出现的任何异常,回调链不继续向下执行,都在exceptionally中处理异常。
// Throwable表示具体的异常对象e
CompletableFuture<R> exceptionally(Function<Throwable, R> func)
public class ExceptionallyDemo
public static void main(String[] args) throws ExecutionException, InterruptedException
CompletableFuture<String> future = CompletableFuture.supplyAsync(() ->
int r = 1 / 0;
return "result1";
).thenApply(result ->
String str = null;
int len = str.length();
return result + " result2";
).thenApply(result ->
return result + " result3";
).exceptionally(ex ->
System.out.println("出现异常:" + ex.getMessage());
return "Unknown";
);
String ret = future.get();
Tools.printThreadLog("最终结果:" + ret);
因为exceptionally只处理一次异常,所以常常用在回调链的末端。
5.2 handle()
CompletableFuture API 还提供了一种更通用的方法 handle()
表示从异常中恢复
handle() 常常被用来恢复回调链中的一次特定的异常,回调链恢复后可以进一步向下传递。
CompletableFuture<R> handle(BiFunction<T, Throwable, R> fn)
public class HandleDemo
public static void main(String[] args) throws ExecutionException, InterruptedException
CompletableFuture<String> future = CompletableFuture.supplyAsync(() ->
int r = 1 / 0;
return "result";
).handle((ret, ex) ->
if(ex != null)
System.out.println("我们得到异常:" + ex.getMessage());
return "Unknown!";
return ret;
);
String ret = future.get();
CommonUtils.printThreadLog(ret);
如果发生异常,则res参数将为null,否则ex参数将为null。
需求:对回调链中的一次异常进行恢复处理
public class HandleExceptionDemo2
public static void main(String[] args) throws ExecutionException, InterruptedException
CompletableFuture<String> future = CompletableFuture.supplyAsync(() ->
int r = 1 / 0;
return "result1";
).handle((ret, ex) ->
if (ex != null)
System.out.println("我们得到异常:" + ex.getMessage());
return "Unknown1";
return ret;
).thenApply(result ->
String str = null;
int len = str.length();
return result + " result2";
).handle((ret, ex) ->
if (ex != null)
System.out.println("我们得到异常:" + ex.getMessage());
return "Unknown2";
return ret;
).thenApply(result ->
return result + " result3";
);
String ret = future.get();
Tools.printThreadLog("最终结果:" + ret);
和以往一样,为了提升并行化,异常处理可以方法单独的线程执行,以下是它们的异步回调版本
CompletableFuture<R> exceptionally(Function<Throwable, R> fn)
CompletableFuture<R> exceptionallyAsync(Function<Throwable, R> fn) // jdk17+
CompletableFuture<R> exceptionallyAsync(Function<Throwable, R> fn,Executor executor) // jdk17+
CompletableFuture<R> handle(BiFunction<T,Throwable,R> fn)
CompletableFuture<R> handleAsync(BiFunction<T,Throwable,R> fn)
CompletableFuture<R> handleAsync(BiFunction<T,Throwable,R> fn, Executor executor)
CompletableFuture:等待第一个正常返回?
【中文标题】CompletableFuture:等待第一个正常返回?【英文标题】:CompletableFuture: Waiting for first one normally return? 【发布时间】:2016-02-28 01:08:12 【问题描述】:我有一些CompletableFuture
s,我想并行运行它们,等待第一个返回正常。
我知道我可以使用CompletableFuture.anyOf
等待第一个返回,但这将返回正常或异常。我想忽略异常。
List<CompletableFuture<?>> futures = names.stream().map(
(String name) ->
CompletableFuture.supplyAsync(
() ->
// this calling may throw exceptions.
new Task(name).run()
)
).collect(Collectors.toList());
//FIXME Can not ignore exceptionally returned takes.
Future any = CompletableFuture.anyOf(futures.toArray(new CompletableFuture<?>[]));
try
logger.info(any.get().toString());
catch (Exception e)
e.printStackTrace();
【问题讨论】:
【参考方案1】:您可以使用以下辅助方法:
public static <T>
CompletableFuture<T> anyOf(List<? extends CompletionStage<? extends T>> l)
CompletableFuture<T> f=new CompletableFuture<>();
Consumer<T> complete=f::complete;
l.forEach(s -> s.thenAccept(complete));
return f;
您可以像这样使用它,以证明它将忽略早期的异常但返回第一个提供的值:
List<CompletableFuture<String>> futures = Arrays.asList(
CompletableFuture.supplyAsync(
() -> throw new RuntimeException("failing immediately");
),
CompletableFuture.supplyAsync(
() -> LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(5));
return "with 5s delay";
),
CompletableFuture.supplyAsync(
() -> LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(10));
return "with 10s delay";
)
);
CompletableFuture<String> c = anyOf(futures);
logger.info(c.join());
此解决方案的一个缺点是,如果 all 期货异常完成,它将永远完成。如果计算成功则提供第一个值但如果根本没有成功计算则异常失败的解决方案涉及更多:
public static <T>
CompletableFuture<T> anyOf(List<? extends CompletionStage<? extends T>> l)
CompletableFuture<T> f=new CompletableFuture<>();
Consumer<T> complete=f::complete;
CompletableFuture.allOf(
l.stream().map(s -> s.thenAccept(complete)).toArray(CompletableFuture<?>[]::new)
).exceptionally(ex -> f.completeExceptionally(ex); return null; );
return f;
它利用了 allOf
的异常处理程序仅在所有期货都完成后(无论是否例外)才被调用的事实,并且未来只能完成一次(抛开像 obtrude…
这样的特殊事物)。当异常处理程序被执行时,任何试图以结果完成未来的尝试都已经完成(如果有的话),因此只有在之前没有成功完成的情况下,尝试异常完成它才会成功。
它可以与第一个解决方案完全相同的方式使用,并且只有在所有计算失败时才会表现出不同的行为,例如:
List<CompletableFuture<String>> futures = Arrays.asList(
CompletableFuture.supplyAsync(
() -> throw new RuntimeException("failing immediately");
),
CompletableFuture.supplyAsync(
// delayed to demonstrate that the solution will wait for all completions
// to ensure it doesn't miss a possible successful computation
() -> LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(5));
throw new RuntimeException("failing later");
)
);
CompletableFuture<String> c = anyOf(futures);
try logger.info(c.join());
catch(CompletionException ex) logger.severe(ex.toString());
上面的示例使用延迟来演示解决方案将在没有成功时等待所有完成,而this example on ideone 将演示稍后的成功如何将结果变为成功。请注意,由于 Ideones 缓存结果,您可能不会注意到延迟。
请注意,如果所有期货都失败,则无法保证会报告哪些异常。由于它在错误情况下等待所有完成,因此任何人都可以到达最终结果。
【讨论】:
让我们continue this discussion in chat. @Basilevs:我已经扩展了答案 纯粹的幸福!谢谢!【参考方案2】:考虑到:
Java 哲学的基础之一是防止或阻止不良编程实践。
(它在多大程度上成功做到了这一点是另一个争论的主题;重点仍然是,这无疑是该语言的主要目标之一。)
忽略异常是一种非常糟糕的做法。
异常应该总是重新抛出到上面的层,或者处理,或者至少报告。具体来说, 异常应该永远不会被默默吞下。
应尽早报告错误。
例如,看看运行时为了提供 fail fast 迭代器所经历的痛苦,如果集合在迭代时被修改,则会抛出 ConcurrentModificationException。
忽略异常完成的 CompletableFuture
意味着 a) 您没有尽早报告错误,并且 b) 您很可能计划根本不报告错误。
不能简单地等待第一个非异常完成而不得不被异常完成所困扰,这不会带来任何重大负担,因为您总是可以从列表中删除异常完成的项目,(同时不要忘记报告失败,对吗?)然后重复等待。
因此,如果 Java 中有意缺少所寻求的功能,我不会感到惊讶,并且我愿意争辩说它正确地地丢失了。 p>
(抱歉,Sotirios,没有规范的答案。)
【讨论】:
考虑备用信息源(例如热插拔备份或负载平衡集群)。如果来源是可互换的,已知偶尔会失败,并且需要大量时间来响应,那么忽略一些错误是完全合法且可取的。 @Basilevs true,但最好还是记录它们并忽略日志消息。没有任何记录的任何类型的失败都不是一个好主意。【参考方案3】:嗯,这是框架应该支持的方法。首先,我认为CompletionStage.applyToEither 做了类似的事情,但事实证明它没有。所以我想出了这个解决方案:
public static <U> CompletionStage<U> firstCompleted(Collection<CompletionStage<U>> stages)
final int count = stages.size();
if (count <= 0)
throw new IllegalArgumentException("stages must not be empty");
final AtomicInteger settled = new AtomicInteger();
final CompletableFuture<U> future = new CompletableFuture<U>();
BiConsumer<U, Throwable> consumer = (val, exc) ->
if (exc == null)
future.complete(val);
else
if (settled.incrementAndGet() >= count)
// Complete with the last exception. You can aggregate all the exceptions if you wish.
future.completeExceptionally(exc);
;
for (CompletionStage<U> item : stages)
item.whenComplete(consumer);
return future;
要查看它的实际效果,这里有一些用法:
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
public class Main
public static <U> CompletionStage<U> firstCompleted(Collection<CompletionStage<U>> stages)
final int count = stages.size();
if (count <= 0)
throw new IllegalArgumentException("stages must not be empty");
final AtomicInteger settled = new AtomicInteger();
final CompletableFuture<U> future = new CompletableFuture<U>();
BiConsumer<U, Throwable> consumer = (val, exc) ->
if (exc == null)
future.complete(val);
else
if (settled.incrementAndGet() >= count)
// Complete with the last exception. You can aggregate all the exceptions if you wish.
future.completeExceptionally(exc);
;
for (CompletionStage<U> item : stages)
item.whenComplete(consumer);
return future;
private static final ScheduledExecutorService worker = Executors.newSingleThreadScheduledExecutor();
public static <U> CompletionStage<U> delayed(final U value, long delay)
CompletableFuture<U> future = new CompletableFuture<U>();
worker.schedule(() ->
future.complete(value);
, delay, TimeUnit.MILLISECONDS);
return future;
public static <U> CompletionStage<U> delayedExceptionally(final Throwable value, long delay)
CompletableFuture<U> future = new CompletableFuture<U>();
worker.schedule(() ->
future.completeExceptionally(value);
, delay, TimeUnit.MILLISECONDS);
return future;
public static void main(String[] args) throws InterruptedException, ExecutionException
System.out.println("Started...");
/*
// Looks like applyToEither doesn't work as expected
CompletableFuture<Integer> a = CompletableFuture.completedFuture(99);
CompletableFuture<Integer> b = Main.<Integer>completedExceptionally(new Exception("Exc")).toCompletableFuture();
System.out.println(b.applyToEither(a, x -> x).get()); // throws Exc
*/
try
List<CompletionStage<Integer>> futures = new ArrayList<>();
futures.add(Main.<Integer>delayedExceptionally(new Exception("Exception #1"), 100));
futures.add(Main.<Integer>delayedExceptionally(new Exception("Exception #2"), 200));
futures.add(delayed(1, 1000));
futures.add(Main.<Integer>delayedExceptionally(new Exception("Exception #4"), 400));
futures.add(delayed(2, 500));
futures.add(Main.<Integer>delayedExceptionally(new Exception("Exception #5"), 600));
Integer value = firstCompleted(futures).toCompletableFuture().get();
System.out.println("Completed normally: " + value);
catch (Exception ex)
System.out.println("Completed exceptionally");
ex.printStackTrace();
try
List<CompletionStage<Integer>> futures = new ArrayList<>();
futures.add(Main.<Integer>delayedExceptionally(new Exception("Exception B#1"), 400));
futures.add(Main.<Integer>delayedExceptionally(new Exception("Exception B#2"), 200));
Integer value = firstCompleted(futures).toCompletableFuture().get();
System.out.println("Completed normally: " + value);
catch (Exception ex)
System.out.println("Completed exceptionally");
ex.printStackTrace();
System.out.println("End...");
【讨论】:
【参考方案4】:对上面的代码做了一些修改,允许测试第一个结果是否符合预期。
public class MyTask implements Callable<String>
@Override
public String call() throws Exception
int randomNum = ThreadLocalRandom.current().nextInt(5, 20 + 1);
for (int i = 0; i < randomNum; i++)
TimeUnit.SECONDS.sleep(1);
return "MyTest" + randomNum;
public class CompletableFutureUtils
private static <T> T resolve(FutureTask<T> futureTask)
try
futureTask.run();
return futureTask.get();
catch (Exception e)
throw new RuntimeException(e);
private static <V> boolean predicate(Predicate<V> predicate, V v)
try
return predicate.test(v);
catch (Exception e)
return false;
public static <T> void cancel(List<FutureTask<T>> futureTasks)
if (futureTasks != null && futureTasks.isEmpty() == false)
futureTasks.stream().filter(f -> f.isDone() == false).forEach(f -> f.cancel(true));
public static <V> CompletableFuture<V> supplyAsync(List<FutureTask<V>> futureTasks, Predicate<V> predicate)
return supplyAsync(futureTasks, predicate, null);
public static <V> CompletableFuture<V> supplyAsync(List<FutureTask<V>> futureTasks, Predicate<V> predicate,
Executor executor)
final int count = futureTasks.size();
final AtomicInteger settled = new AtomicInteger();
final CompletableFuture<V> result = new CompletableFuture<V>();
final BiConsumer<V, Throwable> action = (value, ex) ->
settled.incrementAndGet();
if (result.isDone() == false)
if (ex == null)
if (predicate(predicate, value))
result.complete(value);
cancel(futureTasks);
else if (settled.get() >= count)
result.complete(null);
else if (settled.get() >= count)
result.completeExceptionally(ex);
;
for (FutureTask<V> futureTask : futureTasks)
if (executor != null)
CompletableFuture.supplyAsync(() -> resolve(futureTask), executor).whenCompleteAsync(action, executor);
else
CompletableFuture.supplyAsync(() -> resolve(futureTask)).whenCompleteAsync(action);
return result;
public class DemoApplication
public static void main(String[] args)
List<FutureTask<String>> tasks = new ArrayList<FutureTask<String>>();
for (int i = 0; i < 2; i++)
FutureTask<String> task = new FutureTask<String>(new MyTask());
tasks.add(task);
Predicate<String> test = (s) -> true;
CompletableFuture<String> result = CompletableFutureUtils.supplyAsync(tasks, test);
try
String s = result.get(20, TimeUnit.SECONDS);
System.out.println("result=" + s);
catch (Exception e)
e.printStackTrace();
CompletableFutureUtils.cancel(tasks);
调用CompletableFutureUtils.cancel(tasks);
非常重要,所以当超时发生时,它会取消后台任务。
【讨论】:
【参考方案5】:我发现 Vertx - CompositeFuture.any 方法在这种情况下非常有用。它专为完全相同的情况而设计。当然你必须用户vertx定义Future。 Vertx CompositeFuture API Docs
【讨论】:
以上是关于CompletableFuture入门的主要内容,如果未能解决你的问题,请参考以下文章
CompletableFuture CompletableFuture.supplyAsync 异常处理