RabbitMq学习笔记
Posted SmallCuteMonkey80%
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMq学习笔记相关的知识,希望对你有一定的参考价值。
文章目录
1. RabbitMq (消息中间件)
1.概念:是基于队列模式实现的异步/同步的传输数据。
作用:支持高并发,异步解耦,流量削峰,降低耦合度。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-OUiyMJnR-1664445251307)(C:\\Users\\24473\\AppData\\Roaming\\Typora\\typora-user-images\\image-20220726110735094.png)]
2.传统的Http请求存在哪些缺点?
http请求是基于请求和响应的模型,在高并发的情况下,客户端发送大量的请求到服务器,可以会导致我们的服务器处理请求堆积。
Tomcat的服务器处理请求有自己独立线程,如果超过最大线程数会把请求缓存到队列中,如果请求堆积过多会导致服务器崩溃。 所以会都会在 **nginx入口进行限流,**整合服务保护框架。
如果请求超过可能会导致我们客户端发生重试策略,可能会导致接口幂等性问题。
接口是http协议的情况下,最好不要用处理耗时的业务,可以通过多线程和mq进行异步的处理。
3.mq的使用场景?
异常发送短信
异常发送优惠券
比较耗时操作。
使用多线程进行发送短信,优惠券,会使用到多核cpu,这样会导致cpu的开销比较大(适合一些小的项目)
最好使用mq进行异步的方式发送短信。
模式图解:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-UUqDr308-1664445251310)(C:\\Users\\24473\\AppData\\Roaming\\Typora\\typora-user-images\\image-20220726115459074.png)]
4. mq服务器如何保证消息不丢失?
生产者发送消息给Mq服务器端,MQ服务器需要缓存这个消息。
- 持久化机制
MQ如何抗高并发?
mq根据自身的能力情况,拉取mq服务器消息消费。默认情况拉取一条消息。
缺点:
出现消息延迟,解决可以消费者进行集群。批量获取消息。
5.VirtualHost?
相当于消息的分类,用来区分不同各类的消息。里面有队列,队列用来存放我们的消息。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-78hsIQti-1664445251311)(C:\\Users\\24473\\AppData\\Roaming\\Typora\\typora-user-images\\image-20220731121015195.png)]
Exchange 分派我们的消息存放在哪个队列存放,相当于我们的nginx进行路由。
快速入门rabbitMq
首先需要再RabbitMQ服务页面创建 Virtual Host和队列
/test Virtual Host
-----订单队列
----- 支付队列
- 在RabbitMq平台创建一个队列
- 编写生产者代码
- 编写消费者代码
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-8iXNxJ7T-1664445251312)(C:\\Users\\24473\\AppData\\Roaming\\Typora\\typora-user-images\\image-20220916133810829.png)]
1. 引入相关的Maven依赖
2.生产者通过Channel进行投递消息到queue代码
public class Producer
// 队列名称
public static final String QUEUE_NAME="hello";
// 发消息
public static void main(String[] args) throws IOException, TimeoutException
// 创建一个连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 工厂IP连接RabbitMQ的队列
factory.setHost("192.168.163.128");
// 用户名
factory.setUsername("admin");
// 密码
factory.setPassword("123");
factory.setPort(5672);
// 创建连接
Connection connection = factory.newConnection();
// 获取信道
Channel channel = connection.createChannel();
/*
* 生成一个队列
* 参数1:队列名称
* 参数2:队列里面的消息是否持久化,默认情况下,消息存储在内存中
* 参数3:该队列是否只供一个消费者进行消费,是否进行消费共享,true可以多个消费者消费,
* false只能一个消费者消费
* 参数4:是否自动删除:最后一个消费者断开连接之后,该队列是否自动删除,true则自动删除,
* false不自动删除
* 参数5:其他参数
* */
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
// 发消息
String message = "hello world";
/*
* 发送一个消息
* 参数1:发送到哪个交换机
* 参数2:路由的key值是那个,本次是队列的名称
* 参数3:其他参数信息
* 参数4:发送消息的消息体
* */
channel.basicPublish("",QUEUE_NAME,null,message.getBytes(StandardCharsets.UTF_8));
System.out.println("消息发送完毕!");
成功之后可以到 RabbitMq的图形页面的 ready那里看到消息多了一条在队列中。
3.消费者通过Channel消费的代码 注意队列名称需要一致
public class Consumer
// 队列名称
public static final String QUEUE_NAME = "hello";
// 接受消息
public static void main(String[] args) throws IOException, TimeoutException
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.163.128");
factory.setUsername("admin");
factory.setPassword("123");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明 接受消息
DeliverCallback deliverCallback = (consumerTag,message) ->
System.out.println("消费者获取消息为:"+new String(message.getBody(),"UTF-8"));
;
// 声明 取消消息
CancelCallback cancelCallback = consumer ->
System.out.println("消息消费被中断");
;
/*
* 消费者接收消息
* 参数1:表示消费哪个UI列
* 参数2:消费成功之后,队列是否需要自动应答,autoAck:true 表示自动应答,autoAck: false表示手动应答 一般实际生产中选择手动应答。
* 参数3:消费者成功消费的回调
* 参数4:消费者取消消费的回调
*/
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Lx8YUJsT-1664445251313)(C:\\Users\\24473\\AppData\\Roaming\\Typora\\typora-user-images\\image-20220916184748103.png)]
4. mq如何保证消息不丢失(mq服务器默认情况下都会对mq消息进行持久化)
-
producer
确保投递到mq服务器的消息成功 :可以通过Ack 消息确认机制实现,同步或者异步
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-AzhUeEWL-1664445251314)(C:\\Users\\24473\\AppData\\Roaming\\Typora\\typora-user-images\\image-20220916184829321.png)]
-
consumer
消费成功之后发送通知到mq服务器,删除消息成功的消息,避免重复消费。
项目中通过这个方法进行通知的:
//告诉服务器收到这条消息 已经被消费 可以在队列删掉 这样以后就不会再发了 否则消息服务器以为这条消息没处理掉 后续还会在发 //消息的标识,false只确认当前一个消息收到,true确认所有consumer获得的消息 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
kafka消费成功是没有立即进行删除的。
5. mq默认消费者均摊消费消息
可以在创建的Channel中设置一次消费消息的数目。
channel.basicQos(1); //表示一次性消费一条消息 channel.basicQos(2); //表示一次性消费两条 // 注意消费者消费消息成功之后必须通过Ack机制通知 已经消费成功和mq服务器删除消费的消息之后才会继续往下进行消费 channel.basicAck(envelope.getDeliveryTag(),false);
6. RabbitMq交换机类型
图片:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Jpt2BK9W-1664445251315)(C:\\Users\\24473\\AppData\\Roaming\\Typora\\typora-user-images\\image-20220917115206295.png)]
Direct exchange(直连交换机)
Fanout exchange(扇型交换机)
Topic exchange (主题交换机)
Headers exchange(头交换机)
/Virtual Hosts --区分 不同的团队
----队列 存放消息
---- 交换机 路由消息存放在哪个队列 相当于nginx
---- 路由key 分发规则
6.1 fanout交换机(可以生产者投递同样的消息,不同队列存放相同消息,消费者消费相同消息。):
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-xljEStLo-1664445251316)(C:\\Users\\24473\\AppData\\Roaming\\Typora\\typora-user-images\\image-20220917115504322.png)]
6.2 direct 交换机
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-04eShRWH-1664445251317)(C:\\Users\\24473\\AppData\\Roaming\\Typora\\typora-user-images\\image-20220917174608166.png)]
6.3 topic 交换机(正则表达式进行匹配)正式工作很多中使用这种
注意: #号表示支持匹配多个词,* 表示只可以匹配一个词
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ytN8v32a-1664445251318)(C:\\Users\\24473\\AppData\\Roaming\\Typora\\typora-user-images\\image-20220917175304288.png)]
7. RabbitMq是如果进行路由的?
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-4lnKOodR-1664445251319)(C:\\Users\\24473\\AppData\\Roaming\\Typora\\typora-user-images\\image-20220917121627137.png)]
RabbitMq是基于Amqp协议实现的分布式消息中间件,路由key(routing_key)是根据Exchange的type 和Binding的binding_key(建立queue和Exchange之间的绑定关系 )来决定的。
type类型为 fanout(扇形模式,广播模式):不会基于routing_key进行匹配,会发送到所以的队列中。
direct: 完整的匹配,routing_key 和binding_key完整相同,发送到相关的队列。
topic: 正则表达式进行匹配,如果routing_key 和binding_key符合正则匹配,会发送到符合的queue中。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-eoJivUkt-1664445251319)(C:\\Users\\24473\\AppData\\Roaming\\Typora\\typora-user-images\\image-20220917123039434.png)]
2.SpringBoot整合RabbitMq实现
1.引入依赖整合
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>2.2.6.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.4.5</version>
</dependency>
</dependencies>
2.写配置文件(可以测试三种交换机)
package com.mq.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @Author : JCccc
* @CreateTime : 2019/9/3
* @Description :
**/
@Configuration
class DirectRabbitConfig
//队列 起名:TestDirectQueue
@Bean
public Queue TestDirectQueue()
// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
// return new Queue("TestDirectQueue",true,true,false);
//一般设置一下队列的持久化就好,其余两个就是默认false
return new Queue("TestDirectQueue",true);
//Direct交换机 起名:TestDirectExchange
@Bean
DirectExchange TestDirectExchange()
// return new DirectExchange("TestDirectExchange",true,true);
return new DirectExchange("TestDirectExchange",true,false);
//绑定 将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting
@Bean
Binding bindingDirect()
return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting");
@Bean
DirectExchange lonelyDirectExchange()
return new DirectExchange("lonelyDirectExchange");
@Bean("topicQueue")
Queue topicQueu()
return new Queue("topicQueue");
@Bean("testTopicExchange")
TopicExchange testTopicExchange()
return new TopicExchange("testTopicExchange",true,false);
@Bean
Binding topicBinding(@Qualifier("topicQueue") Queue queue,@Qualifier("testTopicExchange") TopicExchange topicExchange)
return BindingBuilder.bind(queue).to(topicExchange).with("topicKey");
3.写生产者code
package com.mq.producer;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
/**
* @Author : JCccc
* @CreateTime : 2019/9/3
* @Description :
**/
@RestController
public class SendMessageController
@Autowired
RabbitTemplate rabbitTemplate; //使用RabbitTemplate,这提供了接收/发送等等方法
@GetMapping("/sendDirectMessage")
public String sendDirectMessage()
String messageId = String.valueOf(UUID.randomUUID());
String messageData = "test message, hello!";
String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
Map<String,Object> map=new HashMap<>();
map.put("messageId",messageId);
map.put("messageData",messageData);
map.put("createTime",createTime);
//将消息携带绑定键值:TestDirectRouting 发送到交换机TestDirectExchange
rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting", map);
return "ok";
4.写消费者code
package com.mq.config.consumer;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;
@Component
@RabbitListener(queues = "TestDirectQueue") //监听队列名称
public class DirectConsumer
//加上这个注解相当于消费成功进行回调
@RabbitHandler
public void process(Map testMessage)
System.out.println("DirectReceiver接收到消息:" + testMessage.toString());
5.启动类文件
package com.mq;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
class RabbitMqMain15672
public static void main(String[] args)
SpringApplication.run(RabbitMqMain15672.class,args);
6.配置文件 注意端口号需要为 5672不然报错 Socked closed
server:
port:
spring:
rabbitmq:
# host: 192.168.80.88 #mq服务器ip,默认为localhost
port: 5672 #mq服务器port,默认为5672
username: test #mq服务器username,默认为gust
password: test #mq服务器password,默认为guest
生产者如何获取消费结果
1.根据业务来定
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-zH07K2lq-1664445251320)(C:\\Users\\24473\\AppData\\Roaming\\Typora\\typora-user-images\\image-20220918151406509.png)]
订单号在生产者那边记录,如果消息成功之后插入数据库成功也会生成该订单号的一条数据,则可以进行删除这个消费成功的消息。
2.RocketMq 自带全局id,能够根据根据这个全局id获取消费结果。
原理: 生产者会发送消费给mq服务器,mq服务器生成一个全局id,消费者消费消息之后会通知mq服务器并发送标记消息消费成功。
死信队列
产生背景:俗称备胎队列,mq因为某种原因拒收该消息之后,可以转移到死信队列中存放,它也可以有交换机和路由key等。
产生原因:
-
消息投递到mq中存放消息已经过期 会自动的转移到 死信队列里面
-
消息队列达到最大长度
-
消费者多次消费消息失败,就会转移到死信队列中
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-71RkCfso-1664445251321)(C:\\Users\\24473\\AppData\\Roaming\\Typora\\typora-user-images\\image-20220918162612816.png)]
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-oHPOEwGm-1664445251321)(C:\\Users\\24473\\AppData\\Roaming\\Typora\\typora-user-images\\image-20220918164219496.png)]
应用场景
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-BAn9EDVq-1664445251322)(C:\\Users\\24473\\AppData\\Roaming\\Typora\\typora-user-images\\image-20220918164832373.png)]
消费者消费报异常引入mq无限的重试问题
默认情况下我们的消费者如果消费失败则mq会无限的进行重试,这样会导致消费者重复进行消费。我们应该人为的控制消费者重试的次数,所以会产生幂等性问题? 幂等性保证数据唯一
什么情况下消费者需要实现重试策略?
-
消费者获取消息后,用第三方接口失败?
需要,这个可能是网络延迟的原因,重试可能会调用成功。
-
消费者获取消息之后代码出现错误?需要重试策略?
不需要,可以把消息存入日志表中去,后期定期可以通过定时任务或者人工干预的方式。可以使用死信队列重新消费这些消息。
消费者手动的Ack签收这条消息已经消费成功
//告诉服务器收到这条消息 已经被消费 可以在队列删掉 这样以后就不会再发了 否则消息服务器以为这条消息没处理掉 后续还会在发
//消息的标识,false只确认当前一个消息收到,true确认所有consumer获得的消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
接口幂等性解决?
在业务层面使用全局唯一的id来进行约束,在并发情况下面是不靠谱的,防止多次请求导致的数据重复我们可以在 (insert)插入操作的话数据库中设置 唯一主键,更新操作可以通过 数据库的乐观锁机制实现。
以上是关于RabbitMq学习笔记的主要内容,如果未能解决你的问题,请参考以下文章