RabbitMQ:SimpleMessageListenerContainer并发消费
Posted ITKaven
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ:SimpleMessageListenerContainer并发消费相关的知识,希望对你有一定的参考价值。
pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- Inherit dependency and plugin management from the Spring Boot parent POM. -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.6.2</version>
    </parent>

    <!-- Coordinates and descriptive metadata of this project. -->
    <packaging>jar</packaging>
    <groupId>com.kaven</groupId>
    <artifactId>springboot</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>springboot</name>
    <description>springboot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <!-- Versions are omitted here; they are expected to come from the parent POM. -->
    <dependencies>
        <!-- Web starter: used by the REST controller that triggers message publishing. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- Lombok: generates the getters/setters of the properties class. -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <!-- AMQP starter: provides the Spring AMQP / RabbitMQ client classes. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
application.properties:
# Broker connection settings (address, port, credentials, virtual host).
spring.rabbitmq.host=192.168.1.9
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
spring.rabbitmq.virtualHost=/
# The three keys below are bound to the exchange, routingKey and queue fields
# of the RabbitMQProperties class shown later in this article.
# NOTE(review): values in a .properties file are not quote-delimited, so the
# next line sets the exchange name to the two-character string "" (two quote
# characters), not to an empty name - confirm this is what is intended.
spring.rabbitmq.exchange=""
spring.rabbitmq.routingKey=kaven
spring.rabbitmq.queue=kaven
RabbitMQProperties类(RabbitMQ的参数类):
package com.kaven.springboot.rabbitmq;
import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
@ConfigurationProperties(prefix = "spring.rabbitmq")
@Setter
@Getter
public class RabbitMQProperties
private String host;
private int port;
private String username;
private String password;
private String exchange;
private String queue;
private String routingKey;
private String virtualHost;
RabbitMQConfig类(定义RabbitMQ组件的配置类):
package com.kaven.springboot.rabbitmq;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
@Configuration
public class RabbitMQConfig
@Resource
private RabbitMQProperties properties;
@Bean
public ConnectionFactory connectionFactory()
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(properties.getHost(), properties.getPort());
connectionFactory.setUsername(properties.getUsername());
connectionFactory.setPassword(properties.getPassword());
connectionFactory.setVirtualHost(properties.getVirtualHost());
return connectionFactory;
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
//必须是prototype类型
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory)
return new RabbitTemplate(connectionFactory);
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory)
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
// 设置连接工厂
container.setConnectionFactory(connectionFactory);
// 指定要创建的并发消费者数量
// 默认值为1
container.setConcurrentConsumers(3);
// 设置消费者数量的上限
// 默认为concurrentConsumers
// 消费者将按需添加
// 不能小于concurrentConsumers
container.setMaxConcurrentConsumers(5);
// 设置要从中接收消息的队列名称
// 参数为String... queueName
container.setQueueNames(properties.getQueue());
// 控制容器在消息确认方面的行为
// 自动确认
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
// 设置MessageListener(消息监听器)
container.setMessageListener(message ->
System.out.printf("接收消息: %s\\n", new String(message.getBody(), StandardCharsets.UTF_8)));
return container;
@Bean
public DirectExchange exchange()
return new DirectExchange(properties.getExchange());
@Bean
public Queue queue()
//队列持久
return new Queue(properties.getQueue(), true);
@Bean
public Binding binding()
return BindingBuilder.bind(queue()).to(exchange()).with(properties.getRoutingKey());
Producer类(用于发布消息):
package com.kaven.springboot.rabbitmq;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
@Component
public class Producer
private final RabbitTemplate rabbitTemplate;
@Resource
private RabbitMQProperties properties;
@Autowired
public Producer(RabbitTemplate rabbitTemplate)
this.rabbitTemplate = rabbitTemplate;
public void sendMsg(String msg)
CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
MessageProperties messageProperties = new MessageProperties();
messageProperties.setExpiration("10000");
Message message = new Message(msg.getBytes(StandardCharsets.UTF_8), messageProperties);
rabbitTemplate.send(properties.getExchange(), properties.getRoutingKey(), message, correlationId);
ProducerController类(用于发布消息的接口):
package com.kaven.springboot.rabbitmq;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
@RestController
public class ProducerController
@Resource
private Producer producer;
@GetMapping("/send")
public String send(String msg)
producer.sendMsg(msg);
return "发送消息成功";
启动类:
package com.kaven.springboot;
import com.kaven.springboot.rabbitmq.RabbitMQProperties;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.ConfigurationPropertiesScan;
@SpringBootApplication
@ConfigurationPropertiesScan(basePackageClasses = RabbitMQProperties.class)
public class SpringbootApplication
public static void main(String[] args)
SpringApplication application = new SpringApplication(SpringbootApplication.class);
application.run(args);
启动应用,可见Spring Boot在尝试连接RabbitMQ,并且建立了连接。
kaven队列有三个消费者,符合预期。
使用Postman来进行测试,请求接口2000次,每次延迟100ms。
当有大量消息路由到SimpleMessageListenerContainer实例监听的队列时,消费者将按需添加,数量达到maxConcurrentConsumers。
控制台也在不断输出,说明消息监听器起作用了。
测试完成。
消费者数量又恢复成concurrentConsumers配置的值(本例为3)。
使用SimpleMessageListenerContainer并发消费非常便捷,更多功能可以自己去尝试,博客就介绍到这里,如果博主有说错的地方或者大家有不同的见解,欢迎大家评论补充。
以上是关于RabbitMQ:SimpleMessageListenerContainer并发消费的主要内容,如果未能解决你的问题,请参考以下文章
rabbitmq页面出现/etc/rabbitmq/rabbitmq.config(not found)解决方法