RabbitMq学习笔记

Posted SmallCuteMonkey80%

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMq学习笔记相关的知识,希望对你有一定的参考价值。

文章目录

1. RabbitMq (消息中间件)

1.概念:是基于队列模式实现的异步/同步的传输数据。

作用:支持高并发,异步解耦,流量削峰,降低耦合度。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-OUiyMJnR-1664445251307)(C:\\Users\\24473\\AppData\\Roaming\\Typora\\typora-user-images\\image-20220726110735094.png)]

2.传统的Http请求存在哪些缺点?

http请求是基于请求和响应的模型,在高并发的情况下,客户端发送大量的请求到服务器,可以会导致我们的服务器处理请求堆积。

Tomcat的服务器处理请求有自己独立线程,如果超过最大线程数会把请求缓存到队列中,如果请求堆积过多会导致服务器崩溃。 所以会都会在 **nginx入口进行限流,**整合服务保护框架。

如果请求超过可能会导致我们客户端发生重试策略可能会导致接口幂等性问题。

接口是http协议的情况下,最好不要用处理耗时的业务,可以通过多线程和mq进行异步的处理。

3.mq的使用场景?

异常发送短信

异常发送优惠券

比较耗时操作。

使用多线程进行发送短信,优惠券,会使用到多核cpu,这样会导致cpu的开销比较大(适合一些小的项目)

最好使用mq进行异步的方式发送短信。

模式图解:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-UUqDr308-1664445251310)(C:\\Users\\24473\\AppData\\Roaming\\Typora\\typora-user-images\\image-20220726115459074.png)]

4. mq服务器如何保证消息不丢失?

生产者发送消息给Mq服务器端,MQ服务器需要缓存这个消息。

  1. 持久化机制

MQ如何抗高并发?

mq根据自身的能力情况,拉取mq服务器消息消费。默认情况拉取一条消息。

缺点:

出现消息延迟,解决可以消费者进行集群。批量获取消息。

5.VirtualHost?

相当于消息的分类,用来区分不同各类的消息。里面有队列,队列用来存放我们的消息。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-78hsIQti-1664445251311)(C:\\Users\\24473\\AppData\\Roaming\\Typora\\typora-user-images\\image-20220731121015195.png)]

Exchange 分派我们的消息存放在哪个队列存放,相当于我们的nginx进行路由。

快速入门rabbitMq

首先需要再RabbitMQ服务页面创建 Virtual Host和队列

/test Virtual Host

-----订单队列

----- 支付队列

  1. 在RabbitMq平台创建一个队列
  2. 编写生产者代码
  3. 编写消费者代码

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-8iXNxJ7T-1664445251312)(C:\\Users\\24473\\AppData\\Roaming\\Typora\\typora-user-images\\image-20220916133810829.png)]

1. 引入相关的Maven依赖

2.生产者通过Channel进行投递消息到queue代码

public class Producer 
   // 队列名称
   public static  final String QUEUE_NAME="hello";
 
   // 发消息
   public static void main(String[] args) throws IOException, TimeoutException 
       // 创建一个连接工厂
       ConnectionFactory factory = new ConnectionFactory();
 
       // 工厂IP连接RabbitMQ的队列
       factory.setHost("192.168.163.128");
       // 用户名
       factory.setUsername("admin");
       // 密码
       factory.setPassword("123");
 
       factory.setPort(5672);
 
       // 创建连接
       Connection connection = factory.newConnection();
       // 获取信道
       Channel channel = connection.createChannel();
       /*
           * 生成一个队列
           * 参数1:队列名称
           * 参数2:队列里面的消息是否持久化,默认情况下,消息存储在内存中
           * 参数3:该队列是否只供一个消费者进行消费,是否进行消费共享,true可以多个消费者消费,
           *        false只能一个消费者消费
           * 参数4:是否自动删除:最后一个消费者断开连接之后,该队列是否自动删除,true则自动删除,
           *        false不自动删除
           * 参数5:其他参数
       * */
       channel.queueDeclare(QUEUE_NAME,false,false,false,null);
       // 发消息
       String message = "hello world";
       /*
       * 发送一个消息
       * 参数1:发送到哪个交换机
       * 参数2:路由的key值是那个,本次是队列的名称
       * 参数3:其他参数信息
       * 参数4:发送消息的消息体
       * */
       channel.basicPublish("",QUEUE_NAME,null,message.getBytes(StandardCharsets.UTF_8));
       System.out.println("消息发送完毕!");
 
   

成功之后可以到 RabbitMq的图形页面的 ready那里看到消息多了一条在队列中。

3.消费者通过Channel消费的代码 注意队列名称需要一致

public class Consumer 
    // 队列名称
    public static final String QUEUE_NAME = "hello";
 
    // 接受消息
    public static void main(String[] args) throws IOException, TimeoutException 
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
 
        factory.setHost("192.168.163.128");
        factory.setUsername("admin");
        factory.setPassword("123");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
 
        // 声明 接受消息
        DeliverCallback deliverCallback = (consumerTag,message) -> 
            System.out.println("消费者获取消息为:"+new String(message.getBody()"UTF-8"));
        ;
 
        // 声明 取消消息
        CancelCallback cancelCallback = consumer -> 
            System.out.println("消息消费被中断");
        ;
 
        /*
        * 消费者接收消息
        * 参数1:表示消费哪个UI列
        * 参数2:消费成功之后,队列是否需要自动应答,autoAck:true 表示自动应答,autoAck: false表示手动应答 一般实际生产中选择手动应答。
        * 参数3:消费者成功消费的回调
        * 参数4:消费者取消消费的回调
         */
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
    

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Lx8YUJsT-1664445251313)(C:\\Users\\24473\\AppData\\Roaming\\Typora\\typora-user-images\\image-20220916184748103.png)]

4. mq如何保证消息不丢失(mq服务器默认情况下都会对mq消息进行持久化)

  1. producer

    确保投递到mq服务器的消息成功 :可以通过Ack 消息确认机制实现,同步或者异步

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-AzhUeEWL-1664445251314)(C:\\Users\\24473\\AppData\\Roaming\\Typora\\typora-user-images\\image-20220916184829321.png)]

  2. consumer

    消费成功之后发送通知到mq服务器,删除消息成功的消息,避免重复消费

    项目中通过这个方法进行通知的:

       //告诉服务器收到这条消息 已经被消费 可以在队列删掉 这样以后就不会再发了 否则消息服务器以为这条消息没处理掉 后续还会在发
       //消息的标识,false只确认当前一个消息收到,true确认所有consumer获得的消息
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    

    kafka消费成功是没有立即进行删除的。

    5. mq默认消费者均摊消费消息

    可以在创建的Channel中设置一次消费消息的数目。

    channel.basicQos(1); //表示一次性消费一条消息
    channel.basicQos(2); //表示一次性消费两条
    
    
    // 注意消费者消费消息成功之后必须通过Ack机制通知 已经消费成功和mq服务器删除消费的消息之后才会继续往下进行消费 
    channel.basicAck(envelope.getDeliveryTag(),false);
    
    
    

    6. RabbitMq交换机类型

    图片:

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Jpt2BK9W-1664445251315)(C:\\Users\\24473\\AppData\\Roaming\\Typora\\typora-user-images\\image-20220917115206295.png)]

    Direct exchange(直连交换机)

    Fanout exchange(扇型交换机)

    Topic exchange (主题交换机)

    Headers exchange(头交换机)

    /Virtual Hosts --区分 不同的团队

    ----队列 存放消息

    ---- 交换机 路由消息存放在哪个队列 相当于nginx

    ---- 路由key 分发规则

    6.1 fanout交换机(可以生产者投递同样的消息,不同队列存放相同消息,消费者消费相同消息。):

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-xljEStLo-1664445251316)(C:\\Users\\24473\\AppData\\Roaming\\Typora\\typora-user-images\\image-20220917115504322.png)]

    6.2 direct 交换机

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-04eShRWH-1664445251317)(C:\\Users\\24473\\AppData\\Roaming\\Typora\\typora-user-images\\image-20220917174608166.png)]

    6.3 topic 交换机(正则表达式进行匹配)正式工作很多中使用这种

    注意: #号表示支持匹配多个词,* 表示只可以匹配一个词

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ytN8v32a-1664445251318)(C:\\Users\\24473\\AppData\\Roaming\\Typora\\typora-user-images\\image-20220917175304288.png)]

    7. RabbitMq是如果进行路由的?

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-4lnKOodR-1664445251319)(C:\\Users\\24473\\AppData\\Roaming\\Typora\\typora-user-images\\image-20220917121627137.png)]

    RabbitMq是基于Amqp协议实现的分布式消息中间件,路由key(routing_key)是根据Exchange的type 和Binding的binding_key(建立queue和Exchange之间的绑定关系 )来决定的。

type类型为 fanout(扇形模式,广播模式):不会基于routing_key进行匹配,会发送到所以的队列中。

direct: 完整的匹配,routing_key 和binding_key完整相同,发送到相关的队列。

topic: 正则表达式进行匹配,如果routing_key 和binding_key符合正则匹配,会发送到符合的queue中。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-eoJivUkt-1664445251319)(C:\\Users\\24473\\AppData\\Roaming\\Typora\\typora-user-images\\image-20220917123039434.png)]

2.SpringBoot整合RabbitMq实现

1.引入依赖整合

  <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>2.2.6.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
            <version>2.4.5</version>
        </dependency>

    </dependencies>

2.写配置文件(可以测试三种交换机)

package com.mq.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @Author : JCccc
 * @CreateTime : 2019/9/3
 * @Description :
 **/
@Configuration
class DirectRabbitConfig 

    //队列 起名:TestDirectQueue
    @Bean
    public Queue TestDirectQueue() 
        // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
        // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
        // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
        //   return new Queue("TestDirectQueue",true,true,false);

        //一般设置一下队列的持久化就好,其余两个就是默认false
        return new Queue("TestDirectQueue",true);
    

    //Direct交换机 起名:TestDirectExchange
    @Bean
    DirectExchange TestDirectExchange() 
        //  return new DirectExchange("TestDirectExchange",true,true);
        return new DirectExchange("TestDirectExchange",true,false);
    

    //绑定  将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting
    @Bean
    Binding bindingDirect() 
        return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting");
    

    @Bean
    DirectExchange lonelyDirectExchange() 
        return new DirectExchange("lonelyDirectExchange");
    
    @Bean("topicQueue")
    Queue topicQueu()
        return new Queue("topicQueue");
    

    @Bean("testTopicExchange")
    TopicExchange testTopicExchange()
       return new TopicExchange("testTopicExchange",true,false);
    
    @Bean
    Binding topicBinding(@Qualifier("topicQueue") Queue queue,@Qualifier("testTopicExchange") TopicExchange topicExchange)
        return BindingBuilder.bind(queue).to(topicExchange).with("topicKey");
    





3.写生产者code

package com.mq.producer;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/**
 * @Author : JCccc
 * @CreateTime : 2019/9/3
 * @Description :
 **/
@RestController
public class SendMessageController 

    @Autowired
    RabbitTemplate rabbitTemplate;  //使用RabbitTemplate,这提供了接收/发送等等方法

    @GetMapping("/sendDirectMessage")
    public String sendDirectMessage() 
        String messageId = String.valueOf(UUID.randomUUID());
        String messageData = "test message, hello!";
        String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        Map<String,Object> map=new HashMap<>();
        map.put("messageId",messageId);
        map.put("messageData",messageData);
        map.put("createTime",createTime);
        //将消息携带绑定键值:TestDirectRouting 发送到交换机TestDirectExchange
        rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting", map);
        return "ok";
    




4.写消费者code

package com.mq.config.consumer;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Map;

@Component
@RabbitListener(queues = "TestDirectQueue") //监听队列名称
public class DirectConsumer 
    //加上这个注解相当于消费成功进行回调
    @RabbitHandler
    public void process(Map testMessage)
        System.out.println("DirectReceiver接收到消息:" + testMessage.toString());
    



5.启动类文件

package com.mq;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
 class RabbitMqMain15672 
    public static void main(String[] args) 
        SpringApplication.run(RabbitMqMain15672.class,args);
    


6.配置文件 注意端口号需要为 5672不然报错 Socked closed

server:
  port:

spring:
  rabbitmq:
#    host: 192.168.80.88  #mq服务器ip,默认为localhost
    port: 5672          #mq服务器port,默认为5672
    username: test     #mq服务器username,默认为gust
    password: test     #mq服务器password,默认为guest


生产者如何获取消费结果

1.根据业务来定

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-zH07K2lq-1664445251320)(C:\\Users\\24473\\AppData\\Roaming\\Typora\\typora-user-images\\image-20220918151406509.png)]

订单号在生产者那边记录,如果消息成功之后插入数据库成功也会生成该订单号的一条数据,则可以进行删除这个消费成功的消息。

2.RocketMq 自带全局id,能够根据根据这个全局id获取消费结果。

原理: 生产者会发送消费给mq服务器,mq服务器生成一个全局id,消费者消费消息之后会通知mq服务器并发送标记消息消费成功。

死信队列

产生背景:俗称备胎队列,mq因为某种原因拒收该消息之后,可以转移到死信队列中存放,它也可以有交换机和路由key等。

产生原因:

  1. 消息投递到mq中存放消息已经过期 会自动的转移到 死信队列里面

  2. 消息队列达到最大长度

  3. 消费者多次消费消息失败,就会转移到死信队列中

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-71RkCfso-1664445251321)(C:\\Users\\24473\\AppData\\Roaming\\Typora\\typora-user-images\\image-20220918162612816.png)]

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-oHPOEwGm-1664445251321)(C:\\Users\\24473\\AppData\\Roaming\\Typora\\typora-user-images\\image-20220918164219496.png)]

应用场景

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-BAn9EDVq-1664445251322)(C:\\Users\\24473\\AppData\\Roaming\\Typora\\typora-user-images\\image-20220918164832373.png)]

消费者消费报异常引入mq无限的重试问题

默认情况下我们的消费者如果消费失败则mq会无限的进行重试,这样会导致消费者重复进行消费。我们应该人为的控制消费者重试的次数,所以会产生幂等性问题? 幂等性保证数据唯一

什么情况下消费者需要实现重试策略?

  1. 消费者获取消息后,用第三方接口失败?

    需要,这个可能是网络延迟的原因,重试可能会调用成功。

  2. 消费者获取消息之后代码出现错误?需要重试策略?

    不需要,可以把消息存入日志表中去,后期定期可以通过定时任务或者人工干预的方式。可以使用死信队列重新消费这些消息。

消费者手动的Ack签收这条消息已经消费成功

    //告诉服务器收到这条消息 已经被消费 可以在队列删掉 这样以后就不会再发了 否则消息服务器以为这条消息没处理掉 后续还会在发
            //消息的标识,false只确认当前一个消息收到,true确认所有consumer获得的消息
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

接口幂等性解决?

在业务层面使用全局唯一的id来进行约束,在并发情况下面是不靠谱的,防止多次请求导致的数据重复我们可以在 (insert)插入操作的话数据库中设置 唯一主键,更新操作可以通过 数据库的乐观锁机制实现

以上是关于RabbitMq学习笔记的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ学习(中)——交换机死信队列和延迟队列

在 RabbitMQ 中需要单独的死信交换吗?

RabbitMQ学习09--死信队列(TTL过期)

RabbitMQ学习--死信队列

RabbitMQ学习--死信队列

RabbitMQ 中的消息会过期吗?