浅谈RabbitMQ
Posted with the wind(随风)
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了浅谈RabbitMQ相关的知识,希望对你有一定的参考价值。
简单模拟MQ的作用 – 完成异步通讯
如图显示:有了RabbitMQ,你就可以和三个妹子同时进行短信聊天,就不需要为无法同时和三个妹子语音聊天而苦恼
MQ存在的意义 (解决同步通讯的存在的缺点)
如图所示:当用户完成了支付功能,我们需要仓储服务对应商品数量减1。当我们的支付服务频繁调用仓储服务之后,仓储服务挂掉了,这个时候支付服务想再调仓储服务就调不了,然后支付服务也阻塞在这里了,然后系统崩了。
同步存在的问题
初探MQ的作用
1. 服务解耦合
2. 性能提升,吞吐量提高
3. 服务没有强依赖,不担心级联失败问题
ps:概念啥的,谁想看啊
我们直接进入RabbitMQ的使用
安装RabbitMQ
PS:默认你们安装好了docker在自己的虚拟机或者自己的服务器上面了
docker pull rabbitmq:3-management(远程安装,版本可以自主选择)
--------------------------------------
也可以在自己电脑上面下载rabbitmq.tar,然后上传到你的虚拟机或者服务器,然后
docker load -i mq.tar
---------------------------
接下来,我们就可以使用docker运行我们的RabbitMQ了
docker run \\
-e RABBITMQ_DEFAULT_USER=xxx \\
-e RABBITMQ_DEFAULT_PASS=xxx\\
--name mq \\
--hostname mq1 \\
-p 15672:15672 \\ (注意:这是管理页面端口)
-p 5672:5672 \\ (注意:这是通讯端口)
-d \\ (后台运行)
rabbitmq:3-management
别直接复制上面的运行指令直接运行了,xxx,xxx这两个你自主设置一下,把括号和括号里面的内容删除一下
祝你好运
登录管理页面( ip地址:15672 )
如果你能登录进去说明你的安装配置都没有问题了,恭喜你。
整合RabbitMQ到项目里面
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
我们可以创建一个普通的maven项目为父项目,把依赖放在里面,再创建两个springboot子项目,分别作为消息的发布者和消息的消费者
ps:需要这个项目文件的可加qq:2338244917
然后我们在连个子项目中连接上我们的RabbitMQ
spring:
rabbitmq:
host: 47.98.251.192 # ip地址(你们懂的)
port: 5672 # 这个你就别填15672了,那个是登录管理面板使用的
username: xxx
password: xxx
virtual-host: / # 虚拟主机地址:可以在管理面板创建,不然就和我一样
// 我们可以在项目publisher的测试类里面测试和学习
@Autowired
private RabbitTemplate rabbitTemplate; //注入对象
@Test
public void test01() {
String queueName = "simple.queue";//队列名
String msg = "hello spring amqp!";//发送的信息
rabbitTemplate.convertAndSend(queueName,msg);
}
//我们在consumer项目里面创建一个listeber包,然后写一个类
//使用注解@RabbitListener,然后填出正确队列名,
//我们String msg的类型要和消息发布的类型一致
@RabbitListener(queues = "simple.queue")
public void listenSimpleMessage(String msg) {
System.out.println("消费者接收到simple.queue的消息:"+msg);
}
通过上面的操作,我们就可以让我们的消费者时刻消费发布者的消息了
上面的例子,我们就会出现一个问题。假设:我们有两个消费者(我们可以按照上面的多做一个@RabbitListener(queues=“xxx.xxx”))就行了,这个时候,我们的发布者发送50条信息,消费者我们搞连个来消费这些消息,一个1秒处理50条,一个1秒处理20条。
这样,这两个消费者处理一样数量的消息
原因是因为要一个处理完,下一个才能处理,这样有个乱用,我们要能者多劳
spring:
rabbitmq:
host: 47.98.251.199
username: root
password: 922815
port: 5672
virtual-host: /
listener:
simple:
prefetch: 1 # 每次取一条消息,处理完成才能获取下一条消息
在consumer项目配置一下就可以解决这个问题了
最后聊一聊交换机(Exchange)
交换机不储存消息
FanoutExchange
作用: 当我们消息的发送者发送消息成功之后,我们的FanoutExchange就会让绑定在它上面的队列queue全部都能消费到每一个消息
//consumer项目下
package cn.itcast.mq.cofig;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class FanoutConfig {
//声明FanoutExchange交换机
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("lin.fanout");
}
//声明第一个队列
@Bean
public Queue fanoutQueue1() {
return new Queue("fanout.queue1");
}
//绑定队列1和交换机
@Bean
public Binding bindingQueue1(Queue fanoutQueue1,FanoutExchange fanoutExchange) {
return BindingBuilder
.bind(fanoutQueue1)
.to(fanoutExchange);
}
//声明第二个队列
@Bean
public Queue fanoutQueue2() {
return new Queue("fanout.queue2");
}
//绑定队列2和交换机
@Bean
public Binding bindingQueue2(Queue fanoutQueue2,FanoutExchange fanoutExchange) {
return BindingBuilder
.bind(fanoutQueue2)
.to(fanoutExchange);
}
//...略,以相同方式声明第n个队列,并完成绑定
}
//publisher项目
@Test
public void testSendFanoutExchange() {
//交换机名称
String exchangeName = "lin.fanout";
//消息
String message = "Hello Everyone!";
//发送消息
rabbitTemplate.convertAndSend(exchangeName,"",message);
}
前面的代码是在consumer项目里面的,后面的代码是publisher项目的代码
DirectExchange
作用: 我们可以通过DirectExchange(路由交换机),通过发送信息时,绑定routingKey来指定我们发送给哪个消息队列消费
//consumer项目下
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1"),
exchange = @Exchange(name = "lin.direct",type = ExchangeTypes.DIRECT),
key = {"red","blue"}
))
public void listenDirectQueue1(String msg) {
System.out.println("消费者接收到direct.queue1消息:["+msg+"]");
}
//publisher项目下
@Test
public void testSendDirectExchange1() {
//交换机名称
String exchangeName = "lin.direct";
//消息
String message = "Hello yellow!";
//发送消息
rabbitTemplate.convertAndSend(exchangeName,"yellow",message);
}
TopicExchange
作用: 这玩意的作用相当于把消息队列分类
#:代表0个或者1个单词,*:代表1个单词
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue1"),
exchange = @Exchange(name = "lin.topic",type = ExchangeTypes.TOPIC),
key = "china.#"
))
public void listenTopicQueue2(String msg) {
System.out.println("消费者接收到topic.queue1的消息:["+msg+"]");
}
//publisher项目
@Test
public void testSendTopicExchange1() {
//交换机名字
String exchangeName = "lin.topic";
//消息
String msg = "Hello,topicExchange!";
//发送
rabbitTemplate.convertAndSend(exchangeName,"china.news",msg);
}
ps:最后的最后渴望变成天使
大家别走,听我bb完最后一个知识点,b完我就下班了
SpringAMQP-消息转换器
它的作用就是,当我们消息的发布者发布一个消息是一个对象给rabbitMQ的时候,我们jdk有它一套方式,这方式不但性能不行,还不安全,我们要取代它
引入依赖
<!--jackson-databind-->
<!--我们在父项目依赖里面引入,这样我们学习的时候就能让子项目也能用了-->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
注入@Bean
这个@Bean我们可以在springboot项目入口那个类里面注入
当然在配置类里面注入也可以
//这玩意就是覆盖jdk编码格式的
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
//上面是为了避免你们导错包
@Bean
public MessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
我们简单来使用一下它
@RabbitListener(queues = "object.queue")
public void listenObjectQueue(Map<String,Object> msg) {
System.out.println("接收到object.queue的消息:"+msg);
}
@Test
public void testSendDirectExchange5() {
//队列名字
String queryName = "object.queue";
//消息
Map<String,Object> msg = new HashMap<>();
msg.put("name","柳岩");
msg.put("age",21);
rabbitTemplate.convertAndSend(queryName,msg);
}
好了,到此浅谈RabbitMQ就告一段落了
感谢能看到这里还不走的读者
要是还能给我点个赞就更加感谢了
如有问题,可以联系qq:2338244917
作者:随风
以上是关于浅谈RabbitMQ的主要内容,如果未能解决你的问题,请参考以下文章