在项目反应堆或 akka 流中,接收器和订阅者之间的概念区别是啥?

Posted

技术标签:

【中文标题】在项目反应堆或 akka 流中,接收器和订阅者之间的概念区别是啥?【英文标题】:In project reactor or akka streams, what is the conceptual difference between sink and subscriber?在项目反应堆或 akka 流中,接收器和订阅者之间的概念区别是什么? 【发布时间】:2018-06-18 03:39:28 【问题描述】:

接收器和订阅者的概念似乎与我相似。另外,我没有看到在反应流规范中明确定义接收器的概念。

【问题讨论】:

【参考方案1】:

我看到来自Project Reactor(此处缺少免责声明)、posted an answer 的 Oleh Dokuka,尽管它对 Akka StreamsReactive Streams 的许多假设都是不正确的,所以请允许我在下面澄清一下。

免责声明:我从早期就参与了 Reactive Streams,authored 大部分是 Technology Compatibility Kit。我还维护 Akka 和 Akka Streams。

另请注意:反应式流已包含在 Java 9 中,并被称为 java.util.concurrent.Flow.*,因此以下所有关于 RS 的 cmets 与 j.u.c.Flow.Subscriber 和其他类型的方式完全相同。


答案

Reactive Streams 是一种服务提供者接口 (SPI) 规范

反应式流,特别是发布者/订阅者/订阅/处理器类型,是Service Provider Interface。甚至在 earliest discussions 关于可追溯到 2014 年的规范中也证实了这一点。

在规范的早期,甚​​至规范的类型都试图隐藏发布者、订阅者和其他类型。遗憾的是,无论在后面考虑的 API thus the API(!) was removed and the SPI types are all that remained,这些类型都会泄漏。

如今,您看到一些 Reactive Streams 的实现声称它们对这些类型的直接扩展出于某种原因是一种好处。这是不正确的,因为不是,也不是 Reactive Streams 接口的目标。这是对这些类型是什么的误解——严格来说,Reactive Streams 库同意理解和“说话”(一种协议)的互操作接口。

作为参考,RxJava 2.0 和 Reactor 确实直接扩展了这些类型,而 Akka Streams 仍然忠实于 RS 的设计和原则,将它们隐藏为应用程序开发人员编程接口——这就是 Sink 不扩展订阅者的原因。这与“本地支持”无关,我看到人们声称直接的 IS-A 关系是(相反,声称互操作库是您的“本地”是对这个概念的误解)。

接收器和订阅者、来源和发布者

接收器和订阅者的概念似乎与我相似。

正确,它们是故意和设计相似的。

As a Sink 是一个lifted representation,它可以有效地产生一个订阅者。为简化起见,您可以将其视为“订阅者工厂”(更具体地说,Sink 是“蓝图”,Materializer 获取 sink 的蓝图并创建相应的 RS 阶段,包括 Sources 的 Publishers 和 Sinks 的订阅者。 因此,当您说 Sink.ignore 时,它​​实际上是一个工厂,它最终会创建一个订阅者来执行所有请求和忽略,如 Reactive Streams 所述。与 Sink 上声明的所有其他方法相同。

这同样适用于Source,它将 1:1 与反应式流 Publisher 相关联。因此,Source.single(1) 将在内部实现为 Publisher 来完成它的工作 - 如果下游允许这样做,则发出 1 个元素。

又名。为什么 Reactive Streams 中没有 Sink?

如上所述,Akka 的 Sink 并不直接扩展订阅者。然而,它基本上是他们的工厂。

您可能会问:“尽管在正常使用期间,用户是否根本看不到这些发布者/订阅者类型?”答案是:是的,这是一个特性,也是设计目标(根据 Reactive Streams 是什么)。如果底层的 Publisher 和 Subscriber 实例一直直接暴露给用户,人们可能会错误地调用它们,从而导致错误和混乱。如果除非明确要求,否则这些类型永远不会公开,那么意外错误的机会就会减少!

有些人误解了这种设计,并声称 Akka Streams 中没有“本机”支持(这是不正确的)。让我们看看在 API 中从订阅者中分离得到了什么:

另外,我没有看到在反应流规范中明确定义接收器的概念。

确实,Sink 不是 Reactive Streams 的一部分,这绝对没问题。

避免“Sink IS-A 订阅者”的好处

Sinks 是 Akka Streams 的一部分,它们的目的是提供流畅的 DSL,同时也是 Subscribers 的工厂。换句话说,如果订阅者是乐高积木,Sink 是构建它们的要素(而 Akka Stream Materializer 是将各种乐高积木组合在一起以“运行”它们的要素)。

事实上,Sink 不像其他库那样携带任何带有订阅者的明确 IS-A(原文如此!),这对用户是有益的:

这是因为org.reactivestreams.Subscriber 现在已包含在 Java 9 中,并已成为 Java 本身的一部分,因此库应迁移到使用 java.util.concurrent.Flow.Subscriber 而不是 org.reactivestreams.Subscriber。选择公开和直接扩展 Reactive Streams 类型的库现在将更难适应 JDK9 类型——它们所有扩展订阅者和朋友的类都需要复制或更改以扩展完全相同的接口,但是从不同的包。在 Akka 中,我们只是在被要求时公开新类型——从 JDK9 发布之日起就已经支持 JDK9 类型。

Reactive Streams 是一个 SPI——一个服务提供者接口——它旨在供库共享,以便它们可以“使用相同的类型和协议”。 Akka Streams 和其他 Reactive Streams 库所做的所有通信都遵守这些规则,如果你想将其他一些库连接到 Akka Streams,你会这样做——给 Akka Streams 提供互操作类型,它是订阅者、处理者或发布者;不是 Sink,因为那是 Akka 的“Akka 特定”DSL(域特定语言),它增加了便利性和其他细节,隐藏(故意!)订阅者类型。

Akka(老实说,其他 RS 实现也被鼓励这样做,但选择不这样做)隐藏这些类型的另一个原因是因为它们很容易做错事。 如果你传递了一个订阅者,任何人都可以在上面调用东西,甚至可以从与该类型交互的任何人那里调用un-knowingly break rules and guarantees that the Reactive Streams Specification requires。

为了避免发生错误,Akka Streams 中的 Reactive Streams 类型是“隐藏的”,并且仅在明确要求时才公开 - 最大限度地减少人们因意外调用“原始”Reactive Streams 类型的方法而不遵循而犯错的风险他们的协议。

【讨论】:

删除了我的答案。 ;) 哦,我不知道这是可能的 :) 感谢您通过 Twitter 与我们联系,以便我在这里澄清事情! @OlehDokuka 很棒的帖子!只是想确保我的理解是正确的:将我当前的 Akka Streams 应用程序移植到 JDK9 会完全无缝吗?有什么需要特别注意的吗? 它已经拥有所有内置的东西,如果你在 Java 9 上运行,你已经可以获得 jucFlow.* 类型:请参阅 JavaFlowSupport 中的接收器和源:github.com/akka/akka/pull/23650/… 我们只公开这个类当您在 Java 9 上运行时,感谢多版本 jar :-)

以上是关于在项目反应堆或 akka 流中,接收器和订阅者之间的概念区别是啥?的主要内容,如果未能解决你的问题,请参考以下文章

Akka之BackoffSupervisor

akka 流将 akka-htpp Web 请求调用集成到流中

在 scala/akka 的计算之间检查参与者的消息查询

Java并发,Akka和RxJava之间的区别?

Akka Streams:流中的状态

在 Kotlin 流中使用 ReactiveSecurityContextHolder