提升--15---线程池基本概念CallableFutureCompletableFuture

Posted 高高for 循环

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了提升--15---线程池基本概念CallableFutureCompletableFuture相关的知识,希望对你有一定的参考价值。


线程池—基本概念

多线程–08–线程池

1. Executor–ExecutorService–ThreadPoolExecutor

Executor

  • Executor看它的名字也能理解,执行者,所以他有一个方法叫执行,void execute(Runnable command);那么执行的东西是Runnable,所以这个Executor有了之后呢由于它是一个接口,他可以有好多实现,因此我们说,有了Executor之后呢,我们现场就是一个任务的定义,
  • 比如Runnable起了一个命令的意思,他的定义和运行就可以分开了,不像我们以前定义一个Thread,new一个Thread然后去重写它的Run方法.start才可以运行,或者以前就是你写了一个Runnable你也必须得new一个Thread出来,以前的这种定义和运行是固定的,是写死的就是你new一个Thread让他出来运行。有的同学他还是new一个Thread但是他有了各种各样新的玩法,不用你亲自去指定每一个Thread,他的运行的方式你可以自己去定义了,所以至于是怎么去定义的就看你怎么实现Executor的接口了,这里是定义和运行分开这么一个含义,所以这个接口体现的是这个意思,所以这个接口就比较简单,至于你是直接调用run还是new一个Thread那是你自己的事儿。

ExecutorService


ExecutorService又是什么意思呢,他是从Executor继承,另外,他除了去实现Executor可以去执行一个任务之外,他还完善了整个任务执行器的一个生命周期,就拿线程池来举例子,一个线程池里面一堆的线程就是一堆的工人,执行完一个任务之后我这个线程怎么结束啊,线程池定义了这样一些个方法:

void shutdown();//结束
List<Runnable> shutdownNow();//马上结束
boolean isShutdown();//是否结束了
boolean isTerminated();//是不是整体都执行完了
boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;//等着结束,等多长时间,时间到了还不结束的话他就返回false

-所以这里面呢,他是实现了一些个线程的线程池的生命周期的东西,扩展了Executor的接口,真正的线程池的现实是在ExecutorService的这个基础上来实现的。

submit()方法

submit提交任务: 把任务扔给这个线程池,什么时候运行由这个线程池来决定,相当于是异步的,

  • 当我们看到这个ExecutorService的时候你会发现他除了Executor执行任务之外还有submit提交任务,执行任务是直接拿过来马上运行
  • 而submit是扔给这个线程池,什么时候运行由这个线程池来决定,相当于是异步的,我只要往里面一扔就不管了。那好,如果不管的话什么时候他有结果啊,

Callable—接口

  • JDK 5.0新增

流程:

  1. 创建一个实现Callable的实现类
  2. 实现call方法,将此线程需要执行的操作声明在call()中
  3. 创建Callable接口实现类的对象
  4. 将此Callable接口实现类的对象作为传递到FutureTask构造器中,创建FutureTask的对象
  5. 将FutureTask的对象作为参数传递到Thread类的构造器中,创建Thread对象,并调用start()
  6. 获取Callable中call方法的返回值
    get()返回值即为FutureTask构造器参数Callable实现类重写的call()的返回值。

案例1:Callable的实现类

输出100以内的偶数,并返回他们的总和sum

//1.创建一个实现Callable的实现类
class NumThread implements Callable{
    //2.实现call方法,将此线程需要执行的操作声明在call()中
    @Override
    public Object call() throws Exception {
        int sum = 0;
        for (int i = 1; i <= 100; i++) {
            if(i % 2 == 0){
                System.out.println(i);
                sum += i;
            }
        }
        return sum;
    }
}


public class ThreadNew {
    public static void main(String[] args) {
        //3.创建Callable接口实现类的对象
        NumThread numThread = new NumThread();
        //4.将此Callable接口实现类的对象作为传递到FutureTask构造器中,创建FutureTask的对象
        FutureTask futureTask = new FutureTask(numThread);
        //5.将FutureTask的对象作为参数传递到Thread类的构造器中,创建Thread对象,并调用start()
        new Thread(futureTask).start();

        try {
            //6.获取Callable中call方法的返回值
            //get()返回值即为FutureTask构造器参数Callable实现类重写的call()的返回值。
            Object sum = futureTask.get();
            System.out.println("总和为:" + sum);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }

}

案例2: Callable实现类结合线程池

public class Test02 {
    public static void main(String[] args) throws Exception {
        ExecutorService service = Executors.newCachedThreadPool();
        Future<String> f = service.submit(new MyCall());
        String s = f.get();
        System.out.println(s);
        service.shutdown();
    }

}

class MyCall implements Callable<String> {
    @Override
    public String call() {
        System.out.println("Hello MyCall");
        return "success";
    }
}

如何理解实现Callable接口的方式比实现Runnable接口创建多线程方式强大?

  1. call()可以有返回值的。
  2. call()可以抛出异常,被外面的操作捕获,获取异常的信息
  3. Callable是支持泛型的

Future 和 FutureTask

  • 这个Future代表的是什么呢,这个Future代表的是那个Callable被执行完了之后我怎么才能拿到那个结果啊,它会封装到一个Future里面。Future将来,未来。未来你执行完之后可以把这个结果放到这个未来有可能执行完的结果里头,所以Future代表的是未来执行完的一个结果

FutureTask


  • 更灵活的一个用法是FutureTask,即是一个Future同时又是一个Task,原来这Callable只能一个Task只能是一个任务但是他不能作为一个Future来用。这个FutureTask相当于是我自己可以作为一个任务来用,同时这个任务完成之后的结果也存在于这个对象里,为什么他能做到这一点,因为FutureTask他实现了RunnableFuture,而RunnableFuture即实现了Runnable又实现了Future,所以他即是一个任务又是一个Future。所以这个FutureTask是更好用的一个类。大家记住这个类,后面还会有WorkStealingPool、ForkJoinPool这些个基本上是会用到FutureTask类的。
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;

public class T06_00_Future {
    public static void main(String[] args) throws InterruptedException, ExecutionException {

        FutureTask<Integer> task = new FutureTask<>(()->{
            TimeUnit.MILLISECONDS.sleep(500);
            return 1000;
        }); //new Callable () { Integer call();}

        new Thread(task).start();

        System.out.println(task.get()); //阻塞
    }
}

CompletableFuture

介绍一个CompletableFuture。他底层特别复杂,但是用法特别灵活,如果你们感兴趣可以去拓展的了解一下,用一下。CompletableFuture他的底层用的是ForkJoinPool

这个CompletableFuture非常的灵活,它内部有好多关于各种结果的一个组合,这个CompletableFuture是可以组合各种各样的不同的任务,

案例

1.需求:

假设你能够提供一个服务,这个服务查询各大电商网站同一类产品的价格并汇总展示

分析

假如你自己写了一个网站,这个网站都卖格力空调,同一个类型,然后很多人买东西都会进行一个价格比较,而你提供的这个服务就是我到淘宝上去查到这个格力空调买多少钱,然后我另启动一个线程去京东上找格力空调卖多少钱,在启动一个线程去拼多多上找,最后,我给你汇总一下这三个地方各售卖多少钱,然后你自己再来选去哪里买。下面代码,模拟了一个去别的地方取价格的一个方法,首先你去别的地方访问会花好长时间,因此我写了一个delay() 让他去随机的睡一段时间,表示我们要联网,我们要爬虫爬结果执行这个时间,然后打印了一下睡了多少时间之后才拿到结果的,如拿到天猫上的结果是1块钱,淘宝上结果是2块钱,京东上结果是3块钱,总而言之是经过网络爬虫爬过来的数据分析出来的多少钱。然后我们需要模拟一下怎么拿到怎么汇总.

单线程

  • 第一种写法就是我注释的这种写法,就是挨着牌的写,假设跑天猫跑了10秒,跑淘宝拍了10秒,跑京东跑了5秒,一共历时25秒才总出来。
public class T06_01_CompletableFuture02 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        long start, end;

        start = System.currentTimeMillis();
        priceOfTM();
        priceOfTB();
        priceOfJD();

        end = System.currentTimeMillis();
        System.out.println("use serial method call! " + (end - start));

    }

    private static double priceOfTM() {
        delay();
        return 1.00;
    }

    private static double priceOfTB() {
        delay();
        return 2.00;
    }

    private static double priceOfJD() {
        delay();
        return 3.00;
    }


    private static void delay() {
        int time = new Random().nextInt(500);
        try {
            TimeUnit.MILLISECONDS.sleep(time);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.printf("After %s sleep!\\n", time);
    }

}

多线程

如果我用不同的线程呢,一个一个的线程他们是并行的执行他们,你必须得等所有的线程都拿到之后才能产生一个结果

  • 但是用线程你写起来会有各种各样的麻烦事儿,比如说在去淘宝的过程中网络报错了该怎么办,你去京东的过程中正好赶上那天他活动,并发访问特别慢你又该怎么办,你必须得等所有的线程都拿到之后才能产生一个结果,如果想要做这件事儿的话与其是要你每一个都要写一个自己的线程,需要考虑到各种各样的延迟的问题,

CompletableFuture

有一个简单的写法,用一个CompletableFuture,首先第一点CompletableFuture他是一个Future,所以他会存一个将来有可能产生的结果值,结果值是一个Double,它会运行一个任务,然后这个任务最后产生一个结果,这个结果会存在CompletableFuture里面,结果的类型是Double。

  • 在这里我就定义了三个Future,分别代表了淘宝、京东、天猫,用了CompletableFuture的一个方法叫supplyAsync产生了一个异步的任务,这个异步的任务去天猫那边去给我拉数据去。你可以想象在一个线程池里面扔给他一个任务让他去执行,什么时候执行完了之后他的结果会返回到这个futureTM里面。但是总体的要求就是这些个所有的future都得结束才可以,才能展示我最后的结果。
  • 往下走还有这么一直写法,就是我把这三个future都可以扔给一个CompletableFuture让他去管理,他管理的时候可以调用allOf方法相当于这里面的所有的任务全部完成之后,最后join,你才能够继续往下运行。所以CompletableFuture除了提供了比较好用的对任务的管理之外,还提供了对于任务堆的管理,用于对一堆任务的管理。
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class T06_01_CompletableFuture {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        long start, end;


        start = System.currentTimeMillis();

        CompletableFuture<Double> futureTM = CompletableFuture.supplyAsync(()->priceOfTM());
        CompletableFuture<Double> futureTB = CompletableFuture.supplyAsync(()->priceOfTB());
        CompletableFuture<Double> futureJD = CompletableFuture.supplyAsync(()->priceOfJD());

        CompletableFuture.allOf(futureTM, futureTB, futureJD).join();

        end = System.currentTimeMillis();
        System.out.println("use completable future! " + (end - start));

    }

    private static double priceOfTM() {
        delay();
        return 1.00;
    }

    private static double priceOfTB() {
        delay();
        return 2.00;
    }

    private static double priceOfJD() {
        delay();
        return 3.00;
    }



    private static void delay() {
        int time = new Random().nextInt(500);
        try {
            TimeUnit.MILLISECONDS.sleep(time);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.printf("After %s sleep!\\n", time);
    }

}

CompletableFuture还提供了很多的写法,比如下面Lambda表达式的写法。


import java.io.IOException;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class T06_01_CompletableFuture02 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        long start, end;


        CompletableFuture.supplyAsync(()->priceOfTM())
                .thenApply(String::valueOf)
                .thenApply(str-> "price " + str)
                .thenAccept(System.out::println);

        try {
            System.in.read();
        } catch (IOException e) {
            e.printStackTrace();
        }

    }

    private static double priceOfTM() {
        delay();
        return 1.00;
    }


    private static void delay() {
        int time = new Random().nextInt(500);
        try {
            TimeUnit.MILLISECONDS.sleep(time);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.printf("After %s sleep!\\n", time);
    }

}


CompletableFuture是什么东西呢?他是各种任务的一种管理类,总而言之呢CompletableFuture是一个更高级的类,它能够在很高的一个层面上来帮助你管理一些个你想要的各种各样的任务,比如说你可以对任务进行各种各样的组合 ,所有任务完成之后你要执行一个什么样的结果,以及任何一个任务完成之后你要执行一个什么样的结果,还有他可以提供一个链式的处理方式Lambda的一些写法,拿到任务之后结果进行一个怎样的处理。

以上是关于提升--15---线程池基本概念CallableFutureCompletableFuture的主要内容,如果未能解决你的问题,请参考以下文章

线程池的基本概念

线程池的基本概念

C# 线程线程池Task概念+代码实践

Java线程池实现原理及其在美团业务中的实践

Java线程池实现原理及其在美团业务中的实践

Java线程池实现原理及其在美团业务中的实践