多线程之CompletableFuture全方面解析

Posted 圆圆的球

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了多线程之CompletableFuture全方面解析相关的知识,希望对你有一定的参考价值。

多线程之CompletableFuture全方面解析


目录:

前言:

​ 在我们写的一个底层Api中,由于上亿级数据量情况下,查询效率较慢,本篇文档记录一下优化过程所用技术栈CompletableFuture的具体使用方法。整体优化思路在并发大数据量情况下如下:

  • 有事务:因为有事务影响,所以使用多线程可能会对先改后查这种情况的数据准确性无法进行保证,所以有以下几种情况(表大概50个字段):

    • select * from user a where a.id in(......)
      
    • select * from user a where (a.id in(......)) or (a.id in(......))
      
    • SELECT * FROM user where a.id in (...)
      UNION ALL
      SELECT * FROM user where a.id in (...)
      

    具体测试图就不贴了,因为不是重点,在50万数据存量十并发下一次性查2万数据的十个字段 or in 是最快的 整体执行(加上业务逻辑)下来不到3.5s。

  • 无事务:采用多线程,条件和上面的一样执行下来0.7s,一开始自己想的就是根据数据量使用线程池每1000条创建一个线程(new Thread的方式),后来发现写起来怪麻烦的,经过部门大佬指点发现有一种写法很简单,使用CompletableFuture进行多线程的处理,这也是今天想要总结的重点!!!!!!!!!

最佳线程数:

​ 在说CompletableFuture之前,我想先大概说一下最佳线程数这个,因为创建线程和销毁线程都是比较耗时的操作,频繁的创建和销毁线程会浪费很多CPU的资源。此外,如果每个任务都创建一个线程去处理,这样线程会越来越多。我们知道每个线程默认情况下占1M的内存空间,如果线程非常多,内存资源将会被耗尽。这时,我们需要线程池去管理线程,不会出现内存资源被耗尽的情况,也不会出现频繁创建和销毁线程的情况,因为它内部是可以复用线程的。

​ 那么 我们怎么样去确定最佳线程数呢?(后面会有公式滴~先看基本)

​ 我们在使用spring cloud时,通常会使用ThreadPoolExecutor去设置线程池的参数,那我们就先总结一下ThreadPoolExecutor

一、ThreadPoolExecutor的重要参数

  • corePoolSize:核心线程数

    • 核心线程会一直存活,及时没有任务需要执行

    • 当线程数小于核心线程数时,即使有线程空闲,线程池也会优先创建新线程处理

    • 设置allowCoreThreadTimeout=true(默认false)时,核心线程会超时关闭

  • queueCapacity:任务队列容量(阻塞队列)

    • 当核心线程数达到最大时,新任务会放在队列中排队等待执行
  • maxPoolSize:最大线程数

    • 当线程数>=corePoolSize,且任务队列已满时。线程池会创建新线程来处理任务

    • 当线程数=maxPoolSize,且任务队列已满时,线程池会拒绝处理任务而抛出异常

  • keepAliveTime:线程空闲时间

    • 当线程空闲时间达到keepAliveTime时,线程会退出,直到线程数量等于corePoolSize

    • 如果allowCoreThreadTimeout=true,则会直到线程数量=0

  • allowCoreThreadTimeout:允许核心线程超时

  • rejectedExecutionHandler:任务拒绝处理器

    • 两种情况会拒绝处理任务:

      • 当线程数已经达到maxPoolSize,切队列已满,会拒绝新任务
      • 当线程池被调用shutdown()后,会等待线程池里的任务执行完毕,再shutdown。如果在调用shutdown()和线程池真正shutdown之间提交任务,会拒绝新任务
    • 线程池会调用rejectedExecutionHandler来处理这个任务。如果没有设置默认是AbortPolicy,会抛出异常

    • ThreadPoolExecutor类有几个内部实现类来处理这类情况:

      • AbortPolicy 丢弃任务,抛运行时异常
      • CallerRunsPolicy 执行任务
      • DiscardPolicy 忽视,什么都不会发生
      • DiscardOldestPolicy 从队列中踢出最先进入队列(最后一个执行)的任务
    • 实现RejectedExecutionHandler接口,可自定义处理器

二、ThreadPoolExecutor执行顺序:

线程池按以下行为执行任务

  1. 当线程数小于核心线程数时,创建线程。
  2. 当线程数大于等于核心线程数,且任务队列未满时,将任务放入任务队列。
  3. 当线程数大于等于核心线程数,且任务队列已满
    1. 若线程数小于最大线程数,创建线程
    2. 若线程数等于最大线程数,抛出异常,拒绝任务

三、如何设置参数

  • 默认值

    - corePoolSize=1
    - queueCapacity=Integer.MAX_VALUE
    - maxPoolSize=Integer.MAX_VALUE
    - keepAliveTime=60s
    - allowCoreThreadTimeout=false
    - rejectedExecutionHandler=AbortPolicy()
    
  • 如何来设置

    • 需要根据几个值来决定
      • tasks :每秒的任务数,假设为500~1000
      • taskcost:每个任务花费时间,假设为0.1s
      • responsetime:系统允许容忍的最大响应时间,假设为1s
    • 做几个计算
      • corePoolSize = 每秒需要多少个线程处理?
        • threadcount = tasks/(1/taskcost) =tasks*taskcost= (500~1000)*0.1 = 50~100 个线程。corePoolSize设置应该大于50
        • 根据8020原则,如果80%的每秒任务数小于800,那么corePoolSize设置为80即可
      • queueCapacity = (coreSizePool/taskcost)*responsetime
        • 计算可得 queueCapacity = 80/0.1*1 = 80。意思是队列里的线程可以等待1s,超过了的需要新开线程来执行
        • 切记不能设置为Integer.MAX_VALUE,这样队列会很大,线程数只会保持在corePoolSize大小,当任务陡增时,不能新开线程来执行,响应时间会随之陡增。
      • maxPoolSize = (max(tasks)- queueCapacity)/(1/taskcost)
        • 计算可得 maxPoolSize = (1000-80)/10 = 92
        • (最大任务数-队列容量)/每个线程每秒处理能力 = 最大线程数
      • rejectedExecutionHandler:根据具体情况来决定,任务不重要可丢弃,任务重要则要利用一些缓冲机制来处理
      • keepAliveTimeallowCoreThreadTimeout采用默认通常能满足
  • 以上都是理想值,实际情况下要根据机器性能来决定。如果在未达到最大线程数的情况机器cpu load已经满了,则需要通过升级硬件(呵呵)和优化代码,降低taskcost来处理。

举个栗子:

Future

​ 因为在CompletableFuture之前我们一直用的是Future来实现异步操作,并且CompletableFuture实现了Future的接口,我们先浅看一下Future:

​ Future提供了一种异步并行计算的功能,如果主线程需要执行一个很耗时的计算任务,我们可以通过future把这个任务放在异步线程去执行,主线程则去执行其他任务,处理完后,再用Future获取结果。

举个栗子:

​ 假设我们有两个任务服务,一个查询用户基本信息,一个查询用户住宅信息。

public class UserInfoService 

    public UserInfo getUserInfo(Long userId) throws InterruptedException 
        Thread.sleep(300);//模拟调用耗时
        return new UserInfo("666", "伊利蛋", 27); //一般是查数据库,或者远程调用返回的
    


public class HouseService 

    public HouseInfo getHouseInfo(long userId) throws InterruptedException 
        Thread.sleep(500); //模拟调用耗时
        return new HouseInfo("666", "艾泽拉拉小区");
    

​ 接下来,我们来演示下,在主线程中是如何使用Future来进行异步调用的。

public class FutureTest 

    public static void main(String[] args) throws ExecutionException, InterruptedException 

        ExecutorService executorService = Executors.newFixedThreadPool(10);

        UserInfoService userInfoService = new UserInfoService();
        HouseService houseService = new HouseService();
        long userId =666L;
        long startTime = System.currentTimeMillis();

        //调用用户服务获取用户基本信息
        FutureTask<UserInfo> userInfoFutureTask = new FutureTask<>(new Callable<UserInfo>() 
            @Override
            public UserInfo call() throws Exception 
                return userInfoService.getUserInfo(userId);
            
        );
        executorService.submit(userInfoFutureTask);

        Thread.sleep(300); //模拟主线程其它操作耗时

        FutureTask<HouseInfo> houseInfoFutureTask = new FutureTask<>(new Callable<HouseInfo>() 
            @Override
            public HouseInfo call() throws Exception 
                return houseService.getHouseInfo(userId);
            
        );
        executorService.submit(houseInfoFutureTask);

        UserInfo userInfo = userInfoFutureTask.get();//获取个人信息结果
        HouseInfo houseInfo = houseInfoFutureTask.get();//获取住宅信息结果

        System.out.println("总共用时" + (System.currentTimeMillis() - startTime) + "ms");
    

    
-------------------------------------------------------------------------
运行结果 : 总共用时806ms

​ 如果我们不使用Future进行异步调用,而是在主线程中串行进行的话,耗时大概为1100ms,所以future对程序执行效率提升效果还是可以的。

​ 深入看一下future对结果的获取,发现它对于结果的获取并不是很友好,只能通过阻塞或者轮询的方式得到任务的结果(源码可以搜一哈,网上很多):

  • Future.get() 就是阻塞调用,在线程获取结果之前get方法会一直阻塞
  • Future提供了一个isDone方法,可以在程序中轮询这个方法查询执行结果。

​ 这个时候就发现问题了,阻塞的这种方式和我们异步编程的设计理念不一样啊,难搞,轮询的话,又会浪费无谓的CPU资源,那就没有一种更好的方式么???当然我都能发现,大佬那么多,人家早都发现了,在JDK8的时候推出了,也就是我们本次要总结的重点CompletableFuture,为什么他就能解决呢?

​ 因为CompletableFuture提供了一种观察者模式类似的机制,可以让任务执行完成后通知监听的一方。

CompletableFuture

​ 首先我们还是用刚才future那个栗子,不过这次选择CompletableFuture 来进行实现:

public class FutureTest 

    public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException 

        UserInfoService userInfoService = new UserInfoService();
        HouseService houseService = new HouseService();
        long userId =666L;
        long startTime = System.currentTimeMillis();

        //调用用户服务获取用户基本信息
        CompletableFuture<UserInfo> completableUserInfoFuture = CompletableFuture.supplyAsync(() -> userInfoService.getUserInfo(userId));

        CompletableFuture<HouseInfo> completableHouseInfoFuture = CompletableFuture.supplyAsync(() -> houseService.getHouseInfo(userId)); 
        
        Thread.sleep(300); //模拟主线程其它操作耗时

        UserInfo userInfo = completableUserInfoFuture.get(2,TimeUnit.SECONDS);//获取个人信息结果
        HouseInfo houseInfo = completableHouseInfoFuture.get();//获取住宅信息结果
        System.out.println("总共用时" + (System.currentTimeMillis() - startTime) + "ms");

    
    

-------------------------------------------------------------------------
运行结果 : 总共用时800ms

​ 可以发现,使用CompletableFuture,代码确实简洁了很多。CompletableFuturesupplyAsync方法,提供了异步执行的功能,线程池也不用单独创建了。

​ 接下来我们可以一起总结一下CompletableFuture的具体使用。

CompletableFuture的三个使用场景:

  • 创建异步任务
  • 简单任务异步回调
  • 多任务组合处理
创建异步任务:

CompletableFuture创建异步任务,一般有supplyAsyncrunAsync两个方法:

  • supplyAsync执行CompletableFuture任务,支持返回值。
  • runAsync执行CompletableFuture任务,没有返回值。
supplyAsync方法:
//使用默认内置线程池ForkJoinPool.commonPool(),根据supplier构建执行任务
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
//自定义线程,根据supplier构建执行任务
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
runAsync方法:
//使用默认内置线程池ForkJoinPool.commonPool(),根据runnable构建执行任务
public static CompletableFuture<Void> runAsync(Runnable runnable) 
//自定义线程,根据runnable构建执行任务
public static CompletableFuture<Void> runAsync(Runnable runnable,  Executor executor)

举个栗子:

public class FutureTest 

    public static void main(String[] args) 
        //可以自定义线程池
        ExecutorService executor = Executors.newCachedThreadPool();
        //runAsync的使用
        CompletableFuture<Void> runFuture = CompletableFuture.runAsync(() -> System.out.println("runAsync,为了部落"), executor);
        //supplyAsync的使用
        CompletableFuture<String> supplyFuture = CompletableFuture.supplyAsync(() -> 
                    System.out.print("supplyAsync,为了联盟");
                    return "哈哈哈哈哈"; , executor);
        //runAsync的future没有返回值,输出null
        System.out.println(runFuture.join());
        //supplyAsync的future,有返回值
        System.out.println(supplyFuture.join());
        executor.shutdown(); // 线程池需要关闭
    

-------------------------------------------------------------------------
//输出
runAsync,为了部落
null
supplyAsync,为了联盟哈哈哈哈哈
任务异步回调:

任务异步回调大概有六种使用场景:

1.thenRun/thenRunAsync
public CompletableFuture<Void> thenRun(Runnable action);
public CompletableFuture<Void> thenRunAsync(Runnable action);

CompletableFuturethenRun方法,通俗点讲就是,做完第一个任务后,再做第二个任务。某个任务执行完成后,执行回调方法;但是前后两个任务没有参数传递,第二个任务也没有返回值,举个例子:

public class FutureThenRunTest 

    public static void main(String[] args) throws ExecutionException, InterruptedException 

        CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(
                ()->
                    System.out.println("先打BOSS");
                    return "为了圣光";
                
        );

        CompletableFuture thenRunFuture = orgFuture.thenRun(() -> 
            System.out.println("再领奖励");
        );

        System.out.println(thenRunFuture.get());
    

//输出
先打BOSS
再领奖励
null

咱们可以通过源码看看thenRunthenRunAsync的区别:

(嘀嘀嘀:后面介绍的thenAcceptthenAcceptAsyncthenApplythenApplyAsync等区别和这个相同)

   private static final Executor asyncPool = useCommonPool ?
        ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
        
    public CompletableFuture<Void> thenRun(Runnable action) 
        return uniRunStage(null, action);
    

    public CompletableFuture<Void> thenRunAsync(Runnable action) 
        return uniRunStage(asyncPool, action);
    

如果你执行第一个任务的时候,传入了一个自定义线程池:

  • 调用thenRun方法执行第二个任务时,则第二个任务和第一个任务是共用同一个线程池
  • 调用thenRunAsync执行第二个任务时,则第一个任务使用的是你自己传入的线程池,第二个任务使用的是ForkJoin线程池。
2.thenAccept/thenAcceptAsync

CompletableFuturethenAccept方法表示,第一个任务执行完成后,执行第二个回调方法任务,会将该任务的执行结果,作为入参,传递到回调方法中,但是回调方法是没有返回值的。

public class FutureThenAcceptTest 

    public static void main(String[] args) throws ExecutionException, InterruptedException 

        CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(
                ()->
                    System.out.println("第一个CompletableFuture方法任务");
                    return "为了圣光";
                
        );

        CompletableFuture thenAcceptFuture = orgFuture.thenAccept((a) -> 
            if ("为了圣光".equals(a)) 
                System.out.println("牛的");
            

            System.out.println("oops");
        );

        System.out.println(thenAcceptFuture.get());
    

//输出
第一个CompletableFuture方法任务
牛的
oops
null
3.thenApply/thenApplyAsync

CompletableFuturethenApply方法表示,第一个任务执行完成后,执行第二个回调方法任务,会将该任务的执行结果,作为入参,传递到回调方法中,并且回调方法是有返回值的。

public class FutureThenApplyTest 

    public static void main(String[] args) throws ExecutionException, InterruptedException 

        CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(
                ()->
                    System.out.println("第一个CompletableFuture方法任务");
                    return "为了圣光";
                
        );

        CompletableFuture<String> thenApplyFuture = orgFuture.thenApply((a) -> 
            if ("为了圣光".equals(a)) 
                return "oops";
            

            return "先考虑考虑";
        );

        System.out.println(thenApplyFuture.get())以上是关于多线程之CompletableFuture全方面解析的主要内容,如果未能解决你的问题,请参考以下文章

多线程之CompletableFuture全方面解析

JUC - 多线程之ForkJoin;异步调用CompletableFuture

JUC并发编程之CompletableFuture基础用法

JUC并发编程之CompletableFuture基础用法

JUC并发编程之CompletableFuture基础用法

Java8 CompletableFuture 用法全解