SpringBoot如何整合RabbitMQ

Posted ShuSheng007

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SpringBoot如何整合RabbitMQ相关的知识,希望对你有一定的参考价值。

[版权申明] 非商业目的注明出处可自由转载
出自:shusheng007

文章目录

概述

好久没有写博客了,终日忙于生计,真是人过30不如狗啊,但写点什么好呢?想想当年自己入门时候那痛苦的经历,还是写点优质实用的入门文章吧,既满足了自己好为人师的本性,也能给后辈提供一些帮助。今天咱们就来聊聊springboot 整合rabbitmq的那些事吧。

SpringBoot的风头之盛不多说了,我是在2016年首次接触springboot的,当时自己要写个APP,后台采用了springboot。当时还真是个新鲜事物,没想到以后几年的发展犹如黄河泛滥,一发不可收拾…

SpringBoot整合rabbitmq很容易,但是整合的目的是为了使用,那要使用rabbitmq就要对其有一定的了解,不然容易整成一团浆糊。因为说到底,SpringBoot只是在封装rabbitmq的API,让其更容易使用而已,废话不多说,让我们一起整它。

rabbitmq简介

能看到这里就证明你对这个兔子有一定的了解了,如果真不知道rabbitmq官网走一趟,我这里就不啰嗦了。这里介绍的内容只是为了接下来整合使用做铺垫。

入门的话,大体理解了下图即可。

如图所示,producer产生一条消息,先丢给一个叫Exchange(交换器)的东西,然后交换器再将消息丢给Queue(队列),最后Consumer去队列获取消息。在我第一次听说rabbitmq的时候我脑子里面完全没有Exchange这玩意的概念,想当然的认为生产者将消息直接就丢给队列了,可见万事开头难。 Rabbit就是依靠Exchange把消息投递这个事玩出了花…

下面看一下关键的概念,不理解这些概念,在具体的使用过程中就会举步维艰

  • Producer 生产者
  • Consumer 消费者
  • Exchange 交换器

这个家伙又分4种,这里只看经常使用的两种即可,等你入了门,遇到时候再自己摸索吧。

Direct Exchange: 直接交换器,生产者将消息丢给它后,它从与自己绑定的那些queue里选择一个,然后直接将消息丢过去,其他的queue就接不到这个消息了。

Topic Exchange: 主题交换器,生产者将消息丢给它后,它就给与自己绑定的那些个关心这个消息queue全部发送一份消息。 就简单理解成发布-订阅模式就好了。

  • Queue 队列
  • RoutingKey路由key:用来控制交换器如何将消息发送给绑定的队列。例如交换器说了:俺们只投递路由key包含“shusheng007”字样的消息,其他的俺们不处理。 然后来了条消息,这条消息的路由key是“王二狗”,然后交换器一看不对啊,就残忍的拒绝了投递消息到队列的工作。

SpringBoot整合

SB整合其他技术一般就是3步,rabbit也不例外:

  • 引入依赖
 <dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-amqp</artifactId>
 </dependency>

有的同学要问了,为什么不写版本号<version>版本号</version>,这是因为版本号被springboot给管理了,这块有时间可以写一篇新的博客。

  • 配置

引入依赖后一般情况下都需要做点配置,给你刚引入的技术提供一些必要的信息它才能正常的运行起来。例如这里的rabbitmq,你至少要告诉它运行消息队列的服务器地址和端口吧,这样程序才能连接。

在我们的配置文件application.yml中加入如下配置

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest

如果引入的是starter,都可以在application.yml里配置,当然你也可以自己写个配置类,使用代码来配置,这个就比较麻烦但也比较高级了。

@Configuration
public class RabbitConfig 
    ...

  • 使用

第三步就是使用拉,本来各个技术使用方式各不相同,你要使用10种技术,你就要对这10种技术的API了然于胸,这对你幼小的心灵造成了不可磨灭伤害。此刻SB又站了出来,它使出洪荒之力为的是将各种技术的使用简化为统一的模式,于是各种各样的xxxTemplate就出来拉。

安装rabbitmq

  • 安装rabbitmq

你要使用rabbitmq你首先的有一个mq这没错吧,docker你该站出来拉…

docker run -d --hostname my-rabbit --name rabbit-mq -p 5672:5672 -p 15672:15672 rabbitmq:3-management

不会docker的,面壁3分钟…, 上面的参数有一个需要重点提一下

--hostname my-rabbit

使用这个参数显式指定消息实际存放的地方。

  • 查看后台

rabbitmq很贴心的为我们提供了一个web管理后台, 当安装成功后,访问http://localhost:15672,输入默认的用户名和密码guest。端口号15672是我们上面使用docker安装时指定的。


登录成功后就会进入其后台管理页面,如下图所示。至于里面是什么自己摸索吧。

初级用法

让我们由浅入深的来实践一下,编程这玩意其实是最无脑的,特别是做应用层码工的,一个技术规定是啥样就是啥样,而且是可验证的,没什么玄学。例如你问为什么rabbitmq的默认端口号是5672啊?有可能是作者手机号的后四位,有可能是女朋友的某几个三围…who知道,who关心?你不喜欢,换掉就好啦,它就是这么规定的,你自己写一个,爱使用哪个使用哪个…扯远了

如何消费消息

  • 开启rabbitmq

使用注解@EnableRabbit来声明开启,其可放在Application上,也可以放在你自己写的rabbit配置类上。

@EnableRabbit
@SpringBootApplication
public class RabbitmqIntegrateApplication 
	...

  • 监听消费消息

这个也比较简单了,使用@RabbitListener注解即可。

@Slf4j
@Service
public class QueueConsumer 
    @RabbitListener(queues = "ss007")
    public void receive(@Payload String fileBody) 
        log.info("ss007队列:" + fileBody);
    

上面的代码在监听ss007队列,只要这个队列里面存在消息,它就会消费。

经过上面两步就成功完成了一个rabbitmq消费消息的程序了,是不是很简单啊。现在你同事告诉你他往某个队列里面发消息,让你去写个消费程序,你可以自信的喊出那句:no problem!

值得注意的一点是,这样写要求ss007这个队列提前创建好了,不然会报错。我们打开rabbitmq的管理后台,按照下图红框展示那样添加一个队列。只填写名称ss007其他的都使用默认值即可,最后在绿框那边就会出现你创建的队列了,是不是特别简单。

然后点击新创建的队列名称,进入队列详情页。可以看到目前ss007这个队列有0个消费者。

让我们运行我们的程序后,刷新一下页面,可见已经有一个消费者了。

接下来了,让我们向队列里发送一个消息,点击Publish message选项展开,然后在payload里面填上消息内容,点击下面的publish即可。

看一下我们程序的输出:

xxxxxxxxxxxxxxx  INFO 99749 --- [ntContainer#0-1] t.s.rabbitmqintegrate.mq.QueueConsumer   : ss007队列:号外,号外,滴滴被罚80亿...

至此你已经成功完成了消息的消费开发验证。

如何发送消息

与消费消息相比,发送消息相对来说比较复杂一点,假设我们向刚才已经建立的队列ss007丢消息要怎么做呢?

第一步先创建一个队列对象myQuesue。

@Configuration
public class RabbitConfig 
    @Bean
    public Queue myQueue()
        return new Queue("ss007",true);
    

第二步,使用RabbitTemplate向myQueue队列里面发送消息

@RequiredArgsConstructor
@Service
public class SendService 

   private final RabbitTemplate rabbitTemplate;
   private final Queue myQueue;

   public void sendMsg(String msg)
      rabbitTemplate.convertAndSend(myQueue.getName(),msg);
   

我们来写一个controller,然后使用postman发触发一下消息发送。

@RequiredArgsConstructor
@RestController
@RequestMapping("/trigger")
public class TriggerController 
    private final SendService sendService;
    @GetMapping("/send")
    public String sendMsgToMq(@RequestParam String msg)
        sendService.sendMsg(msg);
        return "ok";
    

程序输出如下:

xxxxxxxxxxxxxxxx  INFO 13185 --- [ntContainer#0-1] t.s.rabbitmqintegrate.mq.QueueConsumer   : ss007队列:hello world

可见,我们刚向ss007队列发送了一条消息,这条消息就被我们刚才写的消费者给消费了。

至此,发送消息和消费消息都演示完了,请点赞,收藏…

what?就这?说好的Exchange呢?说好的Routing key呢?让你给偷摸造拉?好吧,好吧…现在的小朋友已经很难糊弄了,不见干货不点赞啊…

前面我们介绍Rabbitmq的时候说每个队列需要绑定到一个交换器上才能正常接收消息,但是我们好像没有定义交换器,也没有绑定任何交换器,这是怎么回事呢?

其实是因为,rabbitmq有一个默认的Exchange,而每个队列都会默认绑定它。所以我们前面的演示使用的是默认Exchange,它是一个direct类型的交换器,如下图所示。


那routing key我们也没指定啊,默认Exchange使用哪个路由key呢? 默认队列名称作为路由key,也就是ss007

高级用法

上面那些呢写写demo是没问题的,但在工程实践中一般还是推荐显式指定Exchange以及RoutingKey的。我们以topic类型的Exchange来看下。

配置交换器与队列

@Configuration
public class RabbitConfig 

    @Bean
    public Queue topicQueue1()
        return new Queue("topicQueue1",true);
    

    @Bean
    public Queue topicQueue2()
        return new Queue("topicQueue2",true);
    

    @Bean
    public TopicExchange topicExchange()
        return new TopicExchange("topicExchange");
    

    @Bean
    public Binding topicBinding1()
        return BindingBuilder.bind(topicQueue1()).to(topicExchange())
                .with("ss007.id.*");
    

    @Bean
    public Binding topicBinding2()
        return BindingBuilder.bind(topicQueue2()).to(topicExchange())
                .with("ss007.name.*");
    

上面的代码创建了一个Exchange,两个队列,并将这两个队列绑定到了那个Exchange上。注意两个绑定使用的routingkey是不一样的。

发送消息

   public void sendTopicMsg(String msg,String route)
      rabbitTemplate.convertAndSend("topicExchange",route,msg);
   

第一个参数是我们上一步配置的交换器,第二个参数是routingkey,第三个参数是消息

消费消息

    @RabbitListener(queues = "topicQueue1")
    public void receiveTopic1(@Payload String fileBody) 
        log.info("topic1队列:" + fileBody);
    

    @RabbitListener(queues = "topicQueue2")
    public void receiveTopic2(@Payload String fileBody) 
        log.info("topic2队列:" + fileBody);
    

监听并消费topicQueue1与topicQueue2队列的消息。

经过以上三步我们就完成了发送和消费消息的流程,启动程序后,让我们看一下rabbit的后台,赫然出现了我们创建的这些信息。从图中看到topicExchange已经生成,类型是Topic,而且生成了两个队列topicQueue1与topicQueue2,这个Exchange绑定了两个队列

测试

让我们实际测试一下:

使用postman发起get请求,使用路由字段为ss007.牛翠花.觉醒

localhost:8080/trigger/sendTopic?msg=牛翠花-总有刁民想害朕&route=ss007.牛翠花.觉醒

查看程序输出:

topic2队列:牛翠花-总有刁民想害朕

我们向交换器发送了一条消息,路由key是ss007.牛翠花.觉醒,它匹配到了我们topicBinding2的路由key:ss007.*.觉醒,而没有匹配到topicBinding1的路由key:ss007.王二狗.觉醒,所以只有topicQueue2里丢入了消息。

使用postman再发起一个get请求,这次使用路由字段为ss007.王二狗.觉醒

localhost:8080/trigger/sendTopic?msg=王二狗-王侯将相宁有种乎&route=ss007.王二狗.觉醒

输出:

topic2队列:王二狗-王侯将相宁有种乎
topic1队列:王二狗-王侯将相宁有种乎

可见,两个队列里面都被丢入了同样的消息。这是为什么呢?这是由于我们使用的是Topic类型的交换器,而且路由key可以匹配到两个队列的绑定。

总结

至此,你以为本文真的要结束了?

嗯…就让我们结束它吧,都这么长了。留下一个使用SpringCloud-Stream来使用rabbitmq的话题下次再讲,它将抽象又提升了一个等级,我觉得这哥们是未来…请持续关注

下面是本文源码,记得给个小星星哦

GitHub源码下载

以上是关于SpringBoot如何整合RabbitMQ的主要内容,如果未能解决你的问题,请参考以下文章

Springboot如何整合RabbitMq,若不会,我便手把手教你超级详细

SpringBoot整合RabbitMQ--重试/消息序列化--方法/实例

SpringBoot2.0应用:SpringBoot2.0整合RabbitMQ

SpringBoot系列5SpringBoot整合RabbitMQ

RabbitMQ整合SpringBoot

RabbitMQ整合SpringBoot