Spring Integration 反应式流支持
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spring Integration 反应式流支持相关的知识,希望对你有一定的参考价值。
Spring 集成在框架的某些位置和不同方面为响应式流交互提供支持。 我们将在这里讨论其中的大部分,并在必要时提供指向目标章节的适当链接以获取详细信息。
前言
回顾一下,Spring 集成扩展了 Spring 编程模型以支持众所周知的企业集成模式。 Spring 集成支持基于 Spring 的应用程序内的轻量级消息传递,并支持通过声明式适配器与外部系统集成。 Spring Integration的主要目标是为构建企业集成解决方案提供一个简单的模型,同时保持关注点的分离,这对于生成可维护,可测试的代码至关重要。 这个目标是在目标应用程序中使用一等公民(如 和)实现的,这允许我们构建集成流(管道),其中(在大多数情况下)一个端点将消息生成到通道中,供另一个端点使用。 通过这种方式,我们将集成交互模型与目标业务逻辑区分开来。 这里的关键部分是介于两者之间的通道:流行为取决于其实现,使端点保持不变。message
channel
endpoint
另一方面,反应流是具有非阻塞背压的异步流处理的标准。 Reactive Streams 的主要目标是管理跨异步边界的流数据交换 - 例如将元素传递到另一个线程或线程池 - 同时确保接收方不会被迫缓冲任意数量的数据。 换句话说,背压是此模型的一个组成部分,以便允许在线程之间进行调解的队列是有界的。 Reactive Streams 实现(如 Project Reactor)的目的是在流应用程序的整个处理图中保留这些优势和特征。 Reactive Streams 库的最终目标是以透明和流畅的方式为目标应用程序提供类型、运算符集和支持 API,尽可能使用可用的编程语言结构,但最终解决方案并不像普通函数链调用那样势在必行。 它分为几个阶段:定义和执行,这发生在订阅最终反应式发布者期间的一段时间,对数据的需求从定义的底部推到顶部,根据需要施加背压 - 我们请求尽可能多的事件我们目前可以处理。 响应式应用程序看起来像我们在 Spring 集成术语中习惯的 or - 。 事实上,自Java 9以来的Reactive Streams SPI在类中呈现。"stream"
"flow"
java.util.concurrent.Flow
从这里看,当我们在端点上应用一些反应式框架运算符时,Spring Integration flow确实非常适合编写反应式流应用程序,但实际上问题要广泛得多,我们需要记住,并非所有端点(例如)都可以透明地在反应式流中处理。 当然,Spring Integration 中响应式流支持的主要目标是允许整个过程完全响应、按需启动和背压就绪。 在通道适配器的目标协议和系统提供反应流交互模型之前,这是不可能的。 在下面的部分中,我们将描述Spring Integration中提供了哪些组件和方法,用于开发响应式应用程序保留集成流结构。JdbcMessageHandler
Spring 集成中的所有响应式流交互都使用项目反应器类型实现,例如 和 。 |
消息网关
与反应式流最简单的交互点是,我们只需将网关方法的返回类型作为 - 并且网关方法调用背后的整个集成流将在返回的实例上发生订阅时执行。 有关更多信息,请参见单声道反应器。 框架内部对完全基于反应流兼容协议的入站网关使用类似的 -reply 方法(有关更多信息,请参阅下面的反应式通道适配器)。 发送和接收操作包装到 a 中,只要可用,就会从标头链接回复评估。 这样,特定响应式协议(例如 Netty)的入站组件将作为在 Spring 集成上执行的响应式流的订阅者和发起者。 如果请求有效负载是反应式类型,则最好使用将进程延迟到发起方订阅的反应式流定义来处理它。 为此,处理程序方法还必须返回反应式类型。 有关详细信息,请参阅下一节。@MessagingGateway
Mono<?>
Mono
Mono
Mono.deffer()
replyChannel
反应式回复有效负载
当生成回复消息的响应式有效负载时,它以异步方式处理,并为 提供常规实现,并在输出通道是实现时与按需订阅平展,例如。 对于标准的命令式用例,如果回复有效负载是多值发布者(有关详细信息,请参阅),则将其包装到 . 因此,必须明确订阅下游或由下游扁平化。 使用 for ,无需担心返回类型和订阅;一切都由框架在内部顺利处理。MessageHandler
MessageChannel
outputChannel
ReactiveStreamsSubscribableChannel
FluxMessageChannel
MessageChannel
ReactiveAdapter.isMultiValue()
Mono.just()
Mono
FluxMessageChannel
ReactiveStreamsSubscribableChannel
outputChannel
有关详细信息,请参阅异步服务激活器。
另请参阅 Kotlin 协程 了解更多信息。
FluxMessageChannel
和ReactiveStreamsConsumer
是 和 的组合实现。 A 作为热源,在内部创建,用于接收来自实现的传入消息。 实施委派给该内部 . 此外,对于按需上游消费,提供了合约的实现。 当订阅准备好为此通道时,为此通道提供的任何上游(例如,请参阅下面的源轮询通道适配器和拆分器)都会自动订阅。 来自此委派发布者的事件被沉入上述内部事件。FluxMessageChannel
MessageChannel
Publisher<Message<?>>
Flux
send()
Publisher.subscribe()
Flux
FluxMessageChannel
ReactiveStreamsSubscribableChannel
Publisher
Flux
的使用者必须是履行反应流合约的实例。 幸运的是,Spring Integration中的所有实现也实现了来自项目Reactor。 由于中间的实现,整个集成流配置对目标开发人员是透明的。 在这种情况行为从命令式推送模型更改为反应式拉取模型。 A 也可用于使用 将任何源转换为响应式源,使集成流部分响应。FluxMessageChannel
org.reactivestreams.Subscriber
MessageHandler
CoreSubscriber
ReactiveStreamsConsumer
ReactiveStreamsConsumer
MessageChannel
IntegrationReactiveUtils
有关详细信息,请参阅 FluxMessageChannel。
从版本 5.5 开始,引入了一个选项,用于使流中的终结点独立于输入通道。 可选配,通过操作从输入通道自定义源,例如使用 、 等。 此功能通过其属性表示为所有消息传递注释(等)的子注释。ConsumerEndpointSpec
reactive()
ReactiveStreamsConsumer
Function<? super Flux<Message<?>>, ? extends Publisher<Message<?>>>
Flux
Flux.transform()
publishOn()
doOnNext()
retry()
@Reactive
@ServiceActivator
@Splitter
reactive()
源轮询通道适配器
通常,依赖于由 启动的任务。 轮询触发器是从提供的选项构建的,用于定期计划任务以轮询数据或事件的目标源。 当 a 为 时,同样用于确定下一次执行的时间,但不是计划任务,而是基于 for 的值和上一步的持续时间创建 。 然后使用 A 轮询并将它们沉入输出中。 该发生器由提供的背压下游订阅。 从版本 5.5 开始,当 时,根本不调用源,而是立即通过结果完成,直到稍后更改为非零值,例如通过控制总线。 这样,任何实现都可以变成反应式热源。SourcePollingChannelAdapter
TaskScheduler
outputChannel
ReactiveStreamsSubscribableChannel
Trigger
SourcePollingChannelAdapter
Flux<Message<?>>
Flux.generate()
nextExecutionTime
Mono.delay()
Flux.flatMapMany()
maxMessagesPerPoll
Flux
Flux
ReactiveStreamsSubscribableChannel
maxMessagesPerPoll == 0
flatMapMany()
Mono.empty()
maxMessagesPerPoll
MessageSource
有关详细信息,请参阅轮询使用者。
事件驱动通道适配器
MessageProducerSupport
是事件驱动的通道适配器的基类,通常,它用作生成驱动程序 API 中的侦听器回调。 当消息生产者实现构建消息而不是基于侦听器的功能时,也可以轻松地将此回调插入 Reactor 运算符。 实际上,当消息生产者的 不是 . 但是,为了改善最终用户体验,并允许更多背压就绪功能,当 a 是目标系统的数据源时,提供了要在目标实现中使用的 API。 通常,当为源数据调用目标驱动程序 API 时,从实现中使用它。 建议将反应式实现与 作为按需订阅和下游事件消费的 。 取消订阅时,通道适配器将进入停止状态。 调用此类通道适配器即可完成从源 . 频道适配器可以通过自动订阅新创建的源来重新启动。sendMessage(Message<?>)
doOnNext()
Flux
outputChannel
ReactiveStreamsSubscribableChannel
MessageProducerSupport
subscribeToPublisher(Publisher<? extends Message<?>>)
Publisher<Message<?>>>
doStart()
Publisher
MessageProducerSupport
FluxMessageChannel
outputChannel
Publisher
stop()
Publisher
Publisher
到反应流的消息源
从版本 5.3 开始,提供了 a。 它是将提供的和事件驱动的生产组合到配置的 . 在内部,它将 a 包装到重复重新订阅的生成 a 中以上述方式订阅。 对此的订阅是为了避免目标中可能的阻塞而完成的。 当消息源返回(没有要拉取的数据)时,将转换为带有 的状态,以便根据订阅者上下文中的条目进行后续重新订阅。 默认情况下,它是 1 秒。 如果 生成的消息在标头中包含信息,则会在原始消息中确认(如有必要),并在 如果下游流抛出失败的消息以拒绝。 当轮询通道适配器的功能应转换为任何现有实现的响应式按需解决方案时,这可以用于任何用例。ReactiveMessageSourceProducer
MessageSource
outputChannel
MessageSource
Mono
Flux<Message<?>>
subscribeToPublisher(Publisher<? extends Message<?>>)
Mono
Schedulers.boundedElastic()
MessageSource
null
Mono
repeatWhenEmpty()
delay
IntegrationReactiveUtils.DELAY_WHEN_EMPTY_KEY
Duration
MessageSource
IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK
doOnSuccess()
Mono
doOnError()
MessagingException
ReactiveMessageSourceProducer
MessageSource<?>
拆分器和聚合器
当 a 为其逻辑获取 a 时,该过程自然会遍历 中的项目,将它们映射到消息中以发送到 . 如果这个通道是一个 ,则从该通道按需订阅的包装器,当我们将传入事件映射到多值输出时,此拆分器行为看起来更像是 Reactor 运算符。 当整个集成流是使用拆分器之前和之后构建的,使 Spring 集成配置与反应式流需求及其事件处理运算符保持一致时,这是最有意义的。 对于常规通道,a 被转换为标准迭代和生成拆分逻辑。AbstractMessageSplitter
Publisher
Publisher
outputChannel
ReactiveStreamsSubscribableChannel
Flux
Publisher
flatMap
Publisher
FluxMessageChannel
Publisher
Iterable
A 是特定 Reactive Streams 逻辑实现的另一个示例,可以将其视为 Project Reactor。 它基于 和(或)运算符。 传入的消息在创建 时沉入启动,使其成为热源。 这是按需订阅的,或者直接在 时 不是反应性的。 当整个集成流程构建之前和之后的组件时,这有其强大功能,使整个逻辑背压准备就绪。FluxAggregatorMessageHandler
"reactive operator"
Flux.groupBy()
Flux.window()
buffer()
Flux.create()
FluxAggregatorMessageHandler
Flux
ReactiveStreamsSubscribableChannel
FluxAggregatorMessageHandler.start()
outputChannel
MessageHandler
FluxMessageChannel
有关详细信息,请参阅流和通量拆分和通量聚合器。
Java DSL
Java 中的 DSL 可以从任何实例启动(请参见)。 此外,通过操作员,可以变成反应性热源。 在这两种情况下,A 在内部使用;它可以根据其合约订阅入站,并且它本身就是下游订阅者的。 通过动态注册,我们可以实现一个强大的逻辑,将反应式流与这种与 .IntegrationFlow
Publisher
IntegrationFlow.from(Publisher<Message<T>>)
IntegrationFlowBuilder.toReactivePublisher()
IntegrationFlow
FluxMessageChannel
Publisher
ReactiveStreamsSubscribableChannel
Publisher<Message<?>>
IntegrationFlow
Publisher
从版本 5.5.6 开始,存在一个运算符变体来控制返回的整个生命周期。 通常,来自反应式发布者的订阅和使用发生在后期运行时阶段,而不是在反应式流组合甚至启动期间。 为了避免在订阅点进行生命周期管理的样板代码,并提供更好的最终用户体验,引入了这个带有标志的新运算符。 它将 及其组件标记为 (if),因此 不会自动启动流中消息的生产和使用。 相反,对于 是从内部 . 与值无关,流从 和 停止 - 如果没有可以使用的消息,则生成消息是没有意义的。toReactivePublisher(boolean autoStartOnSubscribe)
IntegrationFlow
Publisher<Message<?>>
ApplicationContext
IntegrationFlow
Publisher<Message<?>>
autoStartOnSubscribe
true
IntegrationFlow
autoStartup = false
ApplicationContext
start()
IntegrationFlow
Flux.doOnSubscribe()
autoStartOnSubscribe
Flux.doOnCancel()
Flux.doOnTerminate()
对于完全相反的用例,何时应调用反应式流并在完成后继续,在 . 此时的流被转换成一个,传播到一个提供的,在操作器中执行。 该函数的结果被包装到 a 中,用于平面映射到一个输出中,该输出由另一个输出订阅用于下游流。IntegrationFlow
fluxTransform()
IntegrationFlowDefinition
FluxMessageChannel
fluxFunction
Flux.transform()
Mono<Message<?>>
Flux
FluxMessageChannel
有关更多信息,请参阅 Java DSL 章节。
ReactiveMessageHandler
从版本 5.3 开始,框架原生支持 。 这种类型的消息处理程序专为反应式客户端设计,这些客户端为按需订阅返回反应式类型以进行低级操作执行,并且不提供任何回复数据来继续反应式流组合。 当在命令式集成流中使用 a 时,结果在订阅后立即返回,只是因为在这样的流中没有响应式流组合来遵循背压。 在这种情况下,框架将其包装成 - 一个普通的实现。 然而,当a参与流时(例如,当消耗的通道是a时),这样的a由反应器操作器组成整个反应流,以在消耗期间遵守背压。ReactiveMessageHandler
ReactiveMessageHandler
handleMessage()
ReactiveMessageHandler
ReactiveMessageHandlerAdapter
MessageHandler
ReactiveStreamsConsumer
FluxMessageChannel
ReactiveMessageHandler
flatMap()
现成的实现之一是用于出站通道适配器的实现。 请参阅 MongoDB 反应式通道适配器 了解更多信息。ReactiveMessageHandler
ReactiveMongoDbStoringMessageHandler
反应通道适配器
当集成的目标协议提供反应式流解决方案时,在 Spring 集成中实现通道适配器变得很简单。
入站、事件驱动的通道适配器实现是关于将请求(如有必要)包装到延迟或中,并且仅当协议组件将订阅启动到从侦听器方法返回时执行发送(并生成回复,如果有)。 这样,我们就有一个反应式流解决方案完全封装在这个组件中。 当然,在输出通道上订阅的下游集成流应遵循反应流规范,并以按需、背压就绪的方式执行。Mono
Flux
Mono
这并不总是根据集成流中使用的处理器的性质(或当前实现)提供。 可以使用线程池和队列来处理此限制,或者(见上文)在没有反应式实现时在集成终端节点之前和之后处理此限制。MessageHandler
FluxMessageChannel
反应式事件驱动的入站通道适配器的示例:
public class CustomReactiveMessageProducer extends MessageProducerSupport
private final CustomReactiveSource customReactiveSource;
public CustomReactiveMessageProducer(CustomReactiveSource customReactiveSource)
Assert.notNull(customReactiveSource, "customReactiveSource must not be null");
this.customReactiveSource = customReactiveSource;
@Override
protected void doStart()
Flux<Message<?>> messageFlux =
this.customReactiveSource
.map(event - >
MessageBuilder
.withPayload(event.getBody())
.setHeader(MyReactiveHeaders.SOURCE_NAME, event.getSourceName())
.build());
subscribeToPublisher(messageFlux);
用法如下所示:
public class MainFlow
@Autowired
private CustomReactiveMessageProducer customReactiveMessageProducer;
@Bean
public IntegrationFlow buildFlow()
return IntegrationFlow.from(customReactiveMessageProducer)
.channel(outputChannel)
.get();
或者以声明性方式:
public class MainFlow
@Bean
public IntegrationFlow buildFlow()
return IntegrationFlow.from(new CustomReactiveMessageProducer(new CustomReactiveSource()))
.handle(outputChannel)
.get();
或者即使没有通道适配器,我们始终可以通过以下方式使用 Java DSL:
public class MainFlow
@Bean
public IntegrationFlow buildFlow()
Flux<Message<?>> myFlux = this.customReactiveSource
.map(event ->
MessageBuilder
.withPayload(event.getBody())
.setHeader(MyReactiveHeaders.SOURCE_NAME, event.getSourceName())
.build());
return IntegrationFlow.from(myFlux)
.handle(outputChannel)
.get();
反应式出站通道适配器实现是关于根据为目标协议提供的反应式 API 启动(或延续)反应式流以与外部系统交互。 入站有效负载本身可以是反应式类型,也可以是整个集成流的事件,它是顶部反应式流的一部分。 如果我们处于单向、即发即弃的场景中,则可以立即订阅返回的反应式类型,或者将其传播到下游(请求-回复方案)以进一步集成流或在目标业务逻辑中进行显式订阅,但仍在下游保留反应式流语义。
反应式出站通道适配器的示例:
public class CustomReactiveMessageHandler extends AbstractReactiveMessageHandler
private final CustomEntityOperations customEntityOperations;
public CustomReactiveMessageHandler(CustomEntityOperations customEntityOperations)
Assert.notNull(customEntityOperations, "customEntityOperations must not be null");
this.customEntityOperations = customEntityOperations;
@Override
protected Mono<Void> handleMessageInternal(Message<?> message)
return Mono.fromSupplier(() -> message.getHeaders().get("queryType", Type.class))
.flatMap(mode ->
switch (mode)
case INSERT:
return handleInsert(message);
case UPDATE:
return handleUpdate(message);
default:
return Mono.error(new IllegalArgumentException());
).then();
private Mono<Void> handleInsert(Message<?> message)
return this.customEntityOperations.insert(message.getPayload())
.then();
private Mono<Void> handleUpdate(Message<?> message)
return this.r2dbcEntityOperations.update(message.getPayload())
.then();
public enum Type
INSERT,
UPDATE,
我们将能够使用两个通道适配器:
public class MainFlow
@Autowired
private CustomReactiveMessageProducer customReactiveMessageProducer;
@Autowired
private CustomReactiveMessageHandler customReactiveMessageHandler;
@Bean
public IntegrationFlow buildFlow()
return IntegrationFlow.from(customReactiveMessageProducer)
.transform(someOperation)
.handle(customReactiveMessageHandler)
.get();
目前,Spring Integration为WebFlux,RSocket,MongoDb,R2DBC,ZeroMQ,GraphQL,Apache Cassandra提供了通道适配器(或网关)实现。 Redis 流通道适配器也是反应式的,并从 Spring Data 使用。 更多的反应式通道适配器即将到来,例如Kafka中的Apache Kafka基于Apache Kafka的Apache Kafka和来自Spring的Apache Kafka等。 对于许多其他非反应式通道适配器,建议使用线程池以避免在反应式流处理期间阻塞。ReactiveStreamOperations
ReactiveKafkaProducerTemplate
ReactiveKafkaConsumerTemplate
以上是关于Spring Integration 反应式流支持的主要内容,如果未能解决你的问题,请参考以下文章
Spring Integration 5.1 - 使用 @IntegrationConverter 的集成流转换不起作用