具有反应流的 Spring 云流

Posted

技术标签:

【中文标题】具有反应流的 Spring 云流【英文标题】:Spring cloud stream with reactive streams 【发布时间】:2016-11-12 15:44:00 【问题描述】:

我的目标是使用 Spring Cloud Stream 和 Kafka 制作应用程序,并发现“反应式”世界。 我有一些有用的东西。这是我的消费者的一部分。 在我的 pom 中,我已经声明:

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
        </dependency>

我使用的是 1.0.0.RELEASE。 我已经声明了我的频道

public interface MyChannels 

    public static final String TOPIC_NAME = "myTopicName";

    @Input(TOPIC_NAME)
    MessageChannel receive();

然后是我的服务

@MessageEndpoint
@EnableBinding(MyChannels.class)
public class MyConsumer 

    @Autowired
    private MyChannels channels;

    @ServiceActivator(inputChannel=MyChannels.TOPIC_NAME)
    public void receive(MyObject object) 
        //apply my business logic
        //like save my object in a database
    

我很好地收到了我的信息。 我在依赖项中看到 spring-integration-kafka 依赖于 reactor-core。 让我的应用程序“反应”就足够了吗? 我应该怎么做才能应用反应式编程风格?

我是否必须使用@EnableRxJavaProcessor,如果是,我不明白如何。

如果我不清楚,请不要犹豫,在评论中写出来。谢谢

【问题讨论】:

【参考方案1】:

Reactor 的使用是项目内部的,不会使您的应用程序具有响应性(还不是 :))。

这里的参考文档涵盖了 RxJava 支持的使用:http://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/#_rxjava_support

我们确实打算在 Spring Cloud Stream 1.1 中提供更广泛的支持:https://github.com/spring-cloud/spring-cloud-stream/issues/458

干杯, 马吕斯

【讨论】:

感谢 Marius,我希望您或您的团队成员看到我的消息 :) 从昨天开始,我已经阅读了 github 问题两到三遍。我很难理解 EnableRxJavaProcessor 将如何使用我的消息。是否有更多文档或可以在任何地方为我提供更多帮助的东西? 您好,只有当您想让您的应用程序本身成为基于 RxJava 的消息处理程序时,您才需要 @EnableRxJavaProcessor。此示例可以帮助您更好地理解:github.com/spring-cloud/spring-cloud-stream-samples/tree/master/….

以上是关于具有反应流的 Spring 云流的主要内容,如果未能解决你的问题,请参考以下文章

如何使用spring webflux在功能性反应java中编写具有多个if else的复杂代码

Spring boot 反应式和 EventSource 的工作示例

Spring 反应式安全 |如何实现反应式 PermissionEvaluator

使用Kafka向Dlq Spring云流发送消息时出错

Spring 5 反应式异常处理

Spring Boot 2 反应式 webflux