CompletableFuture进阶篇-外卖商家端API的异步化

Posted 热爱编程的大忽悠

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了CompletableFuture进阶篇-外卖商家端API的异步化相关的知识,希望对你有一定的参考价值。

CompletableFuture进阶篇-外卖商家端API的异步化


背景

随着订单量的持续上升,美团外卖各系统服务面临的压力也越来越大。作为外卖链路的核心环节,商家端提供了商家接单、配送等一系列核心功能,业务对系统吞吐量的要求也越来越高。而商家端API服务是流量入口,所有商家端流量都会由其调度、聚合,对外面向商家提供功能接口,对内调度各个下游服务获取数据进行聚合,具有鲜明的I/O密集型(I/O Bound)特点。在当前日订单规模已达千万级的情况下,使用同步加载方式的弊端逐渐显现,因此我们开始考虑将同步加载改为并行加载的可行性。


为何需要并行加载

外卖商家端API服务是典型的I/O密集型(I/O Bound)服务。除此之外,美团外卖商家端交易业务还有两个比较大的特点:

  • 服务端必须一次返回订单卡片所有内容:根据商家端和服务端的“增量同步协议注1”,服务端必须一次性返回订单的所有信息,包含订单主信息、商品、结算、配送、用户信息、骑手信息、餐损、退款、客服赔付(参照下面订单卡片截图)等,需要从下游三十多个服务中获取数据。在特定条件下,如第一次登录和长时间没登录的情况下,客户端会分页拉取多个订单,这样发起的远程调用会更多。
  • 商家端和服务端交互频繁:商家对订单状态变化敏感,多种推拉机制保证每次变更能够触达商家,导致App和服务端的交互频繁,每次变更需要拉取订单最新的全部内容。

在外卖交易链路如此大的流量下,为了保证商家的用户体验,保证接口的高性能,并行从下游获取数据就成为必然。


并行加载的实现方式

并行从下游获取数据,从IO模型上来讲分为同步模型和异步模型。

同步模型

从各个服务获取数据最常见的是同步调用,如下图所示:

在同步调用的场景下,接口耗时长、性能差,接口响应时长T > T1+T2+T3+……+Tn,这时为了缩短接口的响应时间,一般会使用线程池的方式并行获取数据,商家端订单卡片的组装正是使用了这种方式。


这种方式由于以下两个原因,导致资源利用率比较低:

  • CPU资源大量浪费在阻塞等待上,导致CPU资源利用率低。在Java 8之前,一般会通过回调的方式来减少阻塞,但是大量使用回调,又引发臭名昭著的回调地狱问题,导致代码可读性和可维护性大大降低。
  • 为了增加并发度,会引入更多额外的线程池,随着CPU调度线程数的增加,会导致更严重的资源争用,宝贵的CPU资源被损耗在上下文切换上,而且线程本身也会占用系统资源,且不能无限增加。

同步模型下,会导致硬件资源无法充分利用,系统吞吐量容易达到瓶颈。


NIO异步模型

我们主要通过以下两种方式来减少线程池的调度开销和阻塞时间:

  • 通过RPC NIO异步调用的方式可以降低线程数,从而降低调度(上下文切换)开销,如Dubbo的异步调用可以参考《dubbo调用端异步》一文。
  • 通过引入CompletableFuture(下文简称CF)对业务流程进行编排,降低依赖之间的阻塞。本文主要讲述CompletableFuture的使用和原理。

为什么会选择CompletableFuture?

我们首先对业界广泛流行的解决方案做了横向调研,主要包括Future、CompletableFuture注2、RxJava、Reactor。它们的特性对比如下:

FutureCompletableFutureRxJavaReactor
Composable(可组合)✔️✔️✔️
Asynchronous(异步)✔️✔️✔️✔️
Operator fusion(操作融合)✔️✔️
Lazy(延迟执行)✔️✔️
Backpressure(回压)✔️✔️
  • 可组合:可以将多个依赖操作通过不同的方式进行编排,例如CompletableFuture提供thenCompose、thenCombine等各种then开头的方法,这些方法就是对“可组合”特性的支持。
  • 操作融合:将数据流中使用的多个操作符以某种方式结合起来,进而降低开销(时间、内存)。
  • 延迟执行:操作不会立即执行,当收到明确指示时操作才会触发。例如Reactor只有当有订阅者订阅时,才会触发操作。
  • 回压:某些异步阶段的处理速度跟不上,直接失败会导致大量数据的丢失,对业务来说是不能接受的,这时需要反馈上游生产者降低调用量。

RxJava与Reactor显然更加强大,它们提供了更多的函数调用方式,支持更多特性,但同时也带来了更大的学习成本。而我们本次整合最需要的特性就是“异步”、“可组合”,综合考虑后,我们选择了学习成本相对较低的CompletableFuture。


CompletableFuture使用与原理

CompletableFuture的背景和定义

CompletableFuture解决的问题

CompletableFuture是由Java 8引入的,在Java8之前我们一般通过Future实现异步。

  • Future用于表示异步计算的结果,只能通过阻塞或者轮询的方式获取结果,而且不支持设置回调方法,Java 8之前若要设置回调一般会使用guava的ListenableFuture,回调的引入又会导致臭名昭著的回调地狱(下面的例子会通过ListenableFuture的使用来具体进行展示)。
  • CompletableFuture对Future进行了扩展,可以通过设置回调的方式处理计算结果,同时也支持组合操作,支持进一步的编排,同时一定程度解决了回调地狱的问题。

下面将举例来说明,我们通过ListenableFuture、CompletableFuture来实现异步的差异。假设有三个操作step1、step2、step3存在依赖关系,其中step3的执行依赖step1和step2的结果。

Future(ListenableFuture)的实现(回调地狱)如下:

ExecutorService executor = Executors.newFixedThreadPool(5);
ListeningExecutorService guavaExecutor = MoreExecutors.listeningDecorator(executor);
ListenableFuture<String> future1 = guavaExecutor.submit(() -> 
    //step 1
    System.out.println("执行step 1");
    return "step1 result";
);
ListenableFuture<String> future2 = guavaExecutor.submit(() -> 
    //step 2
    System.out.println("执行step 2");
    return "step2 result";
);
ListenableFuture<List<String>> future1And2 = Futures.allAsList(future1, future2);
Futures.addCallback(future1And2, new FutureCallback<List<String>>() 
    @Override
    public void onSuccess(List<String> result) 
        System.out.println(result);
        ListenableFuture<String> future3 = guavaExecutor.submit(() -> 
            System.out.println("执行step 3");
            return "step3 result";
        );
        Futures.addCallback(future3, new FutureCallback<String>() 
            @Override
            public void onSuccess(String result) 
                System.out.println(result);
                    
            @Override
            public void onFailure(Throwable t) 
            
        , guavaExecutor);
    

    @Override
    public void onFailure(Throwable t) 
    , guavaExecutor);

CompletableFuture的实现如下:

ExecutorService executor = Executors.newFixedThreadPool(5);

CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> 
    System.out.println("执行step 1");
    return "step1 result";
, executor);

CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> 
    System.out.println("执行step 2");
    return "step2 result";
);

cf1.thenCombine(cf2, (result1, result2) -> 
    System.out.println(result1 + " , " + result2);
    System.out.println("执行step 3");
    return "step3 result";
).thenAccept(result3 -> System.out.println(result3));

显然,CompletableFuture的实现更为简洁,可读性更好。


CompletableFuture的定义


CompletableFuture实现了两个接口(如上图所示):Future、CompletionStage。

Future表示异步计算的结果。

Future的实现原理可以看这篇文章

CompletionStage用于表示异步执行过程中的一个步骤(Stage),这个步骤可能是由另外一个CompletionStage触发的,随着当前步骤的完成,也可能会触发其他一系列CompletionStage的执行。从而我们可以根据实际业务对这些步骤进行多样化的编排组合,CompletionStage接口正是定义了这样的能力,我们可以通过其提供的thenAppy、thenCompose等函数式编程方法来组合编排这些步骤。


CompletableFuture的使用

下面我们通过一个例子来讲解CompletableFuture如何使用,使用CompletableFuture也是构建依赖树的过程。一个CompletableFuture的完成会触发另外一系列依赖它的CompletableFuture的执行:


如上图所示,这里描绘的是一个业务接口的流程,其中包括CF1\\CF2\\CF3\\CF4\\CF5共5个步骤,并描绘了这些步骤之间的依赖关系,每个步骤可以是一次RPC调用、一次数据库操作或者是一次本地方法调用等,在使用CompletableFuture进行异步化编程时,图中的每个步骤都会产生一个CompletableFuture对象,最终结果也会用一个CompletableFuture来进行表示。

根据CompletableFuture依赖数量,可以分为以下几类:零依赖、一元依赖、二元依赖和多元依赖。


零依赖:CompletableFuture的创建

我们先看下如何不依赖其他CompletableFuture来创建新的CompletableFuture:


如上图红色链路所示,接口接收到请求后,首先发起两个异步调用CF1、CF2,主要有三种方式:

ExecutorService executor = Executors.newFixedThreadPool(5);
//1、使用runAsync或supplyAsync发起异步调用
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> 
  return "result1";
, executor);
//2、CompletableFuture.completedFuture()直接创建一个已完成状态的CompletableFuture
CompletableFuture<String> cf2 = CompletableFuture.completedFuture("result2");
//3、先初始化一个未完成的CompletableFuture,然后通过complete()、completeExceptionally(),完成该CompletableFuture
CompletableFuture<String> cf = new CompletableFuture<>();
cf.complete("success");

第三种方式的一个典型使用场景,就是将回调方法转为CompletableFuture,然后再依赖CompletableFure的能力进行调用编排,示例如下:

@FunctionalInterface
public interface ThriftAsyncCall 
    void invoke() throws TException;

 /**
  * 该方法为美团内部rpc注册监听的封装,可以作为其他实现的参照
  * OctoThriftCallback 为thrift回调方法
  * ThriftAsyncCall 为自定义函数,用来表示一次thrift调用(定义如上)
  */
  public static <T> CompletableFuture<T> toCompletableFuture(final OctoThriftCallback<?,T> callback , ThriftAsyncCall thriftCall) 
   //新建一个未完成的CompletableFuture
   CompletableFuture<T> resultFuture = new CompletableFuture<>();
   //监听回调的完成,并且与CompletableFuture同步状态
   callback.addObserver(new OctoObserver<T>() 
       @Override
       public void onSuccess(T t) 
           resultFuture.complete(t);
       
       @Override
       public void onFailure(Throwable throwable) 
           resultFuture.completeExceptionally(throwable);
       
   );
   if (thriftCall != null) 
       try 
           thriftCall.invoke();
        catch (TException e) 
           resultFuture.completeExceptionally(e);
       
   
   return resultFuture;
  

一元依赖:依赖一个CF


如上图红色链路所示,CF3,CF5分别依赖于CF1和CF2,这种对于单个CompletableFuture的依赖可以通过thenApply、thenAccept、thenCompose等方法来实现,代码如下所示:

CompletableFuture<String> cf3 = cf1.thenApply(result1 -> 
  //result1为CF1的结果
  //......
  return "result3";
);
CompletableFuture<String> cf5 = cf2.thenApply(result2 -> 
  //result2为CF2的结果
  //......
  return "result5";
);

二元依赖:依赖两个CF

如上图红色链路所示,CF4同时依赖于两个CF1和CF2,这种二元依赖可以通过thenCombine等回调来实现,如下代码所示:

CompletableFuture<String> cf4 = cf1.thenCombine(cf2, (result1, result2) -> 
  //result1和result2分别为cf1和cf2的结果
  return "result4";
);

多元依赖:依赖多个CF


如上图红色链路所示,整个流程的结束依赖于三个步骤CF3、CF4、CF5,这种多元依赖可以通过allOf或anyOf方法来实现,区别是当需要多个依赖全部完成时使用allOf,当多个依赖中的任意一个完成即可时使用anyOf,如下代码所示:

CompletableFuture<Void> cf6 = CompletableFuture.allOf(cf3, cf4, cf5);
CompletableFuture<String> result = cf6.thenApply(v -> 
  //这里的join并不会阻塞,因为传给thenApply的函数是在CF3、CF4、CF5全部完成时,才会执行 。
  result3 = cf3.join();
  result4 = cf4.join();
  result5 = cf5.join();
  //根据result3、result4、result5组装最终result;
  return "result";
);

CompletableFuture原理

CompletableFuture中包含两个字段:resultstack。result用于存储当前CF的结果,stack(Completion)表示当前CF完成后需要触发的依赖动作(Dependency Actions),去触发依赖它的CF的计算,依赖动作可以有多个(表示有多个依赖它的CF),以栈(Treiber stack)的形式存储,stack表示栈顶元素。

    volatile Object result;       // Either the result or boxed AltResult
    volatile Completion stack;    // Top of Treiber stack of dependent actions

这种方式类似“观察者模式”,依赖动作(Dependency Action)都封装在一个单独Completion子类中。下面是Completion类关系结构图。CompletableFuture中的每个方法都对应了图中的一个Completion的子类,Completion本身是观察者的基类。

  • UniCompletion继承了Completion,是一元依赖的基类,例如thenApply的实现类UniApply就继承自UniCompletion。
  • BiCompletion继承了UniCompletion,是二元依赖的基类,同时也是多元依赖的基类。例如thenCombine的实现类BiRelay就继承自BiCompletion。


CompletableFuture的设计思想

按照类似“观察者模式”的设计思想,原理分析可以从“观察者”和“被观察者”两个方面着手。由于回调种类多,但结构差异不大,所以这里单以一元依赖中的thenApply为例,不再枚举全部回调类型。如下图所示:

被观察者

  1. 每个CompletableFuture都可以被看作一个被观察者,其内部有一个Completion类型的链表成员变量stack,用来存储注册到其中的所有观察者。当被观察者执行完成后会弹栈stack属性,依次通知注册到其中的观察者。上面例子中步骤fn2就是作为观察者被封装在UniApply中。
  2. 被观察者CF中的result属性,用来存储返回结果数据。这里可能是一次RPC调用的返回值,也可能是任意对象,在上面的例子中对应步骤fn1的执行结果。

观察者

CompletableFuture支持很多回调方法,例如thenAccept、thenApply、exceptionally等,这些方法接收一个函数类型的参数f,生成一个Completion类型的对象(即观察者),并将入参函数f赋值给Completion的成员变量fn,然后检查当前CF是否已处于完成状态(即result != null),如果已完成直接触发fn,否则将观察者Completion加入到CF的观察者链stack中,再次尝试触发,如果被观察者未执行完则其执行完毕之后通知触发。

  1. 观察者中的dep属性:指向其对应的CompletableFuture,在上面的例子中dep指向CF2。
  2. 观察者中的src属性:指向其依赖的CompletableFuture,在上面的例子中src指向CF1。
  3. 观察者Completion中的fn属性:用来存储具体的等待被回调的函数。这里需要注意的是不同的回调方法(thenAccept、thenApply、exceptionally等)接收的函数类型也不同,即fn的类型有很多种,在上面的例子中fn指向fn2。

Completion类中属性定义如下:

    abstract static class Completion extends ForkJoinTask<Void>
        implements Runnable, AsynchronousCompletionTask 
        //观察者链指针
        volatile Completion next;   
        ...

UniCompletion类中属性定义如下:

abstract static class UniCompletion<T,V> extends Completion    
        Executor executor;                 // executor to use (null if none)
        CompletableFuture<V> dep;          // the dependent to complete
        CompletableFuture<T> src;          // source for action
        ...

整体流程

一元依赖

这里仍然以thenApply为例来说明一元依赖的流程:

  • 将观察者Completion注册到CF1,此时CF1将Completion压栈。
  • 当CF1的操作运行完成时,会将结果赋值给CF1中的result属性。
  • 依次弹栈,通知观察者尝试运行。


源码领读:

  • 压栈过程
    public <U> CompletableFuture<U> thenApply(
        Function<? super T,? extends U> fn) 
        //下面要执行的步骤是uniApplyStage的步骤,并且该接口并无提供指定在哪个线程池执行的能力,因此第一个参数默认为null
        return uniApplyStage(null, fn);
    

    private <V> CompletableFuture<V> uniApplyStage(
        Executor e, Function<? super T,? extends V> f) 
        //传入要执行的函数式接口不能为null
        if (f == null) throw new NullPointerException();
        //thenApply需要等待当前CompletableFuture的结果作为参数传给函数式接口
        Object r;
        //如果此时当前CompletableFuture的result属性不为空
        //说明当前CompletableFuture已经有了结果
        if ((r = result) != null)
            //直接启动接下来的thenApply步骤即可
            return uniApplyNow(r, e, f);
        //new一个新的CompletableFuture---用来作为当前thenApply步骤对应的CompletableFuture    
        CompletableFuture<V> d = newIncompleteFuture();
        //构建一个UniApply,参数: 指定的线程池(可以为null),观察者中的dep属性,观察者中的src属性,观察者Completion中的fn属性
        //观察者入栈
        unipush(new UniApply<T,V>(e, d, this, f));
        //返回thenApply对应的CompletableFuture    
        return d;
    

    //该方法执行说明当前CompletableFuture内部任务已经执行结束了
    //r是依赖CompletableFuture的执行结果,e是线程池,f是thenApply传入要执行的任务
    private <V> CompletableFuture<V> uniApplyNow(
        Object r, Executor e, Function<? super T,? extends V> f) 
        Throwable x;
        //new一个新的CompletableFuture
        CompletableFuture<V> d = newIncompleteFuture();
        //依赖的CompletableFuture执行是否出现了异常
        if (r instanceof AltResult) 
            if ((x = ((AltResult)r).ex) != null) 
                d.result = encodeThrowable(x, r);
                return d;
            
            r = null;
        
        try 
            //如果用户指定了线程池
            if (e != null) 
                //用指定的线程池来执行任务
                e.execute(new UniApply<T,V>(null, d, this, f));
             else 
                //此时thenApply方法要执行的任务是在调用thenApply方法的线程内执行的
                @SuppressWarnings("unchecked") T t = (T) r;
                d.result = d.encodeValue(f.apply(t));
            
         以上是关于CompletableFuture进阶篇-外卖商家端API的异步化的主要内容,如果未能解决你的问题,请参考以下文章

Spring在多线程环境下如何确保事务一致性

Spring在多线程环境下如何确保事务一致性

javaCompletableFuture原理与实践-外卖商家端API的异步化

CompletableFuture进阶

订餐系统之同步口碑外卖商家菜单与点点送订单

美团外卖商家获取订单-signToken取值