springbootSpringBoot消息中间件RabbitMQ
Posted Brian Huang
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了springbootSpringBoot消息中间件RabbitMQ相关的知识,希望对你有一定的参考价值。
github地址:https://github.com/showkawa/springBoot_2017/tree/master/spb-demo/spb-brian-query-service
1.RabbitMQ简介
AMQP(高级消息队列协议)是一个异步消息传递所使用应用层协议规范,为面向消息中间件设计,基于此协议的客户端与消息中间件可以无视消息来源传递消息,不受客户端、消息中间件、不同的开发语言环境等条件的限制;
涉及概念解释:
Server(Broker):接收客户端连接,实现AMQP协议的消息队列和路由功能的进程;
Virtual Host:虚拟主机的概念,类似权限控制组,一个Virtual Host里可以有多个Exchange和Queue。
Exchange:交换机,接收生产者发送的消息,并根据Routing Key将消息路由到服务器中的队列Queue。(这个有点类似于nginx服务器的概念)
ExchangeType:交换机类型决定了路由消息行为,RabbitMQ中有三种类型Exchange,分别是fanout、direct、topic;
Message Queue:消息队列,用于存储还未被消费者消费的消息;
Message:由Header和body组成,Header是由生产者添加的各种属性的集合,包括Message是否被持久化、优先级是多少、由哪个Message Queue接收等;body是真正需要发送的数据内容;
BindingKey:绑定关键字,将一个特定的Exchange和一个特定的Queue绑定起来。
2.RabbitMQ运行机制
2.1 Exchange类型
生产者发送消息不会向传统方式直接将消息投递到队列中,而是先将消息投递到交换机中,在由交换机转发到具体的队列,队列在将消息以推送或者拉取方式给消费者进行消费,这点Nginx有点类似。
交换机的作用根据具体的路由策略分发到不同的队列中,交换机有四种类型。
Direct exchange(直连交换机)是根据消息携带的路由键(routing key)将消息投递给对应队列的
Fanout exchange(扇型交换机)将消息路由给绑定到它身上的所有队列
Topic exchange(主题交换机)队列通过路由键绑定到交换机上,然后,交换机根据消息里的路由值,将消息路由给一个或多个绑定队列
Headers exchange(头交换机)类似主题交换机,但是头交换机使用多个消息属性来代替路由键建立路由规则。通过判断消息头的值能否与指定的绑定相匹配来确立路由规则
Headers 匹配 AMQP 消息的 header 而不是路由键, headers 交换器和 direct 交换器完全一致,但性能差很多,目前几乎用不到了,所以直接看另外三种类型:
2.1.1 Direct Exchange
消息中的路由键(routing key)如果和 Binding 中的 binding key 一致, 交换器就将消息发到对应的队列中。
路由键与队列名完全匹配,如果一个队列绑定到交换机要求路由键为“dog”,则只转发 routing key 标记为“dog”的消息,
不会转发“dog.puppy”,也不会转发“dog.guard”等等。它是完全匹配、单播的模式。
2.1.2 Fanout Exchange
每个发到 fanout 类型交换器的消息都会分到所有绑定的队列上去。
fanout 交换器不处理路由键,只是简单的将队列绑定到交换器上,每个发送到交换器的消息都会被转发到与该交换器绑定的所有队列上。
很像子网广播,每台子网内的主机都获得了一份复制的消息。fanout 类型转发消息是最快的。
2.1.3 Topic Exchange
topic 交换器通过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。
它将路由键和绑定键的字符串切分成单词,这些单词之间用点隔开。它同样也会识别两个通配符:符号“#”和符号“*”。
#匹配0个或多个单词,*匹配一个单词。
3.RabbitMQ安装
基于docker的国内镜像安装(3-management带管理界面的rabbitmq),关于docker三分钟上手和常用指令这篇博客有汇总:https://www.cnblogs.com/hlkawa/p/9742015.html
> docker pull registry.docker-cn.com/library/rabbitmq:3-management
启动rabbitmq(-d 后台启动 -p 端口映射 5672 连接rabbirmq的端口 15672访问rabbitmq web管理界面的端口)
> docker run -d -p 5672:5672 -p 15672:15672 --name brian_rabbitmq xxxxxx(镜像name或者镜像ID)
rabbitmq的管理web访问url: ip:15672,默认的账户密码guest/guest
4.RabbitTemplate发送接受消息&序列化机制
4.1 引用依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
4.2 AmqpAdmin创建和删除 Queue, Exchange,Binding
ManageMQService.java
package com.kawa.mq; import com.kawa.config.Contents; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @Service public class ManageMQService { @Autowired AmqpAdmin amqpAdmin; public void createExchange(String exchangeName,String mqType){ if(mqType.equals(Contents.DIRECT_EXCHANGE)){ amqpAdmin.declareExchange(new DirectExchange(exchangeName)); } if(mqType.equals(Contents.FANOUT_EXCHANGE)){ amqpAdmin.declareExchange(new FanoutExchange(exchangeName)); } if(mqType.equals(Contents.TOPIC_EXCHANGE)){ amqpAdmin.declareExchange(new TopicExchange(exchangeName)); } } public void removeExchange(String exchangeName){ amqpAdmin.deleteExchange(exchangeName); } public void createQueue(String queueName){ amqpAdmin.declareQueue(new Queue(queueName,true)); } public void removeQueue(String queueName){ amqpAdmin.deleteQueue(queueName); } public void createBinding(String queueName, String exchangeName, String routingKey){ amqpAdmin.declareBinding(new Binding(queueName, Binding.DestinationType.QUEUE,exchangeName,routingKey,null)); } public void removeBinding(String queueName, String exchangeName, String routingKey){ amqpAdmin.removeBinding(new Binding(queueName, Binding.DestinationType.QUEUE,exchangeName,routingKey,null)); } }
4.3 使用RabbitTemplate发送消息
SendMessageService.java
package com.kawa.mq; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @Service public class SendMessageService { @Autowired RabbitTemplate rabbitTemplate; public void sendMessage(String exchange,String routingKey,Object obj){ //Message需要自己构造一个;定义消息体和消息头 //rabbitTemplate.send(exchange,routingKey,message); //object默认当成消息体,只需要传入发送对象,自动序列化发送给rabbitmq rabbitTemplate.convertAndSend(exchange,routingKey,obj); } }
4.4 测试创建 Queue, Exchange,Binding和发送消息
SpbDemoApplicationTests.java
@Test public void sendMessage() { manageMQService.createQueue("brian.test"); manageMQService.createExchange("brian",Contents.DIRECT_EXCHANGE); manageMQService.createBinding("brian.test","brian","mymq"); Brian brian = new Brian(); User user = new User(); user.setId((long) 12345678); user.setUsername("cassiel"); user.setPassword("#fyds"); List<String> list = new ArrayList<>(); list.add("我"); list.add("爱"); list.add("你"); list.add("中"); list.add("国"); Map<String,Object> map = new HashMap<>(); map.put("123","包邮"); brian.setKawadate(new Date()); brian.setLists(list); brian.setObj(map); brian.setUser(user); sendMessageService.sendMessage("brian","mymq",brian); }
查看结果
4.5 添加@RabbitListener监听和处理消息
在使用RabbitListener注解接收消息时,需要在启动类上加上数据@EnableRabbit
BrianService.java
package com.kawa.sercice; import com.kawa.pojo.Brian; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; @Service public class BrianService { @RabbitListener(queues = "brian.test") public void receiveMessage(Brian brian){ System.out.println("接收到的消息体:" + brian); } }
4.6 启动工程查看测试结果
查看到队列里面消息已经没有了
以上是关于springbootSpringBoot消息中间件RabbitMQ的主要内容,如果未能解决你的问题,请参考以下文章