Spring Cloud Stream @SendTo 注解不起作用

Posted

技术标签:

【中文标题】Spring Cloud Stream @SendTo 注解不起作用【英文标题】:Spring Cloud Stream @SendTo Annotation not working 【发布时间】:2019-06-04 18:19:34 【问题描述】:

我将 Spring Cloud Stream 与 Spring Boot 一起使用。我的申请很简单:

ExampleService.class:

@EnableBinding(Processor1.class)
@Service
public class ExampleService 

    @StreamListener(Processor1.INPUT)
    @SendTo(Processor1.OUTPUT)
    public String dequeue(String message)
        System.out.println("New message: " + message);
        return message;
    

    @SendTo(Processor1.OUTPUT)
    public String queue(String message)
        return message;
    

处理器1.class:

public interface Processor1 

    String INPUT = "input1";
    String OUTPUT = "output1";

    @Input(Processor1.INPUT)
    SubscribableChannel input1();

    @Output(Processor1.OUTPUT)
    MessageChannel output1();

application.properties:

spring.cloud.stream.bindings.input1.destination=test_input
spring.cloud.stream.bindings.input1.group=test_group
spring.cloud.stream.bindings.input1.binder=binder1

spring.cloud.stream.bindings.output1.destination=test_output
spring.cloud.stream.bindings.output1.binder=binder1

spring.cloud.stream.binders.binder1.type=rabbit

spring.cloud.stream.binders.binder1.environment.spring.rabbitmq.host=localhost

场景:

1) 当我在“test_input.test_group”队列中推送消息时,消息被正确打印并正确发送到“test_output”交换。所以 ExampleService::dequeue 效果很好。

2) 当我调用 ExampleService::queue 方法时(从类外部,在测试中),消息永远不会发送到“test_output”交换。

我正在使用 Spring Boot 2.0.6.RELEASE 和 Spring Cloud Stream 2.0.2.RELEASE。

任何人都知道为什么方案 2) 不起作用?提前致谢。

【问题讨论】:

我也有同样的问题 【参考方案1】:

是什么让您相信 @SendTo 本身是受支持的? @SendTo 是很多项目使用的二级注解,不仅仅是Spring Cloud Stream;据我所知,没有任何东西会自行寻找它。

尝试使用 Spring Integration 的 @Publisher 注释(使用 @EnablePublisher)。

编辑

要强制使用 CGLIB 而不是 JDK 代理进行代理,您可以这样做...

@Bean
public static BeanFactoryPostProcessor bfpp() 
    return bf -> 
        bf.getBean(IntegrationContextUtils.PUBLISHER_ANNOTATION_POSTPROCESSOR_NAME,
                PublisherAnnotationBeanPostProcessor.class).setProxyTargetClass(true);
    ;

【讨论】:

使用 Spring Cloud Stream 注释这样做很有意义,为什么只涵盖 1)从流中侦听消息和 2)将消息从一个流重定向到另一个流而不是在流中创建消息的情况? 也许吧,但为什么要复制现有的能力;毕竟,基于 MessageChannel 的 spring-cloud-stream 应用程序是基于 Spring 集成基础的。大多数 Spring 项目都建立在一些现有的基础上。同样,如果有一些文档让您得出它应该可以工作的结论,请打开一个 GitHub 问题以修复文档或提出一个可靠的案例来支持它。 仅供参考,使用 Publisher 很麻烦,因为它不支持 StreamListener (***.com/questions/54150939/…) 之类的代理 我解决了这个问题以避免早期的 bean 初始化。

以上是关于Spring Cloud Stream @SendTo 注解不起作用的主要内容,如果未能解决你的问题,请参考以下文章

spring cloud-stream 和 spring cloud-bus 有啥区别?

Spring Cloud(12)——基于Kafka的Stream实现

Spring Cloud 2020.0.0 中的 Spring Cloud Bus/Stream 问题

spring cloud 2.x版本 Spring Cloud Stream消息驱动组件基础教程(kafaka篇)

使用 spring-boot:1.5.1 和 spring-cloud-stream 时无法启动 bean 'inputBindingLifecycle'

spring-cloud-stream 整合 rabbitmq