CompletableFuture以异步执行多个数据库查询
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了CompletableFuture以异步执行多个数据库查询相关的知识,希望对你有一定的参考价值。
我想并行执行多个数据库查询,并将结果存储在映射中。我正在尝试这样做,但是访问地图时,地图并未完全填充。
我做错什么了吗?
public Map<MapKeyEnums, Set<String>> doDBCalls(String phoneNumber, long timestamp) {
Map<MapKeyEnums, Set<String>> instrumentsEdgesMap = new EnumMap<>(MapKeyEnums.class);
CompletableFuture.supplyAsync(() -> dbReadService.getCall(phoneNumber, PhoneNumber.class, "ABC", timestamp)).
thenApply(x -> instrumentsEdgesMap.put(MapKeyEnums.ABC, x));
CompletableFuture.supplyAsync(() -> dbReadService.getCall(phoneNumber, PhoneNumber.class, "XYZ", timestamp)).
thenApply(x -> instrumentsEdgesMap.put(MapKeyEnums.XYZ, x));
CompletableFuture.supplyAsync(() -> dbReadService.getCall(phoneNumber, PhoneNumber.class, "DEF", timestamp)).
thenApply(x -> instrumentsEdgesMap.put(MapKeyEnums.DEF, x));
return instrumentsEdgesMap;
}
感谢您的任何帮助,在此先感谢。
答案
返回结果之前,您必须等待期货完成。
尝试类似的东西
public Map<MapKeyEnums, Set<String>> doDBCalls(String phoneNumber, long timestamp) {
Map<MapKeyEnums, Set<String>> instrumentsEdgesMap = new EnumMap<>(MapKeyEnums.class);
CompletableFuture.allOf(
CompletableFuture.supplyAsync(() -> dbReadService.getCall(phoneNumber, PhoneNumber.class, "ABC", timestamp))
.thenAccept(x -> instrumentsEdgesMap.put(MapKeyEnums.ABC, x)),
CompletableFuture.supplyAsync(() -> dbReadService.getCall(phoneNumber, PhoneNumber.class, "XYZ", timestamp))
.thenAccept(x -> instrumentsEdgesMap.put(MapKeyEnums.XYZ, x)),
CompletableFuture.supplyAsync(() -> dbReadService.getCall(phoneNumber, PhoneNumber.class, "DEF", timestamp))
.thenAccept(x -> instrumentsEdgesMap.put(MapKeyEnums.DEF, x)))
.get();
return instrumentsEdgesMap;
}
另一答案
在上述方法中,supplyAsync
将由Async
中的ForkJoinPool线程执行,但thenApply
方法总是通过调用完全不异步的线程来执行
所有没有显式Executor参数的异步方法都是使用ForkJoinPool.commonPool()执行的(除非它不支持并行度至少为2,在这种情况下,将创建一个新的Thread来运行每个任务)。
这里是示例
CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread().getName());
return "SupplyAsync";
}).thenAccept(i->{
System.out.println(Thread.currentThread().getName()+"--"+i);
});
输出:
ForkJoinPool.commonPool-worker-3
main--SupplyAsync
因此,如果您希望您的进程为Async
,则首先使用supplyAsync
触发所有三个数据库查询,并在CompletableFuture
中捕获输出
CompletableFuture<Set<String>> first = CompletableFuture.supplyAsync(() -> dbReadService.getCall(phoneNumber, PhoneNumber.class, "ABC", timestamp));
CompletableFuture<Set<String>> second = CompletableFuture.supplyAsync(() -> dbReadService.getCall(phoneNumber, PhoneNumber.class, "XYZ", timestamp));
CompletableFuture<Set<String>> third = CompletableFuture.supplyAsync(() -> dbReadService.getCall(phoneNumber, PhoneNumber.class, "DEF", timestamp));
然后现在使用其中三个创建流,然后将它们收集到Map
Stream.of(new AbstractMap.SimpleEntry<MapKeyEnums, CompletableFuture<Set<String>>>(MapKeyEnums.ABC, first),
new AbstractMap.SimpleEntry<MapKeyEnums, CompletableFuture<Set<String>>>(MapKeyEnums.XYZ, second),
new AbstractMap.SimpleEntry<MapKeyEnums, CompletableFuture<Set<String>>>(MapKeyEnums.DEF, third))
.forEach(entry->{
entry.getValue().thenAccept(val-> instrumentsEdgesMap.put(entry.getKey(), val));
});
以上是关于CompletableFuture以异步执行多个数据库查询的主要内容,如果未能解决你的问题,请参考以下文章
JUC - 多线程之ForkJoin;异步调用CompletableFuture