Spring Cloud Stream 消息从/到 JSON 转换配置

Posted

技术标签:

【中文标题】Spring Cloud Stream 消息从/到 JSON 转换配置【英文标题】:Spring Cloud Stream message from/to JSON conversion configuration 【发布时间】:2016-05-31 18:11:19 【问题描述】:

我正在使用带有 RabbitMQ 绑定器的 Spring Cloud Stream。它适用于 byte[] 有效负载和 Java 本机序列化,但我需要使用 JSON 有效负载。

这是我的处理器类。

@EnableBinding(Processor.class)
public class MessageProcessor 
    @ServiceActivator(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
    public OutputDto handleIncomingMessage(InputDto inputDto) 
        // Run some job.
        return new OutputDto();
    

InputDtoOutputDto 是带有 Jackson 注释的 POJO。

如何配置 JSON 转换策略? 邮件标头应该如何被接受和处理?

【问题讨论】:

【参考方案1】:

在您的消费者中,您可以添加内容类型配置,例如

spring.cloud.stream.bindings.input.content-type: application/x-java-object;type=my.package.InputDto

你也可以添加

spring.cloud.stream.bindings.output.content-type: application/json

强制传出消息负载为 JSON(用于互操作等)。

请注意,“输入”和“输出”是活页夹通道名称(即在您的应用中的 Processor 中定义)。

我认为这很有可能会变得更容易或更自动化,但是在 Spring Cloud 中实现这一点需要一些工程努力。 github有问题,如果你想关注它:https://github.com/spring-cloud/spring-cloud-stream/issues/156。

要手动向 Spring Cloud Stream 发送消息,您可以手动设置标头(但使用 Stream 更容易)。在 Rabbit 管理 UI 中,JSON 消息如下所示:

priority:   0
delivery_mode:  2
headers:    
    contentType:    text/plain
    originalContentType:    application/json
content_type:   text/plain

【讨论】:

可以配置NOT_NULL序列化吗? 您可以以任何您喜欢的方式注释您的 DTO。杰克逊会尊重你的意愿,我想。你是这个意思吗?它与原始问题相关吗? 这不是对象映射器而不是注释的问题吗?不推荐使用使用注释来过滤空值的 AFAIK,您应该通过映射器配置来执行此操作。因此,我认为这是问题的一部分。作为配置序列化策略的一部分,您应该能够以某种方式提供您的对象映射器实例。除非我弄错了。 我不知道反序列化器的注释已被弃用(你有链接吗?)。如果需要,您可以配置用于进行转换的ObjectMapper,但我认为这确实是一个单独的问题。 啊,你把我带到了那里。 @JsonSerialize 已被弃用,但有 @JsonInclude 代替。好的,我去用那个。您能否看一下我上面的第二个问题 - 我应该为input 配置的content-type: application/json 接受的消息设置哪些标头。到目前为止,我尝试了“content_type:application/json”,它在这里给了我空指针:MessageChannelBinderSupport.java:532

以上是关于Spring Cloud Stream 消息从/到 JSON 转换配置的主要内容,如果未能解决你的问题,请参考以下文章

Spring Cloud Stream 3.x - 重播消息策略

Spring Cloud 系列之 Stream 消息驱动

Spring cloud stream消息分区

Spring Cloud Stream 手动偏移管理

Spring Cloud 2020.0.0 中的 Spring Cloud Bus/Stream 问题

Spring Cloud Stream实现消息过滤的三种主要方式