Spring Boot - RabbitMQ - 将消息内容类型从八位字节流更改为 json
Posted
技术标签:
【中文标题】Spring Boot - RabbitMQ - 将消息内容类型从八位字节流更改为 json【英文标题】:Spring Boot - RabbitMQ - Changing message content type from octet-stream to json 【发布时间】:2019-12-15 15:17:13 【问题描述】:我在将 RabbitMQ 中收到的 Byte[] 消息转换为基于模型的某个对象时遇到问题。
我正在使用 Spring Boot。
在我已经实现RabbitListenerConfigurer
的应用程序中:
@SpringBootApplication
class Application : RabbitListenerConfigurer
...
override fun configureRabbitListeners(registrar: RabbitListenerEndpointRegistrar)
registrar.messageHandlerMethodFactory = messageHandlerMethodFactory()
@Bean
fun messageHandlerMethodFactory(): MessageHandlerMethodFactory
val messageHandlerMethodFactory = DefaultMessageHandlerMethodFactory()
messageHandlerMethodFactory.setMessageConverter(consumerJackson2MessageConverter())
return messageHandlerMethodFactory
@Bean
fun consumerJackson2MessageConverter(): MappingJackson2MessageConverter
return MappingJackson2MessageConverter()
然后我有一个听众(println
只是暂时让我检查),例如:
@RabbitListener(queues = ["success.queue"])
fun receivedSuccessMessage(data: MyModel)
println(data)
模型本身看起来像(再次简化只是为了测试):
@JsonIgnoreProperties(ignoreUnknown = true)
data class MyModel(
val input: Any,
val metadata: Any
)
现在,每当该队列收到消息时,我得到的错误是:
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [[B] to [com.models.MyModel] for GenericMessage [payload=byte[1429]
还将我收到的消息添加到该成功队列中,如下所示:
"input":"somestuff", "metadata": "somestuff"
我也刚刚注意到消息类型是content_type: application/octet-stream
。这是我无法控制的,因为这正是我从将消息放入其中的服务收到的:
任何想法为什么会这样 - 我假设我拥有的 RabbitListenerConfigurer
实现将所有消息从字节数组转换为我指定的任何模型。
任何帮助/想法将不胜感激。
谢谢。
更新 好的,我得到了这个工作,但是我对某些事情感到很困惑:
这是我添加的:
@Autowired
lateinit var connectionFactory : ConnectionFactory
@Bean
fun rabbitListenerContainerFactory(): SimpleRabbitListenerContainerFactory
val factory = SimpleRabbitListenerContainerFactory()
factory.setConnectionFactory(connectionFactory)
factory.setAfterReceivePostProcessors(SomeBusiness())
return factory
然后是这样的实际转换:
class SomeBusiness : MessagePostProcessor
@Throws(AmqpException::class)
override fun postProcessMessage(message: Message): Message
println("WE WENT IN NOW")
println()
if (message.messageProperties.contentType == "application/octet-stream")
message.messageProperties.contentType = "application/json"
return message
但我不明白的是,我已经有了我的 messageHandlerMethodFactory
函数,我认为它完成了所有关于收听消息的工作。既然我有rabbitListenerContainerFactory
,是否有任何冲突的担忧。或者它是否完全有效,即我需要rabbitListenerContainerFactory
,因为它实现了SimpleRabbitListenerContainerFactory
,这使我可以访问setAfterReceivePostProcessors
方法。
再次感谢任何解释。
谢谢
【问题讨论】:
【参考方案1】:当您收到content_type: application/octet-stream
时,这无法更改。
您可以使用 Spring Interceptor 或 Servlet Filter 将内容类型 application/octet-stream
转换为 application/json
。
Example using HandlerInterceptor for Spring Boot希望这会有所帮助。谢谢
【讨论】:
问题是我正在编写的应用程序侦听多个队列,其中一个实际上发送application/json
- 所以我怎么能说只转换特定的队列?另外,您能否指出进行此转换的界面/文档?
在拦截器中检查 contentype 是否为octet-stream
然后转换为json
否则保持原样。
抱歉,这个拦截器 - 这是我需要实现的东西吗? (刚接触春天,所以还是有点不知所措)
我看了一下那个例子,那是其余的 API 对吧?我遇到了这个docs.spring.io/spring-amqp/reference/html/#post-processing 和setAfterReceivePostProcessors()
的使用但是我对它的使用方式感到非常困惑——你知道任何好的使用例子吗?
谢谢-所以我通过考虑您的答案来解决问题-但是我不明白为什么这一切都有效大声笑-如果您有问题,我已将详细信息留在问题的更新部分任何想法,我都会很感激。【参考方案2】:
请找到以下对我有用的来源,
public void convertByteArrayToObject(Message<?> message)
Object request = message.getPayload();
Object result = null;
try
byte[] byteArray = (byte[]) request;
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(byteArray);
result = new ObjectInputStream(byteArrayInputStream).readObject();
catch (ClassNotFoundException | IOException e)
LOG.error(e.getMessage());
return result;
【讨论】:
【参考方案3】:我遇到了同样的问题。下面的解决方案帮助我解决了它。
解决方案: Spring 只理解“contentType”,而不是“content_type”。因此,只需为您的消息设置“contentType”属性,例如(在我的情况下,消息生产者不是 spring 应用程序):
String json = ...;
TextMessage textMessage = session.createTextMessage(json);
textMessage.setStringProperty("contentType", "application/json");
说明:
类MappingJackson2MessageConverter
有方法canConvertFrom(Message<?> message, @Nullable Class<?> targetClass)
。该方法检查标头是否包含受支持的 mime 类型。如果再深入一点,我们会发现AbstractMessageConverter.getMimeType(MessageHeaders header)
有contentTypeResolver
,谁得到了合适的属性。该对象的实例位于 CompositeMessageConverterFactory,它是 DefaultContentResolver,
,它在方法 resolve(MessageHeaders headers)
中获取内容类型:
Object contentType = headers.get(MessageHeaders.CONTENT_TYPE);
在哪里public static final String CONTENT_TYPE = "contentType";
【讨论】:
以上是关于Spring Boot - RabbitMQ - 将消息内容类型从八位字节流更改为 json的主要内容,如果未能解决你的问题,请参考以下文章