异步架构和响应式(Reactive)编程

Posted 克里斯朵夫李维

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了异步架构和响应式(Reactive)编程相关的知识,希望对你有一定的参考价值。

导言

随着互联网的发展,用户的规模不断扩大, 应用的架构从单一应用架构演变到微服务架构。在微服务架构下,我们把一些核心的业抽取出来,作为独立的服务。服务与服务之间通过RPC或者HTTP进行通信,这意味着微服务架通常是I/O密集型的。所以这也决定了现代互联网架构的性能瓶颈:I/O。

阻塞架构

阻塞式系统构建于Servlet框架上(Servert 3.0之前),这样的系统是阻塞的和多线程的,这意味着每个连接使用一个线程来处理请求,I/O操作是通过从线程池中选择一个工作线程来执行I/O,并且阻塞请求线程,直到工作线程完成为止。如下图所示

具体的处理步骤:

  1. 客户端通过Http协议发起一次请求
  2. Tomcat创建一个线程处理请求
  3. 当应用A调用应用B的时候,线程将会被阻塞,释放CPU的使用权。
  4. 当应用B返回的时候线程被唤醒,线程将有机会获取CPU时间片。

在上述场景的第3步中,因为网络I/O,CPU将有一段时间被空闲,在高并发场景下,我们很容易想到通过并发(使用多线程)来提高CPU的利用率。

现在多线程似乎已经成为了解决阻塞式系统并发问题的标准模式。通常我们使用阻塞代码来编写程序,当出现性能瓶颈的时候,我们可以引入额外的线程来运行阻塞的代码。但是并行化的处理方式并不是万能的,主要有以下几个方面的问题

  1. 线程本身就是比较昂贵的资源,创建过多的线程会占用更多的内存。
  2. 线程上下文切换比较耗时并且会增加cpu的负载。
  3. 需要解决线程间的同步问题。

阻塞式系统缺乏弹性。在高并发场景下,如果应用B延迟增加或由于错误导致应用A重试,那么应用A连接和线程数量就会不断增加。当这种情况发生时,应用A的线程数量激增会导致服务器负载激增并使集群不堪重负。为了解决这种问题,我们需要限制限制创建线程的数量,引入熔断和限流机制(Hystrix、Sentinel)帮助我们在发生了这些事情期间保持阻塞系统的稳定性。

异步架构

在高并发场景下,另一种提高CPU利用率的方法是异步化。我们可以编写异步的,非阻塞的代码。在执行IO操作的时候,让线程继续拥有CPU的时间片同时切换到另一个任务上,当异步任务执行完成时,再返回当前的线程继续执行。异步系统通常每个CPU核心使用一个线程处理所有的请求和响应,因为并不是每个请求都需要创建一个线程,所以连接成本很便宜。它的成本只是文件描述符和监听器(例如epoll)。而阻塞模型中的连接成本是线程,并且具有大量内存和系统开销。由于数据保留在同一CPU上,从而可以更好地利用CPU级别的缓存,并且只有较少的上下文切换,因此可以提高效率。

例如,网络I/O需要耗时3s,其他任务需要2s。在阻塞式系统中,整个请求完成需要5s,如果在5s内有1000个请求,在不使用线程池的情况下将会创建1000个线程。而异步系统中整个请求完成只需要3s。这意味着在异步系统中我们可以使用更少的线程与阻塞式系统达到同样的效果。

为什么要使用响应式编程

在JVM上提供了两种异步编程模型:

Callbacks:异步方法没有返回值,但是需要一个额外的回调参数(lambda或者匿名函数),当结果可用时调用这个参数。一个著名的例子是Swing’s的EventListener层次结构。

Futures:异步方法立即返回一个Future。异步进程计算T值,通过Future对象包装对T值的访问。该值不是立即可用的,可以轮询该对象,直到该值可用为止。例如,ExecutorService使用Future对象,运行Callable<T任务。

但是这两种技术都有他们的局限性,回调难以组合在一起,很快就会导致代码难以阅读和维护(这种情况称为h"Callback Hell")

一个回调地狱(Callback Hell)的例子:

需求:通过用户ID获取获取用户的收藏夹,并将前五个收藏夹的信息展示到用户的UI上,如果收藏夹为空,则展示5个建议。


userService.getFavorites(userId, new Callback<List<String>>() { // 1
  public void onSuccess(List<String> list) // 2 
    if (list.isEmpty()) { // 3
      suggestionService.getSuggestions(new Callback<List<Favorite>>() {
        public void onSuccess(List<Favorite> list) // 4 
          UiUtils.submitOnUiThread(() -> { // 5
            list.stream()
                .limit(5)
                .forEach(uiList::show); // 6 
            });
        }

        public void onError(Throwable error) // 7
          UiUtils.errorPopup(error);
        }
      });
    } else {
      list.stream() // 8
          .limit(5)
          .forEach(favId -> favoriteService.getDetails(favId, // 9
            new Callback<Favorite>() {
              public void onSuccess(Favorite details) {
                UiUtils.submitOnUiThread(() -> uiList.show(details));
              }

              public void onError(Throwable error) {
                UiUtils.errorPopup(error);
              }
            }
          ));
    }
  }

  public void onError(Throwable error) {
    UiUtils.errorPopup(error);
  }
  1. 我们有基于回调的服务:一个回调接口,在异步流程成功时调用 onSuccess方法,在发生错误时调用 onError方法。
  2. 第一个服务通过回调得到收藏夹ID列表。
  3. 如果收藏夹为空,则需要调用 suggestionService
  4. suggestionService在第二次回调中提供一个 List<Favorite>
  5. 切换到UI线程。
  6. 我们使用Java 8流将处理的建议数量限制为5个,并在UI的图形化列表中显示它们。
  7. 在每一层,我们都以相同的方式处理错误:在弹出窗口中显示错误。
  8. 如果该服务返回了收藏夹ID列表,我们把收藏夹ID列表限制为5个,然后转到 favoriteService获取详细的 Favorite对象。
  9. 再一次通过回调将 Favorite提交到UI线程处理。

我们编写了大量的回调,并且有很多重复的代码。接下来我们使用**响应式(Reactor)**进行重构:

userService.getFavorites(userId) // 1 
           .flatMap(favoriteService::getDetails)  // 2
           .switchIfEmpty(suggestionService.getSuggestions()) // 3
           .take(5// 4
           .publishOn(UiUtils.uiThreadScheduler()) // 5
           .subscribe(uiList::show, UiUtils::errorPopup); // 6
  1. 获取收藏夹IDs的流。
  2. 通过flatMap异步的将它们转化为Favorite对象,现在我们有了关于Favorite对象的流。
  3. 如果Favorite对象的流是空的,我们切换到suggestionService。
  4. 我们只取结果流中的5个元素。
  5. 切换到UI线程。
  6. 最终将数据流展示在UI列表,以及出现了错误时如何显示(弹出框提示)。

如果想确保在800毫秒内检索收藏夹IDs,如果需要更长的时间,那么从缓存中获取它们,该怎么办?在基于回调的代码中,这是一项复杂的任务。在响应式编程中,它就像在链中添加一个超时操作符一样简单,如下所示:

userService.getFavorites(userId)
           .timeout(Duration.ofMillis(800)) // 1 
           .onErrorResume(cacheService.cachedFavoritesFor(userId))  // 2
           .flatMap(favoriteService::getDetails) 
           .switchIfEmpty(suggestionService.getSuggestions())
           .take(5)
           .publishOn(UiUtils.uiThreadScheduler())
           .subscribe(uiList::show, UiUtils::errorPopup);
  1. 如果以上部分超过800ms没有发出任何信息,则传播一个错误。
  2. 如果出现错误,则回退到cacheService。

Future对象要比callbacks好一些,但是Future在组合(composition)上任然比较困难。尽管Java 8通过CompletableFuture进行了改进。编排多个Future对象是可行的,但是这并不容易。并且Future还有其他问题:

  • 调用get()方法很容易导致Future对象出现另一种阻塞情况。
  • 不支持惰性计算。
  • 缺乏对多值和高级错误处理的支持。

响应式库,如Reactor,Rxjava旨在解决JVM上”经典”异步方法的这些缺点,同时关注一些额外的方面:

  • 可组合性( Composability )和可读性( readability)
  • 数据作为一个流( flow ),使用丰富的操作符( operators)进行操作
  • 在你订阅( subscribe)之前什么都不会发生
  • 背压能力( Backpressure )或消费者向生产者发出排放速度过高信号的能力
  • High level but high value abstraction that is concurrency-agnostic

响应式编程不是银弹

通过上述的分析,使用异步架构可以减少线程上下文的切换和对CPU缓存更有效的使用,我们认为效率将会有一个数量级的提高,但是事实并非如此。

根据Netfix的实践,对于CPU密集型的应用使用异步架构并不会有很大的提升,对于I/O密集型应用使用异步架构降低了网络连接的成本,比阻塞式系统有很好的弹性优势,将会有较大的性能提升,大约25%左右。

异步架构的优点看起来很棒,但是上面的优点是以牺牲操作为代价的。阻塞式系统(命令式编程)很容易理解和调试。

在阻塞式线程总是在执行单个任务,因此线程的堆栈很容易捕获。相比之下异步架构(响应式编程)是基于回调和事件驱动的(event loop),event loop的堆栈是没有意义的。在处理事件和回调时很难跟踪请求,并且在这方面非常缺乏调试工具。边缘情况(Edge cases)、未处理的异常和未正确处理的状态更改会导致资源悬空( lost response),从而导致ByteBuf泄漏、文件描述符泄漏、响应丢失等。事实证明,这些类型的问题很难调试,因为很难知道哪个事件没有正确处理或没有正确清除。

许多库依赖于Thread Local来构建和存储关于请求的上下文。但是Thread Local不能在异步非阻塞环境中工作,因为在同一个线程上处理多个请求。

要将阻塞网络逻辑转化为非阻塞的网络代码,并不是一件容易工作,没有一刀切的策略可以将阻塞的网络逻辑转换为异步。

除了使用响应式编程之外,你还可以考虑Go语言的协程。

参考资料

Zuul2:The Netflix Journey to Asynchronous, Non-Blocking Systems

https://projectreactor.io/docs/core/release/reference/


以上是关于异步架构和响应式(Reactive)编程的主要内容,如果未能解决你的问题,请参考以下文章

浅析Java响应式编程(Reactive Programming)

Reactive 响应式流与制奶厂业务

Java9响应式编程Reactive Stream

响应式系统reactive system初探

Reactive stream 响应式流——Webflux响应式编程利器

当响应式编程遇上springboot