Spring Boot - RabbitMQ - 将消息内容类型从八位字节流更改为 json

Posted

技术标签:

【中文标题】Spring Boot - RabbitMQ - 将消息内容类型从八位字节流更改为 json【英文标题】:Spring Boot - RabbitMQ - Changing message content type from octet-stream to json 【发布时间】:2019-12-15 15:17:13 【问题描述】:

我在将 RabbitMQ 中收到的 Byte[] 消息转换为基于模型的某个对象时遇到问题。

我正在使用 Spring Boot。

在我已经实现RabbitListenerConfigurer的应用程序中:

@SpringBootApplication
class Application : RabbitListenerConfigurer 
...

override fun configureRabbitListeners(registrar: RabbitListenerEndpointRegistrar) 
        registrar.messageHandlerMethodFactory = messageHandlerMethodFactory()
    

    @Bean
    fun messageHandlerMethodFactory(): MessageHandlerMethodFactory 
        val messageHandlerMethodFactory = DefaultMessageHandlerMethodFactory()
        messageHandlerMethodFactory.setMessageConverter(consumerJackson2MessageConverter())
        return messageHandlerMethodFactory
    

    @Bean
    fun consumerJackson2MessageConverter(): MappingJackson2MessageConverter 
        return MappingJackson2MessageConverter()
    


然后我有一个听众(println 只是暂时让我检查),例如:

@RabbitListener(queues = ["success.queue"])
    fun receivedSuccessMessage(data: MyModel) 
        println(data)
    

模型本身看起来像(再次简化只是为了测试):

@JsonIgnoreProperties(ignoreUnknown = true)
data class MyModel(
        val input: Any,
        val metadata: Any
)

现在,每当该队列收到消息时,我得到的错误是:

Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [[B] to [com.models.MyModel] for GenericMessage [payload=byte[1429]

还将我收到的消息添加到该成功队列中,如下所示:

"input":"somestuff", "metadata": "somestuff"

我也刚刚注意到消息类型是content_type: application/octet-stream。这是我无法控制的,因为这正是我从将消息放入其中的服务收到的:

任何想法为什么会这样 - 我假设我拥有的 RabbitListenerConfigurer 实现将所有消息从字节数组转换为我指定的任何模型。

任何帮助/想法将不胜感激。

谢谢。

更新 好的,我得到了这个工作,但是我对某些事情感到很困惑:

这是我添加的:

@Autowired
lateinit var connectionFactory : ConnectionFactory

@Bean
    fun rabbitListenerContainerFactory(): SimpleRabbitListenerContainerFactory 
        val factory = SimpleRabbitListenerContainerFactory()
        factory.setConnectionFactory(connectionFactory)
        factory.setAfterReceivePostProcessors(SomeBusiness())

        return factory
    

然后是这样的实际转换:

class SomeBusiness : MessagePostProcessor 
    @Throws(AmqpException::class)
    override fun postProcessMessage(message: Message): Message 
        println("WE WENT IN NOW")
        println()
        if (message.messageProperties.contentType == "application/octet-stream") 
            message.messageProperties.contentType = "application/json"
        
        return message
    

但我不明白的是,我已经有了我的 messageHandlerMethodFactory 函数,我认为它完成了所有关于收听消息的工作。既然我有rabbitListenerContainerFactory,是否有任何冲突的担忧。或者它是否完全有效,即我需要rabbitListenerContainerFactory,因为它实现了SimpleRabbitListenerContainerFactory,这使我可以访问setAfterReceivePostProcessors 方法。

再次感谢任何解释。

谢谢

【问题讨论】:

【参考方案1】:

当您收到content_type: application/octet-stream 时,这无法更改。 您可以使用 Spring Interceptor 或 Servlet Filter 将内容类型 application/octet-stream 转换为 application/json

Example using HandlerInterceptor for Spring Boot希望这会有所帮助。谢谢

【讨论】:

问题是我正在编写的应用程序侦听多个队列,其中一个实际上发送application/json - 所以我怎么能说只转换特定的队列?另外,您能否指出进行此转换的界面/文档? 在拦截器中检查 contentype 是否为octet-stream 然后转换为json 否则保持原样。 抱歉,这个拦截器 - 这是我需要实现的东西吗? (刚接触春天,所以还是有点不知所措) 我看了一下那个例子,那是其余的 API 对吧?我遇到了这个docs.spring.io/spring-amqp/reference/html/#post-processing 和setAfterReceivePostProcessors() 的使用但是我对它的使用方式感到非常困惑——你知道任何好的使用例子吗? 谢谢-所以我通过考虑您的答案来解决问题-但是我不明白为什么这一切都有效大声笑-如果您有问题,我已将详细信息留在问题的更新部分任何想法,我都会很感激。【参考方案2】:

请找到以下对我有用的来源,

public void convertByteArrayToObject(Message<?> message) 

    Object request = message.getPayload();
    Object result = null;
    try 
        byte[] byteArray = (byte[]) request;
        ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(byteArray);

        result = new ObjectInputStream(byteArrayInputStream).readObject();
     catch (ClassNotFoundException | IOException e) 
        LOG.error(e.getMessage());
    
    return result;

【讨论】:

【参考方案3】:

我遇到了同样的问题。下面的解决方案帮助我解决了它。

解决方案: Spring 只理解“contentType”,而不是“content_type”。因此,只需为您的消息设置“contentType”属性,例如(在我的情况下,消息生产者不是 spring 应用程序):

String json = ...;
TextMessage textMessage = session.createTextMessage(json);
textMessage.setStringProperty("contentType", "application/json");

说明:MappingJackson2MessageConverter 有方法canConvertFrom(Message&lt;?&gt; message, @Nullable Class&lt;?&gt; targetClass)。该方法检查标头是否包含受支持的 mime 类型。如果再深入一点,我们会发现AbstractMessageConverter.getMimeType(MessageHeaders header)contentTypeResolver,谁得到了合适的属性。该对象的实例位于 CompositeMessageConverterFactory,它是 DefaultContentResolver,,它在方法 resolve(MessageHeaders headers) 中获取内容类型:

Object contentType = headers.get(MessageHeaders.CONTENT_TYPE);

在哪里public static final String CONTENT_TYPE = "contentType";

【讨论】:

以上是关于Spring Boot - RabbitMQ - 将消息内容类型从八位字节流更改为 json的主要内容,如果未能解决你的问题,请参考以下文章

spring-boot 集成 rabbitmq

spring-boot 集成 rabbitmq

spring boot 1.5.4 整合rabbitMQ(十七)

Spring Boot 整合 RabbitMQ

Spring Boot项目配置RabbitMQ集群

Spring Boot 整合 RabbitMQ