Spring amqp批处理接收消息

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spring amqp批处理接收消息相关的知识,希望对你有一定的参考价值。

我们使用Spring AMQP从RabbitMQ读取消息,现在我们一次只读取一条消息离开队列,无论如何我可以从队列中读取多条消息然后处理批处理?

我看到Spring中有一个BatchingStrategy,我怎么能将那个插入connectionFactory?

这是我的代码:

CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host, port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);

SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        JsonMessageConverter converter = new JsonMessageConverter();
        DefaultClassMapper defaultClassMapper = new DefaultClassMapper();
        defaultClassMapper.setDefaultType(Message.class);
        converter.setClassMapper(defaultClassMapper);
        factory.setMessageConverter(converter);
        factory.setConcurrentConsumers(3);

...

public class Processor implements IChannelProcessor {
@Override
    public void process(Message message) {
        validateMessageEvent(message);
        // process the message
答案

BatchingStrategy用于将多个消息段放入单个amqp消息中;容器会自动声明此类消息。它对你的目的无济于事。

做你想做的事;而不是使用POJO消息(@RabbitListener)你将不得不使用一个消息监听器容器与acknowledgeMode设置为MANUALChannelAwareMessageListener

处理完一批邮件后,请在频道上调用basicAck以获取批处理中的所有邮件。

以上是关于Spring amqp批处理接收消息的主要内容,如果未能解决你的问题,请参考以下文章

spring-amqp手动停止RabbitListener

Spring AMQP - 发送和接收消息

Spring AMQP使用noLocal使用者发送和接收临时队列

Spring AMQP项目

Spring AMQP:由于缺少回复属性,从 POJO 侦听器发送回复失败

Spring AMQP 错误处理策略详解