012-FutureFutureTaskCompletionService CompletableFuture
Posted bjlhx
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了012-FutureFutureTaskCompletionService CompletableFuture相关的知识,希望对你有一定的参考价值。
一、概述
创建线程的两种方式,一种是直接继承Thread,另外一种就是实现Runnable接口。这两种方式都有一个缺陷就是:在执行完任务之后无法获取执行结果。如果需要获取执行结果,就必须通过共享变量或者使用线程通信的方式来达到效果,这样使用起来就比较麻烦。而自从Java 1.5开始,就提供了Callable和Future,通过它们可以在任务执行完毕之后得到任务执行结果。
详述:https://www.cnblogs.com/bjlhx/p/7588971.html
1.1、Runnable接口
它是一个接口,里面只声明了一个run()方法:
public interface Runnable { public abstract void run(); }
由于run()方法返回值为void类型,所以在执行完任务之后无法返回任何结果。
1.2、Callable接口
Callable接口位于java.util.concurrent包下,在它里面也只声明了一个方法,只不过这个方法叫做call()。
public interface Callable<V> { V call() throws Exception; }
是一个泛型接口,call()函数返回的类型就是传递进来的V类型。Callable接口可以看作是Runnable接口的补充,call方法带有返回值,并且可以抛出异常。
1.3、FutureTask类
如何获取Callable的返回结果:一般是通过FutureTask这个中间媒介来实现的。整体的流程是这样的:
把Callable实例当作参数,生成一个FutureTask的对象,然后把这个对象当作一个Runnable,作为参数另起线程。
1.3.1、FutureTask结构
1.3.2、FutureTask使用
方式一、使用thread方式
FutureTask实现了Runnable,因此它既可以通过Thread包装来直接执行,也可以提交给ExecuteService来执行。以下使用Thread包装线程方式启动
public static void main(String[] args) throws Exception { Callable<Integer> call = () -> { System.out.println("计算线程正在计算结果..."); Thread.sleep(3000); return 1; }; FutureTask<Integer> task = new FutureTask<>(call); Thread thread = new Thread(task); thread.start(); System.out.println("main线程干点别的..."); Integer result = task.get(); System.out.println("从计算线程拿到的结果为:" + result); }
方式二、使用 ExecutorService
ExecutorService executor = Executors.newFixedThreadPool(2);线程池方式
public static void main(String[] args) { Callable<String> callable1=()->{ Thread.sleep(2000); return Thread.currentThread().getName(); }; Callable<String> callable2=()->{ Thread.sleep(3000); return Thread.currentThread().getName(); }; FutureTask<String> futureTask1 = new FutureTask<>(callable1);// 将Callable写的任务封装到一个由执行者调度的FutureTask对象 FutureTask<String> futureTask2 = new FutureTask<>(callable2); ExecutorService executor = Executors.newFixedThreadPool(2); // 创建线程池并返回ExecutorService实例 executor.execute(futureTask1); // 执行任务 executor.execute(futureTask2); //同时开启了两个任务 long startTime = System.currentTimeMillis(); while (true) { try { if(futureTask1.isDone() && futureTask2.isDone()){// 两个任务都完成 System.out.println("Done"); executor.shutdown(); // 关闭线程池和服务 return; } if(!futureTask1.isDone()){ // 任务1没有完成,会等待,直到任务完成 System.out.println("FutureTask1 output="+futureTask1.get()); } System.out.println("Waiting for FutureTask2 to complete"); String s = futureTask2.get(200L, TimeUnit.MILLISECONDS); if(s !=null){ System.out.println("FutureTask2 output="+s); } } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); }catch(TimeoutException e){ //do nothing } System.out.println((System.currentTimeMillis()-startTime)); } }
1.4、Future接口
FutureTask继承体系中的核心接口是Future。Future的核心思想是:
一个方法,计算过程可能非常耗时,等待方法返回,显然不明智。可以在调用方法的时候,立马返回一个Future,可以通过Future这个数据结构去控制方法f的计算过程。
这里的控制包括:
get方法:获取计算结果(如果还没计算完,也是必须等待的)
get(long timeout, TimeUnit unit)用来获取执行结果,如果在指定时间内,还没获取到结果,就直接返回null。
cancel方法:还没计算完,可以取消计算过程
isDone方法:判断是否计算完
isCancelled方法:判断计算是否被取消
简单的说这就是Future
1.5、CompletionService
原理:内部通过阻塞队列+FutureTask,实现了任务先完成可优先获取到,即结果按照完成先后顺序排序。
package com.lhx.cloud.futruetask; import java.time.LocalDateTime; import java.util.ArrayList; import java.util.List; import java.util.concurrent.*; public class CompletionServiceDemo { public static void main(String[] args) { Long start = System.currentTimeMillis(); //开启5个线程 ExecutorService exs = Executors.newFixedThreadPool(5); try { int taskCount = 10; //结果集 List<Integer> list = new ArrayList<>(); //1.定义CompletionService CompletionService<Integer> completionService = new ExecutorCompletionService<>(exs); List<Future<Integer>> futureList = new ArrayList<>(); //2.添加任务 for(int i=0;i<taskCount;i++){ futureList.add(completionService.submit(new Task(i+1))); } //==================结果归集=================== //方法1:future是提交时返回的,遍历queue则按照任务提交顺序,获取结果 // for (Future<Integer> future : futureList) { // System.out.println("===================="); // Integer result = future.get();//线程在这里阻塞等待该任务执行完毕,按照 // System.out.println("任务result="+result+"获取到结果!"+new Date()); // list.add(result); // } // //方法2.使用内部阻塞队列的take() for(int i=0;i<taskCount;i++){ Integer result = completionService.take().get();//采用completionService.take(),内部维护阻塞队列,任务先完成的先获取到 System.out.println(LocalDateTime.now()+"---任务i=="+result+"完成!"); list.add(result); } System.out.println("list="+list); System.out.println("总耗时="+(System.currentTimeMillis()-start)); } catch (Exception e) { e.printStackTrace(); } finally { exs.shutdown();//关闭线程池 } } static class Task implements Callable<Integer>{ Integer i; public Task(Integer i) { super(); this.i=i; } @Override public Integer call() throws Exception { if(i==5){ Thread.sleep(5000); }else{ Thread.sleep(1000); } System.out.println("线程:"+Thread.currentThread().getName()+"任务i="+i+",执行完成!"); return i; } } }
建议:使用率也挺高,而且能按照完成先后排序,建议如果有排序需求的优先使用。只是多线程并发执行任务结果归集,也可以使用。
二、CompletableFuture
2.1、对标Futrue
Future 接口,用于描述一个异步计算的结果。虽然 Future 以及相关使用方法提供了异步执行任务的能力,但是对于结果的获取却是很不方便,只能通过阻塞或者轮询的方式得到任务的结果。
阻塞的方式显然和我们的异步编程的初衷相违背,轮询的方式又会耗费无谓的 CPU 资源,而且也不能及时地得到计算结果,为什么不能用观察者设计模式呢?即当计算结果完成及时通知监听者。
Future局限性,它很难直接表述多个Future 结果之间的依赖性。
2.2、类图
2.2.1、CompletionStage
-
CompletionStage代表异步计算过程中的某一个阶段,一个阶段完成以后可能会触发另外一个阶段
-
一个阶段的计算执行可以是一个Function,Consumer或者Runnable。比如:stage.thenApply(x -> square(x)).thenAccept(x -> System.out.print(x)).thenRun(() -> System.out.println())
-
一个阶段的执行可能是被单个阶段的完成触发,也可能是由多个阶段一起触发
2.2.2、Future
2.3、创建CompletableFuture对象
CompletableFuture.compleatedFuture是一个静态辅助方法,用来返回一个已经计算好的CompletableFuture.
以下四个静态方法用来为一段异步执行的代码创建CompletableFuture对象:
public static CompletableFuture<Void> runAsync(Runnable runnable) public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
以Async结尾并且没有指定Executor的方法会使用ForkJoinPool.commonPool() 作为它的线程池执行异步代码。
runAsync方法:它以Runnabel函数式接口类型为参数,所以CompletableFuture的计算结果为空。
supplyAsync方法以Supplier<U>函数式接口类型为参数,CompletableFuture的计算结果类型为U。
注意:这些线程都是Daemon线程,主线程结束Daemon线程不结束,只有JVM关闭时,生命周期终止。
示例:简单同步用法
public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { //长时间的计算任务 try { System.out.println("计算型任务开始"); Thread.sleep(2000); return "计算型任务结束"; } catch (InterruptedException e) { e.printStackTrace(); } return "·00"; }); System.out.println(future.get()); }
2.4、计算结果完成时的处理
当CompletableFuture的计算结果完成,或者抛出异常的时候,可以执行特定的Action。主要是下面的方法:
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action) public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action) public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor) public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)
可以看到Action的类型是BiConsumer<? super T,? super Throwable>它可以处理正常的计算结果,或者异常情况。
方法不以Async结尾,意味着Action使用相同的线程执行,而Async可能会使用其他线程执行(如果是使用相同的线程池,也可能会被同一个线程选中执行)
示例:
package com.lhx.cloud.futruetask; import java.util.Random; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; public class BasicFuture { private static Random rand = new Random(); private static long t = System.currentTimeMillis(); static int getMoreData() { System.out.println("begin to start compute"); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("end to compute,passed " + (System.currentTimeMillis()-t)); return rand.nextInt(1000); } public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Integer> future = CompletableFuture.supplyAsync(BasicFuture::getMoreData); Future<Integer> f = future.whenComplete((v, e) -> { System.out.println(v); System.out.println(e); }); System.out.println(f.get()); }}
2.5、转换
CompletableFuture可以作为monad(单子)和functor. 由于回调风格的实现,我们不必因为等待一个计算完成而阻塞着调用线程,而是告诉CompletableFuture当计算完成的时候请执行某个Function. 还可以串联起来。
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn) public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
2.6、异常处理completeExceptionally
为了能获取任务线程内发生的异常,需要使用 CompletableFuture的completeExceptionally
方法将导致CompletableFuture内发生问题的异常抛出。
这样,当执行任务发生异常时,调用get()
方法的线程将会收到一个 ExecutionException
异常,该异常接收了一个包含失败原因的Exception 参数。
/** * 任务没有异常 正常执行,然后结束 */ @Test public void test1() throws ExecutionException, InterruptedException { CompletableFuture<String> completableFuture = new CompletableFuture<>(); new Thread(() -> { // 模拟执行耗时任务 System.out.println("task doing..."); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } // 告诉completableFuture任务已经完成 completableFuture.complete("ok"); }).start(); // 获取任务结果,如果没有完成会一直阻塞等待 String result = completableFuture.get(); System.out.println("计算结果:" + result); } /** * 线程有异常 正常执行,然后无法结束,主线程会一直等待 */ @Test public void test2() throws ExecutionException, InterruptedException { CompletableFuture<String> completableFuture = new CompletableFuture<>(); new Thread(() -> { // 模拟执行耗时任务 System.out.println("task doing..."); try { Thread.sleep(3000); int i=1/0; } catch (InterruptedException e) { e.printStackTrace(); } // 告诉completableFuture任务已经完成 completableFuture.complete("ok"); }).start(); // 获取任务结果,如果没有完成会一直阻塞等待 String result = completableFuture.get(); System.out.println("计算结果:" + result); }
/** * 线程有异常 正常执行,然后通过completableFuture.completeExceptionally(e);告诉completableFuture任务发生异常了 * 主线程接收到 程序继续处理,至结束 */ @Test public void test3() throws ExecutionException, InterruptedException { CompletableFuture<String> completableFuture = new CompletableFuture<>(); new Thread(() -> { // 模拟执行耗时任务 System.out.println("task doing..."); try { Thread.sleep(3000); int i = 1/0; } catch (Exception e) { // 告诉completableFuture任务发生异常了 completableFuture.completeExceptionally(e); } // 告诉completableFuture任务已经完成 completableFuture.complete("ok"); }).start(); // 获取任务结果,如果没有完成会一直阻塞等待 String result = completableFuture.get(); System.out.println("计算结果:" + result); }
2.7、多任务组合方法allOf和anyOf
allOf
是等待所有任务完成,构造后CompletableFuture完成
anyOf
是只要有一个任务完成,构造后CompletableFuture就完成
package com.lhx.cloud.futruetask; import java.time.LocalDateTime; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class CompletableFutureDemo { public static void main(String[] args) { Long start = System.currentTimeMillis(); // 结果集 List<String> list = new ArrayList<>(); ExecutorService executorService = Executors.newFixedThreadPool(10); List<Integer> taskList = Arrays.asList(2, 1, 3, 4, 5, 6, 7, 8, 9, 10); // 全流式处理转换成CompletableFuture[]+组装成一个无返回值CompletableFuture,join等待执行完毕。返回结果whenComplete获取 CompletableFuture[] cfs = taskList.stream() .map(integer -> CompletableFuture.supplyAsync(() -> calc(integer), executorService) .thenApply(h -> Integer.toString(h)) .whenComplete((s, e) -> { System.out.println(LocalDateTime.now()+"---任务" + s + "完成!result=" + s + ",异常 e=" + e); list.add(s); }) ).toArray(CompletableFuture[]::new); // 封装后无返回值,必须自己whenComplete()获取 CompletableFuture.allOf(cfs).join(); System.out.println("list=" + list + ",耗时=" + (System.currentTimeMillis() - start)); } public static Integer calc(Integer i) { try { if (i == 1) { Thread.sleep(3000);//任务1耗时3秒 } else if (i == 5) { Thread.sleep(5000);//任务5耗时5秒 } else { Thread.sleep(1000);//其它任务耗时1秒 } System.out.println(LocalDateTime.now()+"---task线程:" + Thread.currentThread().getName() + "任务i=" + i + ",完成!" ); } catch (InterruptedException e) { e.printStackTrace(); } return i; } }
2.8、常用多线程并发,取结果归集的几种实现方案
描述 | Future | FutureTask | CompletionService | CompletableFuture |
---|---|---|---|---|
原理 | Future接口 | 接口RunnableFuture的唯一实现类,RunnableFuture接口继承自Future+Runnable | 内部通过阻塞队列+FutureTask接口 | JDK8实现了Future, CompletionStage两个接口 |
多任务并发执行 | 支持 | 支持 | 支持 | 支持 |
获取任务结果的顺序 | 按照提交顺序获取结果 | 未知 | 支持任务完成的先后顺序 | 支持任务完成的先后顺序 |
异常捕捉 | 自己捕捉 | 自己捕捉 | 自己捕捉 | 原生API支持,返回每个任务的异常 |
建议 | CPU高速轮询,耗资源,或者阻塞,可以使用,但不推荐 | 功能不对口,并发任务这一块多套一层,不推荐使用 | 推荐使用,没有JDK8CompletableFuture之前最好的方案 | API极端丰富,配合流式编程,推荐使用! |
上表来源:https://www.cnblogs.com/dennyzhangdd/p/7010972.html
的
以上是关于012-FutureFutureTaskCompletionService CompletableFuture的主要内容,如果未能解决你的问题,请参考以下文章