JUC-Java多线程Future,CompletableFuture
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了JUC-Java多线程Future,CompletableFuture相关的知识,希望对你有一定的参考价值。
多线程相关概念
1把锁:synchronized
2个并:并发(concurrent)在同一实体上的多个事件,在一台处理器上“同时处理多个任务”,同一时刻,其实是只有一个时间在发生
并行(parallel)在不同实体上的多个时间,在多台处理器上同时处理多个任务,同一时刻,大家都在做事情,你做你的,我做到我的,但是我们都在做
3个程:进程:在系统中运行的一个应用程序就是一个进程,每一个进程都有自己的内存空间和系统资源
线程:也被成为轻量级进程,在一个进程中会有1个或多个进程,是大多数操作系统进行时序调度的基本单元
管程:Monitor(监视器),也就是平时所说的锁
Monitor其实就是一种同步机制,他的义务是保证(同一时间)只有一个线程可以访问被保护的数据和代码。
JVM中同步是基于进入和退出监视器对象(Monitor,管程对象)来实现的,每个对象实例都会有一个Monitor对象
Object o = new Object();
Thread t1 = new Thread(() ->
synchronized (o)
System.out.println(1);
, "t1");
t1.start();
用户线程和守护线程
一般情况下不做特别说明配置,默认都是用户线程
用户线程(User Thread): 是系统的工作线程,它会完成这个程序需要完成的业务条件。
守护线程(Daemon Thread):是一种特殊的线程为其它线程服务的,在后台默默地完成一些系统性的服务
守护线程作为一个服务线程,没有服务对象就没有必要继续运行了 ,如果用户线程全部结束了,意味着程序需要完成的业务操作已经结束了,系统可以退出了,所以假如当系统只剩下守护线程的时候,java虚拟机会自动退出。
// 是否是守护线程
t1.isDaemon();
// 设置为守护线程
t1.setDaemon(true);
setDaemon(true)方法必须在start()之前设置
Future接口
Future接口(Future实现类)定义了操作异步任务执行一些方法,如获取异步任务的执行结果、取消任务的执行、判断任务是否被取消、判断任务执行是否完毕等。
比如主线程让一个子线程去执行任务,子线程可能比较耗时,启动子线程开始执行任务后,主线程就去做其他事情了,忙其他事情或者执行完,过了一会才去获取子任务的执行结果或变更的任务状态。
总结: Future接口可以为主线程开一个分支任务,专门为主线程处理耗时和费力的复杂业务。
FutureTask异步任务
public class CompletableFutureDemo
public static void main(String[] args) throws ExecutionException, InterruptedException
FutureTask futureTask = new FutureTask(new MyThread());
Thread t1 = new Thread(futureTask);
t1.start();
System.out.println(futureTask.get());
class MyThread implements Callable<String>
@Override
public String call() throws Exception
System.out.println("come in call----");
return "hello callable";
Future+线程池异步多线程任务配合,能显著提高程序的执行效率。
futureTask.get();
futureTask.isDone();
Future对于结果的获取不是很友好,只能通过阻塞或轮询的方式得到任务的结果。
CompletableFuture
从jdk1.8开始引入,它是Future的功能增强版,减少阻塞和轮询。可以传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象的回调方法。
ExecutorService threadPool = Executors.newFixedThreadPool(3);
// CompletableFuture.runAsync() 无返回值
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() ->
System.out.println(Thread.currentThread().getName());
System.out.println("come in ---");
return "hello";
,threadPool);
String s = completableFuture.get();
System.out.println(s);
threadPool.shutdown();
//这里用自定义线程池,采用默认线程会当作一个守护线程,main方法执行完后future线程还未处理完时会直接关闭
ExecutorService threadPool = Executors.newFixedThreadPool(3);
try
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() ->
System.out.println("come in ----");
int i = ThreadLocalRandom.current().nextInt(10);
try
TimeUnit.SECONDS.sleep(1);
catch (Exception e)
e.printStackTrace();
System.out.println("---1秒后出结果:" + i);
return i;
, threadPool).whenComplete((v, e) ->
if (e == null)
System.out.println("计算完成:" + v);
).exceptionally(e ->
System.out.println(e.getCause());
return null;
);
System.out.println(java.lang.Thread.currentThread().getName() + "去忙其他任务了");
catch (Exception e)
e.printStackTrace();
finally
threadPool.shutdown();
CompletableFuture join和get区别
在编译时是否报出检查型异常
CompletableFuture的优点
异步任务结束时,会自动回调某个对象的方法
主线程设置好回调后,不再关心异步任务的执行,异步任务之间可以顺序执行
异步任务出错时,会自动回调某个对象的方法
ps:和javascript回调不能说相似,只能说一模一样,一通百通
Lambda表达式+Stream流式调用+CHain链式调用+Java8函数式编程
Runnable
无参数,无返回值
Function
Function<T, R> 接收一个参数,并且有返回值
Consumer
Consumer接收一个参数,并且没有返回值
BiConsumer
BiConsumer<T, U>接收两个参数(Bi,英文单词词根,代表两个的意思),没有返回值
Supplier
Supplier供给型函数接口,没有参数,有一个返回值
Predicate
一般用于做判断,返回boolean类型
总结
函数时接口名称 | 方法名称 | 参数 | 返回值 |
Runnable | run | 无参数 | 无返回值 |
Function | apply | 1个参数 | 有返回值 |
Consumer | accept | 1个参数 | 无返回值 |
Supplier | get | 没有参数 | 有返回值 |
BiConsumer | accept | 2个参数 | 无返回值 |
Predicate | test | 1个参数 | 有返回值(boolean) |
日常工作中如何进行开发?
功能→性能,先完成功能实现,再考虑性能优化
CompletableFuture用例
假设要从多个电商平台查询一件商品的价格,每次查询耗时设定为1秒
普通查询:查询电商平台1→查询电商平台2→查询电商平台3 …
CompletableFuture: 同时异步执行要查询的电商平台
public class CompletableFutureDemo
static List<NetMall> list = Arrays.asList(
new NetMall("jd"),
new NetMall("dangdang"),
new NetMall("taobao"),
new NetMall("pdd"),
new NetMall("tmall")
);
/**
* step by step 一家家搜查
* List<NetMall> ----->map------> List<String>
* @param list
* @param productName
* @return
*/
public static List<String> getPrice(List<NetMall> list,String productName)
//《mysql》 in taobao price is 90.43
return list
.stream()
.map(netMall ->
String.format(productName + " in %s price is %.2f",
netMall.getNetMallName(),
netMall.calcPrice(productName)))
.collect(Collectors.toList());
/**
* List<NetMall> ----->List<CompletableFuture<String>>------> List<String>
* @param list
* @param productName
* @return
*/
public static List<String> getPriceByCompletableFuture(List<NetMall> list,String productName)
return list.stream().map(netMall ->
CompletableFuture.supplyAsync(() -> String.format(productName + " in %s price is %.2f",
netMall.getNetMallName(),
netMall.calcPrice(productName))))
.collect(Collectors.toList())
.stream()
.map(s -> s.join())
.collect(Collectors.toList());
public static void main(String[] args)
long startTime = System.currentTimeMillis();
List<String> list1 = getPrice(list, "mysql");
for (String element : list1)
System.out.println(element);
long endTime = System.currentTimeMillis();
System.out.println("----costTime: "+(endTime - startTime) +" 毫秒");
System.out.println("--------------------");
long startTime2 = System.currentTimeMillis();
List<String> list2 = getPriceByCompletableFuture(list, "mysql");
for (String element : list2)
System.out.println(element);
long endTime2 = System.currentTimeMillis();
System.out.println("----costTime: "+(endTime2 - startTime2) +" 毫秒");
class NetMall
@Getter
private String netMallName;
public NetMall(String netMallName)
this.netMallName = netMallName;
public double calcPrice(String productName)
try TimeUnit.SECONDS.sleep(1); catch (InterruptedException e) e.printStackTrace();
return ThreadLocalRandom.current().nextDouble() * 2 + productName.charAt(0);
执行效率显著提高
CompletableFuture常用API
获取结果和触发计算
T get(); 容易造成阻塞,非得拿到结果,否则不往下执行
T get(long timeout, TimeUnit unit); 指定时间后还没拿到结果,直接TimtoutException
T join(); 和get一样,区别在于编译时是否报出检查型异常
T getNow(T valueIfAbsent); 没有计算完成的情况下,返回一个替代结果。立即获取结果不阻塞:计算完,返回计算完成后的结果,没计算完:返回设定的valueIfAbsend值
boolean complete(T value); 是否打断get方法立即返回括号值,计算完:不打断,返回计算后的结果,没计算完:打断返回设定的value值
allOf:多个CompletableFuture任务并发执行,所有CompletableFuture任务完成时,返回一个新的CompletableFuture对象,其返回值为Void,也就是无返回值。该方法的应用之一是在继续程序之前等待一组独立的 CompletableFuture 完成,如:CompletableFuture.allOf(c1, c2, c3).join();
anyOf:多个CompletableFuture任务并发执行,只要有一个CompletableFuture任务完成时,就会返回一个新的CompletableFuture对象,并返回该CompletableFuture执行完成任务的返回值。
对计算结果进行处理
thenApply 计算结果存在依赖关系,将两个线程串行化,由于存在依赖关系(当前步错,不走下一步),当前步骤有异常的话就叫停。
handle
execptionally类似 try/catch
whenCpmplete和handle类似 try/finally,有异常也会往下执行
对计算结果进行消费
thenAccept 顾名思义,消费型接口。接收任务的处理结果,并消费处理,无返回结果。
对比
API | 说明 | |
thenRun | thenRun(Runnable runnable) | 任务A执行完执行B,并且B不需要A的结果 |
thenAccept | thenAccpet(Consumer action) | 任务A执行完执行B,B需要A的结果,但是任务B无返回值 |
thenApply | thenApply(Function fn) | 任务A执行完执行B,B需要A的结果,同时任务B有返回值 |
System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenRun(() -> ).join());
System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenAccept(r -> System.out.println(r)).join());
System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenApply(r -> r + "resultB").join());
thenRun和thenRunAsync区别?
- 没有传入自定义线程池,都用默认线程池ForkJoinPool
- 如果执行第一个任务的时候,传入一个自定义线程池
- 调用thenRun方法执行第二个任务时,则第二个任务和第一个任务时使用同一个线程池
- 调用thenRunAsync执行第二个任务时,则第一个任务使用的是你自己传入的线程池,第二个任务使用的是ForkJoinpool线程池
- 备注:有可能处理的太快,系统优化切换原则,直接使用main线程处理
其他如:thenAccept和thenAccpetAsync,thenApply和thenApplyAsync等,它们之间的区别也是同理
对计算速度选用
applyToEither对比任务执行速度
CompletableFuture<String> playA = CompletableFuture.supplyAsync(() ->
System.out.println("A come in");
try TimeUnit.SECONDS.sleep(3); catch (InterruptedException e) e.printStackTrace();
return "playA";
);
CompletableFuture<String> playB = CompletableFuture.supplyAsync(() ->
System.out.println("B come in");
try TimeUnit.SECONDS.sleep(1); catch (InterruptedException e) e.printStackTrace();
return "playB";
);
CompletableFuture<String> result = playA.applyToEither(playB, f ->
return f + " is winer";
);
System.out.println(Thread.currentThread().getName()+"\\t"+"-----: "+result.join());
对计算结果进行合并
两个CompletionStage任务都完成后,最终能把两个任务的结果一起交给thenCombine来处理。先完成的先等着,等待其他分支任务。
public static void main(String[] args)
long start = System.currentTimeMillis();
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(()->
try
TimeUnit.SECONDS.sleep(5);
catch (InterruptedException e)
throw new RuntimeException(e);
return 10;
);
CompletableFuture<Integer> future2= CompletableFuture.supplyAsync(()->
try
TimeUnit.SECONDS.sleep(3);
catch (InterruptedException e)
throw new RuntimeException(e);
return 10;
);
CompletableFuture<Integer> future = future1.thenCombine(future2, (x, y) ->
System.out.println("-----开始两个结果合并");
return x + y;
);
System.out.println(future.join());
long end = System.currentTimeMillis();
System.out.println("耗时:" + (end-start));
以上是关于JUC-Java多线程Future,CompletableFuture的主要内容,如果未能解决你的问题,请参考以下文章
Java多线程系列--“JUC线程池”06之 Callable和Future