消息队列客户端开发向导二(基于 Spring 的 amqp 实现)
Posted Lemo_wd
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了消息队列客户端开发向导二(基于 Spring 的 amqp 实现)相关的知识,希望对你有一定的参考价值。
前篇 消息队列客户端开发向导(基于 Spring 的 amqp 实现)
一、应答模式与重试机制
NONE 自动应答 即 autoAck 等于 true,rabbitmq 会自动把发送出去的消息置为确认,然后从内存/磁盘中删除,而不管消费者是否真正消费了消息。
MANUAL 需要手动 ACK/NACK
AUTO 根据是否抛出异常来决定 ACK/NACK
注:如果 应答模式为 AUTO,并且 retry.enabled 为 false,如果发生异常则会一直重试。如果应答模式为 NONE,则 prefetch 总是 0,即总是获取新消息。如果是 AUTO,prefetch 最小是 5。
配置参考
spring: rabbitmq: host: 122.51.195.163 username: rabbitmq password: rabbitmq listener: type: simple simple: retry: enabled: true max-attempts: 1 max-concurrency: 100 prefetch: 26 batch-size: 1000 # 需要配合 receive-timeout 来使用 acknowledge-mode: auto template: receive-timeout: 50S # 50秒 超时
二、批处理
批量发送
配置批处理策略,当批完成时才发送消息给 rabbitmq
/** * SimpleBatchingStrategy有三个参数:、bufferLimit和timeout * batchSize:缓存中的消息个数达到此值时,将批量发送缓存的消息 * bufferLimit:缓存中的批消息的大小(单位字节),一旦达到这个值,无论是否达到batchSize,都将批量发送缓存中消息 * timeout: 当超过此时间没有新消息传入缓存,即使batchSize、bufferLimit都没达到极限,都将批量发送缓存中的消息 * refer https://docs.spring.io/spring-amqp/docs/2.2.7.RELEASE/reference/html/#template-batching */ @Bean("batchQueueRabbitTemplate") public BatchingRabbitTemplate customBatchingRabbitTemplate(ConnectionFactory connectionFactory) { BatchingStrategy strategy = new SimpleBatchingStrategy(10, 25_000, 3_000); BatchingRabbitTemplate template = new BatchingRabbitTemplate(strategy, batchQueueTaskScheduler()); template.setConnectionFactory(connectionFactory); return template; } @Bean public TaskScheduler batchQueueTaskScheduler(){ ThreadPoolTaskScheduler taskScheduler=new ThreadPoolTaskScheduler(); taskScheduler.setPoolSize(8); return taskScheduler; }
批处理发送消息
private final BatchingRabbitTemplate batchingRabbitTemplate; public void sendTask(TempMsgCbTask task) { batchingRabbitTemplate.convertAndSend(TEMP_MSG_CB_EXCHANGE, task.getLevel(), task); }
批量消费
默认批数据会自动 debatch (默认数量是 3?)。为了一次消费多个,需要设置 batchListener 为 true
@Bean(name = "consumerBatchContainerFactory") @ConditionalOnProperty(prefix = "spring.rabbitmq.listener", name = "type", havingValue = "simple", matchIfMissing = true) public SimpleRabbitListenerContainerFactory consumerBatchContainerFactory( SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setBatchListener(true); factory.setConsumerBatchEnabled(true); factory.setBatchSize(5); // 新消息间隔时间超过 5秒则自动 fire factory.setReceiveTimeout(5000L); configurer.configure(factory, connectionFactory); return factory; }
批量消费消息
@RabbitListener(queues = TEMP_MSG_CB_LOG_QUEUE_1, priority = "1", containerFactory = "consumerBatchContainerFactory") public void logQueue1(List<TempMsgCbTask> in) { log.info("Message read from logQueue1 msgId"); List<TemplateSendCollect> sendCollects = tempPushStore.updateCbDetailAndLog(in); templateSendLogService.doUpdate(sendCollects); }
233
以上是关于消息队列客户端开发向导二(基于 Spring 的 amqp 实现)的主要内容,如果未能解决你的问题,请参考以下文章