Spring AMPQ自定义线程池
Posted 大扑棱蛾子
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spring AMPQ自定义线程池相关的知识,希望对你有一定的参考价值。
在项目中遇到了一个棘手的问题。当多个消费者同时执行任务的时候出现了资源争用的问题。日志如下
19-03-19 13:00:17.470 [SimpleAsyncTaskExecutor-1] INFO c.w.m.h.AbstractMessageHandler - 开始验证消息:f3eaa048d8984c2aa69b52a8cdece1e9
19-03-19 13:00:17.470 [SimpleAsyncTaskExecutor-1] INFO c.w.m.h.AbstractMessageHandler - 消息签名验证通过:f3eaa048d8984c2aa69b52a8cdece1e9
19-03-19 13:00:49.906 [SimpleAsyncTaskExecutor-1] INFO c.w.m.h.AbstractMessageHandler - 开始验证消息:e5cc41275f3a4e6ab20cfd2dadfb31a7
19-03-19 13:00:49.906 [SimpleAsyncTaskExecutor-1] INFO c.w.m.h.AbstractMessageHandler - 消息签名验证通过:e5cc41275f3a4e6ab20cfd2dadfb31a7
19-03-19 13:01:12.070 [SimpleAsyncTaskExecutor-1] INFO c.w.m.h.AbstractMessageHandler - 已保存接收到的消息:f3eaa048d8984c2aa69b52a8cdece1e9
19-03-19 13:01:12.071 [SimpleAsyncTaskExecutor-1] INFO c.w.m.h.AbstractMessageHandler - 开始处理消息,消息ID:f3eaa048d8984c2aa69b52a8cdece1e9
19-03-19 13:01:12.071 [SimpleAsyncTaskExecutor-1] INFO c.w.m.h.AbstractMessageHandler - 已保存接收到的消息:e5cc41275f3a4e6ab20cfd2dadfb31a7
19-03-19 13:01:12.072 [SimpleAsyncTaskExecutor-1] INFO c.w.m.h.AbstractMessageHandler - 开始处理消息,消息ID:e5cc41275f3a4e6ab20cfd2dadfb31a7
消息IDf3eaa048d8984c2aa69b52a8cdece1e9
和 e5cc41275f3a4e6ab20cfd2dadfb31a7
是由两个不同的监听处理的。但是从日志上来看,监听A在处理中被挂起了,然后监听B开始执行,然后监听B被挂起,监听A继续执行。
为什么会出现这样的问题,我们也没有试出来。初步怀疑是spring-amqp
监听默认的线程池只有一个线程导致的。如果有同学知道欢迎告知,我的邮箱:jaune162@126.com。
鉴于以上情况,所以采用自定义线程池的方式解决
首先看@RabbitListener
的源代码。
public @interface RabbitListener
...
/**
* The bean name of the @link org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory
* to use to create the message listener container responsible to serve this endpoint.
* <p>If not specified, the default container factory is used, if any.
* @return the @link org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory
* bean name.
*/
String containerFactory() default "";
...
这里可以指定一个containerFactory
。
所以我们的处理思路就是创建一个RabbitListenerContainerFactory
然后指定Spring的线程池。
public ThreadPoolTaskExecutor rabbitListenerTaskExecutor()
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setMaxPoolSize(10); // 指定线程的最大数量
executor.setCorePoolSize(2); // 指定线程池维护线程的最少数量
executor.setQueueCapacity(20); // 指定等待处理的任务数
executor.setThreadNamePrefix("RabbitListenerTaskExecutor-") // 指定线程前缀
return executor;
@Bean
public RabbitListenerContainerFactory defaultRabbitListenerContainerFactory (
SimpleRabbitListenerContainerFactoryConfigurer configurer,
ThreadPoolTaskExecutor rabbitListenerTaskExecutor,
ConnectionFactory connectionFactory)
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
configurer.configure(factory, connectionFactory);
factory.setConcurrentConsumers(1);
factory.setMaxConcurrentConsumers(10);
factory.setTaskExecutor(rabbitListenerTaskExecutor);
return factory;
使用方法如下
@RabbitListener(queues = Queues.ORDER_QUEUE, containerFactory="defaultRabbitListenerContainerFactory ")
在每个监听上都配置之后,可以从控制台看出,如果两条消息同时到达,会启用两个线程来处理。
以上是关于Spring AMPQ自定义线程池的主要内容,如果未能解决你的问题,请参考以下文章
spring异步线程任务Async,自定义配置线程池,Java
Spring Boot使用@Async实现异步调用:自定义线程池