Spring和SpringBoot整合RabbitMQ
Posted Lossdate
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spring和SpringBoot整合RabbitMQ相关的知识,希望对你有一定的参考价值。
Spring和SpringBoot整合RabbitMQ
一、Spring整合RabbitMQ
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.2.7.RELEASE</version>
</dependency>
</dependencies>
1. Producer
1.1 Config
@Configuration
public class RabbitConfig {
/**
* 连接工厂
*/
@Bean
public ConnectionFactory connectionFactory() {
return new CachingConnectionFactory(URI.create("amqp://root:123456@192.168.200.136:5672/%2f"));
}
/**
* RabbitTemplate
*/
@Bean
@Autowired
public RabbitTemplate rabbitTemplate(ConnectionFactory factory) {
return new RabbitTemplate(factory);
}
/**
* RabbitAdmin
*/
@Bean
@Autowired
public RabbitAdmin rabbitAdmin(ConnectionFactory factory) {
return new RabbitAdmin(factory);
}
/**
* Queue
*/
@Bean
public Queue queue() {
return QueueBuilder.nonDurable("queue.anno").build();
}
/**
* Exchange
*/
@Bean
public Exchange fanoutExchange() {
return new FanoutExchange("ex.anno.fanout", false, false, null);
}
/**
* Binding
*/
@Bean
@Autowired
public Binding binding(Queue queue, Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("key,anno").noargs();
}
}
1.1 Producer
public class ProducerApp {
public static void main(String[] args) throws Exception {
AbstractApplicationContext context = new AnnotationConfigApplicationContext(RabbitConfig.class);
RabbitTemplate template = context.getBean(RabbitTemplate.class);
MessageProperties messageProperties = MessagePropertiesBuilder
.newInstance()
.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
.setContentEncoding("gbk")
.setHeader("myKey", "myValue")
.build();
Message msg = MessageBuilder.withBody("Hello World".getBytes("gbk"))
.andProperties(messageProperties)
.build();
template.send("ex.anno.fanout", "key.anno", msg);
context.close();
}
}
2. Consumer拉取推送消息
2.1 Config
@Configuration
public class RabbitConfig {
/**
* 连接工厂
*/
@Bean
public ConnectionFactory connectionFactory() {
return new CachingConnectionFactory(URI.create("amqp://root:123456@192.168.200.136:5672/%2f"));
}
/**
* RabbitTemplate
*/
@Bean
@Autowired
public RabbitTemplate rabbitTemplate(ConnectionFactory factory) {
return new RabbitTemplate(factory);
}
/**
* RabbitAdmin
*/
@Bean
@Autowired
public RabbitAdmin rabbitAdmin(ConnectionFactory factory) {
return new RabbitAdmin(factory);
}
/**
* Queue
*/
@Bean
public Queue queue() {
return QueueBuilder.nonDurable("queue.anno").build();
}
}
2.2 Consumer
public class ConsumerApp {
public static void main(String[] args) throws Exception {
AbstractApplicationContext context = new AnnotationConfigApplicationContext(RabbitConfig.class);
RabbitTemplate template = context.getBean(RabbitTemplate.class);
Message receive = template.receive("queue.anno");
System.out.println(new String(receive.getBody(), receive.getMessageProperties().getContentEncoding()));
context.close();
}
}
3. Consumer消息监听(用于推消息)
3.1 Config
@ComponentScan("demo") //扫描包
@Configuration
@EnableRabbit
public class RabbitConfig {
/**
* 连接工厂
*/
@Bean
public ConnectionFactory connectionFactory() {
return new CachingConnectionFactory(URI.create("amqp://root:123456@192.168.200.136:5672/%2f"));
}
/**
* RabbitTemplate
*/
@Bean
@Autowired
public RabbitTemplate rabbitTemplate(ConnectionFactory factory) {
return new RabbitTemplate(factory);
}
/**
* RabbitAdmin
*/
@Bean
@Autowired
public RabbitAdmin rabbitAdmin(ConnectionFactory factory) {
return new RabbitAdmin(factory);
}
/**
* Queue
*/
@Bean
public Queue queue() {
return QueueBuilder.nonDurable("queue.anno").build();
}
/**
* SimpleRabbitListenerContainerFactory
*/
@Bean("rabbitListenerContainerFactory")
@Autowired
public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
// factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
// factory.setAcknowledgeMode(AcknowledgeMode.NONE);
//最少允许多少个并发的消费者
factory.setConcurrentConsumers(10);
//最大允许多少个并发的消费者
factory.setMaxConcurrentConsumers(15);
//按照批次消费消息,一个批次多少个
factory.setBatchSize(10);
return factory;
}
}
3.2 MessageListener
@Component
public class MyMessageListener {
/**
* RabbitListener : 监听消息队列
* com.rabbitmq.client.Channel channel对象
* org.springframework.amqp.core.Message message对象 可以直接操作原生的AMQP消息
* org.springframework.messaging.Message to use the messaging abstraction counterpart
* @Payload 注解方法参数,改参数的值就是消息体
* @Header 注解方法参数,访问指定的消息头字段的值
* @Headers 该注解的方法参数获取该消息的消息头的所有字段,参数类型对应于map集合。
* MessageHeaders 参数类型,访问所有消息头字段
* MessageHeaderAccessor or AmqpMessageHeaderAccessor 访问所有消息头字段
*/
// @RabbitListener(queues = "queue.anno")
// public void onMessage(Message message) {
// System.out.println(new String(message.getBody(), message.getMessageProperties().getContentEncoding()));
// }
@RabbitListener(queues = "queue.anno")
public void onMessage(@Payload String messageStr) {
System.out.println(messageStr);
}
}
3.3 Consumer
public class ConsumerListenerApp {
public static void main(String[] args) {
new AnnotationConfigApplicationContext(RabbitConfig.class);
}
}
二、SpringBoot整合RabbitMQ
1. Producer
1.1 构建Springboot 2.5.0 + Spring for RabbitMQ + Spring Web
1.2 application.properties
spring.application.name=springboot_rabbit
spring.rabbitmq.host=192.168.200.136
spring.rabbitmq.virtual-host=/
spring.rabbitmq.username=root
spring.rabbitmq.password=123456
spring.rabbitmq.port=5672
1.3 Config
@Configuration
public class RabbitConfig {
@Bean
public Queue queue() {
return new Queue("queue.boot", false, false, false, null);
}
@Bean
public Exchange exchange() {
// new Exchange()
// return new TopicExchange("topic.ex", false, false, null);
// return new DirectExchange("direct.ex", false, false, null);
// return new FanoutExchange("fanout.ex", false, false, null);
// return new HeadersExchange("header.ex", false, false, null);
//交换器名称,交换器类型(),是否是持久化的,是否自动删除,交换器属性Map集合
// return new CustomExchange("custom.ex", ExchangeTypes.DIRECT, false, false, null);
return new TopicExchange("ex.boot", false, false, null);
}
@Bean
public Binding binding() {
//绑定的目的地,绑定的类型:到交换器还是到队列,交换器名称,路由key,绑定的属性
// new Binding("", Binding.DestinationType.EXCHANGE, "", "", null);
//绑定的目的地,绑定的类型:到交换器还是到队列,交换器名称,路由key,绑定的属性
// new Binding("", Binding.DestinationType.QUEUE, "", "", null);
//绑定了交换器ex.boot到队列queue.boot,路由key是key.boot
return new Binding("queue.boot",
Binding.DestinationType.QUEUE,
"ex.boot",
"key.boot",
null);
}
}
1.4 Controller
@RestController
public class MessageController {
private final AmqpTemplate rabbitTemplate;
@Autowired
public MessageController(AmqpTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
@RequestMapping("/rabbit/{message}")
public String receive(@PathVariable String message) {
MessageProperties messageProperties = MessagePropertiesBuilder
.newInstance()
.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
.setContentEncoding("utf-8")
.setHeader("Hello", "World")
.build();
Message msg = MessageBuilder
.withBody(message.getBytes(StandardCharsets.UTF_8))
.andProperties(messageProperties)
.build();
//exchange + routing key + msg
rabbitTemplate.send("ex.boot", "key.boot", msg);
return "ok";
}
}
2. Consumer(监听模式)
2.1 构建Springboot 2.5.0 + Spring for RabbitMQ
2.2 application.properties
spring.application.name=springboot_rabbit_consumer
spring.rabbitmq.host=192.168.200.136
spring.rabbitmq.virtual-host=/
spring.rabbitmq.username=root
spring.rabbitmq.password=123456
spring.rabbitmq.port=5672
2.3 Config
@Configuration
public class RabbitConfig {
@Bean
public Queue queue() {
return QueueBuilder.nonDurable("queue.boot").build();
}
}
2.4 ConsumerListener
@Component
public class MyMessageListener {
@RabbitListener(queues = "queue.boot")
public void getMyMessage(@Payload String messageStr,
@Header(name = "Hello") String headerValue) {
System.out.println(messageStr);
System.out.println("Hello -> " + headerValue);
}
}
3. 测试
3.1 启动Consumer和Producer
3.2 访问http://localhost:8080/rabbit/testMsg
可以看到Consumer控制台输出:
testMsg
Hello -> World
以上是关于Spring和SpringBoot整合RabbitMQ的主要内容,如果未能解决你的问题,请参考以下文章
Spring Boot 2.X - Spring Boot整合AMQP之RabbitMQ
Spring Cloud Stream整合Rabbit之重复投递