Spring Web-Flux 中的背压机制
Posted
技术标签:
【中文标题】Spring Web-Flux 中的背压机制【英文标题】:Backpressure mechanism in Spring Web-Flux 【发布时间】:2019-02-14 02:55:29 【问题描述】:我是 Spring Web-Flux 的初学者。我写了一个控制器如下:
@RestController
public class FirstController
@GetMapping("/first")
public Mono<String> getAllTweets()
return Mono.just("I am First Mono")
我知道反应性好处之一是背压,它可以平衡请求或响应率。我想了解如何在 Spring Web-Flux 中实现背压机制。
【问题讨论】:
如何拥有背压或如何处理呢?你已经有内置的机制来处理它,所以你想了解它们吗? 如果有,请查看this page 我不知道我已经默认拥有它,所以也许如何处理它更好。我想了解Spring Web-Flux中的机制及其配置。 @Andrew Tobilko RxJava 与 Spring WebFlux 底层的 Reactor 不同 【参考方案1】:WebFlux 中的背压
为了理解 Backpressure 在当前 WebFlux 框架的实现中是如何工作的,我们必须回顾一下这里默认使用的传输层。我们可能还记得,浏览器和服务器之间的正常通信(服务器到服务器的通信通常也一样)是通过 TCP 连接完成的。 WebFlux 也使用该传输在客户端和服务器之间进行通信。 然后,为了了解 backpressure control 术语的含义,我们必须从 Reactive Streams 规范的角度回顾一下 backpressure 的含义。
基本语义定义了流元素的传输如何通过背压进行调节。
因此,从该陈述中,我们可以得出结论,在 Reactive Streams 中,背压是一种通过传输(通知)接收者可以消耗多少元素来调节需求的机制;这里有一个棘手的问题。 TCP 具有字节抽象而不是逻辑元素抽象。我们通常所说的背压控制想要的是控制发送/接收到/从网络的逻辑元素的数量。尽管 TCP 有自己的流量控制(参见含义here 和动画there),但这种流量控制仍然是针对字节而不是针对逻辑元素。
在WebFlux模块目前的实现中,背压是由传输流控制来调节的,但它并没有暴露接收者的真实需求。为了最终看到交互流程,请看下图:
为简单起见,上图显示了两个微服务之间的通信,其中左侧发送数据流,右侧使用该流。以下编号列表提供了该图的简要说明:
-
这是一个 WebFlux 框架,它负责将逻辑元素转换为字节并返回,并将它们传输到 TCP(网络)/从 TCP(网络)接收它们。
这是元素的长时间运行处理的开始,一旦作业完成,它将请求下一个元素。
在这里,虽然业务逻辑没有要求,但 WebFlux 会在未确认的情况下将来自网络的字节排入队列(业务逻辑没有要求)。
由于 TCP 流控制的性质,服务 A 仍可能向网络发送数据。
从上图中我们可以注意到,接收者暴露的需求与发送者的需求不同(这里的需求是逻辑元素)。这意味着两者的需求是隔离的,仅适用于 WebFlux 业务逻辑(服务)交互,对 Service A Service B 交互的背压暴露较少。这一切都意味着 WebFlux 中的背压控制并不像我们预期的那样公平。
这意味着 WebFlux 中的背压控制并不像我们预期的那样公平。
但我还是想知道如何控制背压
如果我们仍然想对 WebFlux 中的背压进行不公平的控制,我们可以在 Project Reactor 操作员的支持下做到这一点,例如 limitRate()
。以下示例显示了我们如何使用该运算符:
@PostMapping("/tweets")
public Mono<Void> postAllTweets(Flux<Tweet> tweetsFlux)
return tweetService.process(tweetsFlux.limitRate(10))
.then();
从示例中可以看出,limitRate()
运算符允许定义一次预取的元素数量。这意味着即使最终订阅者请求 Long.MAX_VALUE
元素,limitRate
运算符也会将该需求拆分为多个块,并且不允许一次消耗更多。我们可以对元素发送过程做同样的事情:
@GetMapping("/tweets")
public Flux<Tweet> getAllTweets()
return tweetService.retreiveAll()
.limitRate(10);
上面的例子表明,即使 WebFlux 一次请求超过 10 个元素,limitRate()
也会将需求限制为预取大小,并防止一次消耗超过指定数量的元素。
另一种选择是实现自己的 Subscriber
或从 Project Reactor 扩展 BaseSubscriber
。例如,以下是我们如何做到这一点的简单示例:
class MyCustomBackpressureSubscriber<T> extends BaseSubscriber<T>
int consumed;
final int limit = 5;
@Override
protected void hookOnSubscribe(Subscription subscription)
request(limit);
@Override
protected void hookOnNext(T value)
// do business logic there
consumed++;
if (consumed == limit)
consumed = 0;
request(limit);
RSocket 协议的公平背压
为了通过网络边界实现逻辑元素背压,我们需要一个合适的协议。幸运的是,有一个叫RScoket protocol。 RSocket 是一种应用层协议,允许通过网络边界传输实际需求。 该协议有一个 RSocket-Java 实现,允许设置 RSocket 服务器。在服务器到服务器通信的情况下,相同的 RSocket-Java 库也提供客户端实现。要了解更多如何使用 RSocket-Java,请参阅以下示例 here。 对于浏览器-服务器通信,有一个RSocket-JS 实现,它允许通过WebSocket 连接浏览器和服务器之间的流式通信。
基于 RSocket 的已知框架
现在有一些框架是建立在 RSocket 协议之上的。
变形杆菌
其中一个框架是 Proteus 项目,它提供了基于 RSocket 构建的成熟微服务。此外,Proteus 与 Spring 框架很好地集成,所以现在我们可以实现公平的背压控制(参见示例there)
进一步阅读
https://www.netifi.com/proteus https://medium.com/netifi http://scalecube.io/【讨论】:
尽管答案非常好,但我认为 OP 正在询问处理背压的程序化(更高级、与 API 相关的)方法。 让我添加它。一会儿 @OlehDokuka 感谢您的精彩解释。你能告诉我这是扩展BaseSubscriber
的常用方法还是Spring Web-Flux 的默认行为更好?
最好使用框架控件。但是,您可以使用limitRate 进行一些限制,并且仅在需要细粒度控制的情况下,您可以扩展BaseSubscriber
。在许多情况下,这取决于您真正想要实现的目标。我建议您在 Reactor gitter 聊天室进行对话并在那里进行正常聊天 -> gitter.im/reactor/reactor
这是一个元素的长时间运行处理的开始,一旦作业完成,它会请求下一个元素。您能改写一下吗?你说的是什么工作?以上是关于Spring Web-Flux 中的背压机制的主要内容,如果未能解决你的问题,请参考以下文章
通过 BufferBlock 的背压不起作用。 (C# TPL 数据流)