RabbitMQ整合SpringBoot
Posted 永旗狍子
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ整合SpringBoot相关的知识,希望对你有一定的参考价值。
RabbitMQ整合SpringBoot
一.RabbitMQ整合SpringBoot(手动ack)
1.创建SpringBoot工程
2.导入依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
3.编写配置文件
application.yml
spring: rabbitmq: host: 192.168.247.128 port: 5672 username: root password: 123 virtual-host: /root listener: simple: acknowledge-mode: manual #手动ack publisher-confirm-type: simple #开启config机制 publisher-returns: true #开启return机制
4.声明exchange,queue
@Configuration
public class RabbitMQConfig
//1. 创建exchange - topic
@Bean // 把方法的返回值放入到IOC容器中,方法的名字就是bean的ID
public TopicExchange getTopicExchange()
return new TopicExchange("boot-topic-exchange",true,false);
//2. 创建queue
@Bean
public Queue getQueue()
return new Queue("boot-queue",true,false,false,null);
//3. 绑定在一起
@Bean
public Binding getBinding(TopicExchange topicExchange,Queue queue)
return BindingBuilder.bind(queue).to(topicExchange).with("*.red.*");
5.发布消息到RabbitMQ
在test测试方法中
@SpringBootTest
class SpringbootRabbitmqApplicationTests
@Autowired
// xxxxxxTemplate;
// JdbcTemplate; // Spring和JDBC的整合
// RedisTemplate; // SPring和Redis
// SqlSessionTemplate; // Sprign和MyBatis
// HibnerateTemplate;
// Spring和RabbitMQ整合的bean,可以通过这个bean来操作MQ
private RabbitTemplate rabbitTemplate;
@Test
void contextLoads()
System.out.println(rabbitTemplate);
for (int i = 0; i < 1; i++)
rabbitTemplate.convertAndSend("boot-topic-exchange", "text", "红色大狼狗" + i);
System.out.println("消息发送完成。。。。。");
6.创建消费者监听消息
// 手动ack
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
@Component
public class Consumer
// @RabbitListener(queues = "boot-queue")
// public void getMessage(String message)
// System.out.println("接受到的消息:" + message);
//
@RabbitListener(queues = "boot-queue") // 指定监听那个队列中的消息
public void getMessage(String data, Channel channel, Message message)
// 因为在消费的过程中可能会出现异常,所以把所有的业务代码都要用try包起来
try
// 1、消费这个消息
System.out.println("监听者获取到的消息是:" + data);
Thread.sleep(500);
// int i = 10 / 0; // 模拟消费过程中出现了异常
// 2.获取消息的表示
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try
// 3.把消息消费成功,告诉MQ这个消息我已经消费了
// 第一个参数是应答消息的标识(10)
// 第二个参数是否批量应答(如果开启的批量应答,会把小于10的消息全部应答)
channel.basicAck(deliveryTag, false);
System.out.println("消息【" + deliveryTag + "】应答MQ--》已完成");
catch (IOException e)
// e.printStackTrace();
catch (InterruptedException e)
System.out.println("消费者在消费的过程中出现了异常。。。");
// 既然出现异常了,说明消息没有正常消费,应该通知MQ这个消息我无法消费,把这个消息转发给其他的消费者
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try
// 应答MQ,这个消息我消费不了
// 第一个参数是应答消息的标识(10)
// 第二个参数是否批量应答(如果开启的批量应答,会把小于10的消息全部应答)
// 第三个参数:该消息是否重入队列,压倒队列的头部
// false:该消息就会丢失
channel.basicNack(deliveryTag, false, true);
System.out.println("消息【" + deliveryTag + "】应答MQ--》未完成");
catch (IOException ex)
ex.printStackTrace();
解决:消息是否到达交换机?消息是否到达队列?
- 采用Confirm保证消息到达exchange,无法保证消息可以被exchange分发到指定queue。
- exchange是不能持久化消息的,queue是可以持久化消息。
- 采用Return机制来监听消息是否从exchange送到了指定的queue中。
7.开启Confirm和Return机制
@Component
public class RabbitMQConfirm implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback
@Autowired
private RabbitTemplate rabbitTemplate;
// init-mtehod<bean init-method>
@PostConstruct // 在对象实例化完成以后调用
public void init()
// 消息发送出去后就会调用这个类中的confirm();
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnsCallback(this);
// confirm是确认消息是否到达交换机
@Override
public void confirm(CorrelationData correlationData, boolean ack, String context)
if (ack)
System.out.println("消息已经发送到了交换机上:" + context);
else
System.out.println("消息没有发送到了交换机上:" + context);
// 消息没有发送到队列时调用
@Override
public void returnedMessage(ReturnedMessage returnedMessage)
byte[] body = returnedMessage.getMessage().getBody();
System.out.println("消息没有发送到队列"+returnedMessage.getRoutingKey()+"-->"+new String(body));
解决:消费者在消费消息时出现闪断的问题?
如下图所示:
为了解决这个问题我们需要借助Redis,下一篇文章解决。
以上是关于RabbitMQ整合SpringBoot的主要内容,如果未能解决你的问题,请参考以下文章
SpringBoot 整合RabbitMq 自定义消息监听容器来实现消息批量处理
Springboot整合RabbitMQ(三)——Topic主题交换机