Rabbitmq消息队列详解——SpringBoot整合

Posted wzq_55552

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Rabbitmq消息队列详解——SpringBoot整合相关的知识,希望对你有一定的参考价值。

SpringBoot整合

依赖:

<!-- 加入rabbitmq -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置:

spring:
  application:
    name: rabbitmq-springboot

  rabbitmq:
    host: 192.168.169.135
    port: 5672
    username: root
    password: 123456
    virtual-host: wzq.host #虚拟主机,最好名字是/开头,rabbitmq中先创建

配置会自动配置到RabbitTemplate,直接注入使用即可。

队列和交换机的创建是在消费者处创建的,如果没有消费者,队列和交换机不会创建。

//@Payload Object message消息是实体类的json数据

hello模型

没有交换机,直接发消息到队列

生产者:

    @Resource
    private RabbitTemplate rabbitTemplate;

    //发送到hello队列
    @Test
    public void hello()
        rabbitTemplate.convertAndSend("hello","hello world!");
    

消费者:

@Component
@RabbitListener(queuesToDeclare = @Queue(value = "hello"/**,durable = "true",autoDelete = "true"**/))
//消费者,监听hello队列,没有队列就创建,默认配置是持久化、非独占、不是自动删除
public class HelloCustomer 

    @RabbitHandler
    public void receice1(String message)
        System.out.println("message = " + message);
    


任务模型

没有交换机,多个消费者,轮询平均分配给消息者

生产者:

    @Resource
    private RabbitTemplate rabbitTemplate;

    //发送到work队列
    @Test
    public void work()
        for (int i = 0; i < 10; i++) 
            rabbitTemplate.convertAndSend("work",i+"work模型!");
        
    

消费者:

@Component
public class WorkCustomer 

    //多个消费者

    @RabbitListener(queuesToDeclare = @Queue("work"))
    public void receive1(String message)
        System.out.println("message1 = " + message);
    

    @RabbitListener(queuesToDeclare = @Queue("work"))
    public void receive2(String message)
        System.out.println("message2 = " + message);
    


fanout模型

交换机把所有消息都分配到每一个与它绑定的队列中,路由key没意义

生产者:

    @Resource
    private RabbitTemplate rabbitTemplate;

    //fanout广播模型
    @Test
    public void fanout()
        //交换机、路由key、内容
        rabbitTemplate.convertAndSend("logs","","fanout模型!");
    

消费者:

@Component
public class FanoutCustomer 

    //多个消费者

    @RabbitListener(bindings = 
            @QueueBinding(
                    //@Queue("名字")不写名字则创建一个临时队列,写就创建对应名字的队列
                    value = @Queue,
                    exchange = @Exchange(value = "logs", type = "fanout") //交换机名字跟消费者一样
            )
    )
    public void receive1(String message)
        System.out.println("message1 = " + message);
    

    @RabbitListener(bindings = 
            @QueueBinding(
                    //@Queue("名字")不写名字则创建一个临时队列
                    value = @Queue,
                    exchange = @Exchange(value = "logs", type = "fanout") //交换机名字跟消费者一样
            )
    )
    public void receive2(String message)
        System.out.println("message2 = " + message);
    


direct模型

路由key指定消息队列,队列key和消息key一致就接收消息

生产者:

    @Resource
    private RabbitTemplate rabbitTemplate;

    //direct路由模型
    @Test
    public void direct()
        //交换机、路由key、内容
        rabbitTemplate.convertAndSend("directs","info","direct模型info消息!");
    

消费者:

@Component
public class DirectCustomer 

    @RabbitListener(bindings = 
            @QueueBinding(
                    value = @Queue("direct"), 
                    exchange = @Exchange(value = "directs"),  //默认类型就是type
                    key = "info","error","warning"
            )
    )
    public void receive1(String message)
        System.out.println("message1 = " + message);
    

    @RabbitListener(bindings = 
            @QueueBinding(
                    value = @Queue, //临时队列
                    exchange = @Exchange(value = "directs"),  //默认类型就是type
                    key = "info"
            )
    )
    public void receive2(String message)
        System.out.println("message2 = " + message);
    


只接收对应key的消息

topic模型

*指匹配单个,#指匹配所有

生产者:

    @Resource
    private RabbitTemplate rabbitTemplate;

    //topic动态路由模型
    @Test
    public void topic()
        //交换机、路由key、内容
        rabbitTemplate.convertAndSend("topics","user.save","topic模型user.save消息!");
    

消费者:

@Component
public class TopicCustomer 

    @RabbitListener(bindings = 
            @QueueBinding(
                    value = @Queue,
                    exchange = @Exchange(value = "topics",type = "topic"),  //默认类型就是type
                    key = "user.save","user.*"
            )
    )
    public void receive1(String message)
        System.out.println("message1 = " + message);
    

    @RabbitListener(bindings = 
            @QueueBinding(
                    value = @Queue,
                    exchange = @Exchange(value = "topics",type = "topic"),  //默认类型就是type
                    key = "order.#","produce.#","user.*"
            )
    )
    public void receive2(String message)
        System.out.println("message2 = " + message);
    



接收到对应范围的消息

MQ的应用场景

异步处理

场景说明:用户注册后,需要发送注册邮件和注册短信,传统的做法有两种 1.串行方式 2.并行方式

  • 串行方式:将注册信息写入数据库后,再发送注册邮件,再发送注册短信。以上三个任务全部完成后才返回客户端。如果邮件、短信不是必须的话,它只是通知而已,那用户就等待了没必要等待的时间。

  • 并行方式:将注册信息写入数据库后,同时发送注册邮件和注册短信。以上三个任务全部完成后才返回客户端。

  • 消息队列:引入消息队列后,发送邮件和短信不是必须的业务逻辑,注册成功后就返回客户端,其他消息队列异步处理

应用解耦

场景:双十一,用户下单后,订单系统需要通知库存系统,传统的做法是订单系统调用库存系统的接口。

这种做法有一个缺点:

当库存系统出现故障时,订单就会失效。订单系统和库存系统高耦合,引入消息队列

  • 订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户下单成功。
  • 库存系统:订阅下单的消息,获取下单的消息,进行库存操作。就算库存系统出现故障,消息队列也能保证消息的可靠投递,不会导致消息丢失。

流量削峰

场景:秒杀活动,一般会因为流量大,导致应用挂掉,为了解决这个问题,一般在应用的前端加入消息队列。

作用:

  1. 可以控制活动人数,超过此一定阀值的订单直接丢弃
  2. 可以缓解短时间的高流量压垮应用

用户请求先写入消息队列,加入对了的长度超过最大值,则直接抛弃用户请求或跳转到错误页面;秒杀业务根据消息队列中的请求信息,在做后续的处理。

以上是关于Rabbitmq消息队列详解——SpringBoot整合的主要内容,如果未能解决你的问题,请参考以下文章

消息队列RabbitMQ入门与5种模式详解

重学“消息队列”,详解 RabbitMQ 消息确认机制!

重学“消息队列”,详解 RabbitMQ 消息确认机制!

RabbitMQ使用详解

Rabbitmq消息队列详解——SpringBoot整合

RabbitMQ的工作队列模式详解(下篇)