RabbitMQ(简介概念安装和springboot整合)
Posted 李潘杜若
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ(简介概念安装和springboot整合)相关的知识,希望对你有一定的参考价值。
RabbitMQ(简介、概念、安装和springboot整合)
一、MQ简介
在计算机科学中,消息队列((英语:Message queue)是一种进程间通信或同一进程的不同线程间的通信方式,软件的贮列用来处理一系列的输入,通常是来自用户。消息队列提供了异步的通信协议,每一个贮列中的纪录包含详细说明的数据,包含发生的时间,输入设备的种类,以及特定的输入参数,也就是说:消息的发送者和接收者不需要同时与消息队列互交。消息会保存在队列中,直到接收者取回它。
1.1.实现
- 消息队列常常保存在链表结构中。拥有权限的进程可以向消息队列中写入或读取消息。
- 目前,有很多消息队列有很多开源的实现,包括JBoss Messaging.JORAM、Apache ActiveMQ、Sun0pen Message Queue、IBM MQ、Apache Qpid和HTTPSQS。
- 当前使用较多的消息队列有RabbitMQ、RocketNQ、ActiveMQ、Kafka、ZeroNQ、MetaMq等,而部分数据库如Redis、mysql以及phxsql也可实现消息队列的功能。
1.2.特点
MQ是消费者-生产者模型的一个典型的代表,一端往消息队列中不断写入消息,而另一端则可以读取或者订阅队列中的消息。MQ和JMS类似,但不同的是JMS是SUN JAVA消息中间件服务的一个标准和API定义,而MQ则是遵循了AMQP协议的具体实现和产品。
注意:
- AMQP,即DAdvanced MessageQueuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。
- JMS,Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件的API,
用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。常见的消息队列,大部分都实现了JMSAPI,如
ActiveMQ ,Redis以及 RabbitMQ等。
1.3.优缺点
优点
应用耦合、异步处理、流量削锋
-
解耦
传统模式:
传统模式的缺点:
系统间耦合性太强,如上图所示,系统A在代码中直接调用系统B和系统C的代码,如果将来D系统接入,系统A还需要修改代码,过于麻烦!
中间件模式:
中间件模式的的优点:
将消息写入消息队列,需要消息的系统自己从消息队列中订阅,从而系统A不需要做任何修改。
。异步
传统模式:
传统模式的缺点:
—些非必要的业务逻辑以同步的方式运行,太耗费时间。
中间件模式:
中间件模式的的优点:
将消息写入消息队列,需要消息的系统自己从消息队列中订阅,从而系统A不需要做任何修改。
-
削峰
传统模式:
传统模式的缺点:
并发量大的时候,所有的请求直接怼到数据库,造成数据库连接异常
中间件模式:
中间件模式的的优点:
系统A慢慢的按照数据库能处理的并发量,从消息队列中慢慢拉取消息。在生产中,这个短暂的高峰期积压是允许的。
缺点
系统可用性降低、系统复杂性增加
1.4.使用场景
消息队列,是分布式系统中重要的组件,其通用的使用场景可以简单地描述为:当不需要立即获得结果,但是并发量又需要进行控制的时候,差不多就是需要使用消息队列的时候
在项目中,将一些无需即时返回且耗时的操作提取出来,进行了异步处理,而这种异步处理的方式大大的节省了服务器的请求响应时间,从而提高了系统的吞吐量。
1.5.为什么使用RabbitMQ
AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。
AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
RabbitMQ是一个开源的AMQP实现,服务器端用Erlangi语言编写,支持多种客户端,如: Python、
Ruby、 .NET,Java,JMS、C,php,ActionScript, XMPP,STONP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
总结如下:
- 基于AMQP协议
- 高并发(是一个容量的概念,服务器可以接受的最大任务数量)。
- 高性能(是一个速度的概念,单位时间内服务器可以处理的任务数)
- 高可用(是一个持久的概念,单位时间内服务器可以正常工作的时间比例)。
- 强大的社区支持,以及很多公司都在使用
- 支持插件
- 支持多语言
二、概念
-
RabbitMQ简介:RabbitMQ是一个由erlang开发的AMQP(Advanved Message Queue Protocol)的开源实现。
-
Message:消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。
-
Publisher:消息的生产者,也是一个向交换器发布消息的客户端应用程序。
-
Exchange:交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。Exchange有4种类型:direct(默认),fanout, topic, 和headers(headers和direct交换器完全一致,但性能差很多,目前几乎用不到),不同类型的Exchange转发消息的策略有所区别。
-
Queue:消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。
-
Binding:绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。Exchange 和Queue的绑定可以是多对多的关系。
-
Connection:网络连接,比如一个TCP连接。
-
Channel:信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内的虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条TCP 连接。
-
Consumer:消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。
-
Virtual Host:虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加
密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。
-
Broker:表示消息队列服务器实体
三、安装
- 获取镜像
#指定版本,该版本包含了web控制页面
docker pull rabbitmq:management
- 运行镜像
#方式一:默认guest 用户,密码也是 guest
docker run -d --hostname my-rabbit --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq:management
#方式二:设置用户名和密码
docker run -d --hostname my-rabbit --name rabbit -e RABBITMQ_DEFAULT_USER=user -e RABBITMQ_DEFAULT_PASS=password -p 15672:15672 -p 5672:5672 rabbitmq:management
#方式三
docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management
# 4369, 25672 (Erlang发现&集群端口)
# 5672, 5671 (AMQP端口) 15672 (web管理后台端口)
# 61613, 61614 (STOMP协议端口)
# 1883, 8883 (MQTT协议端口)
# https://www.rabbitmq.com/networking.html
- 访问ui界面
http://localhost:15672/
四、SpringBoot整合RabbitMQ
4.1. 引入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
4.2. application.yml配置
spring:
rabbitmq:
host: 192.168.10.123
port: 5672
virtual-host: /
#开启发送端确认
#publisher-confirms: true
publisher-confirm-type: correlated
# 开启发送端消息抵达队列的确认
publisher-returns: true
template:
#只要抵达队列,以异步发送优先调用我们这个return - confirm
mandatory: true
listener:
simple:
#手动ack消息(手动确认消息是否消费)
acknowledge-mode: manual
4.3. RabbitConfig配置类
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
@Configuration
public class MyRabbitConfig
@Autowired
private RabbitTemplate rabbitTemplate;
@Primary
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory)
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
this.rabbitTemplate = rabbitTemplate;
rabbitTemplate.setMessageConverter(messageConverter());
initRabbitTemplate();
return rabbitTemplate;
/**
* 设置传输消息格式为json
* @return
*/
@Bean
public MessageConverter messageConverter()
return new Jackson2JsonMessageConverter();
/**
* 定制RabbitTemplate
* 1、服务收到消息就会回调
* 1、spring.rabbitmq.publisher-confirms: true
* 2、设置确认回调
* 2、消息正确抵达队列就会进行回调
* 1、spring.rabbitmq.publisher-returns: true
* spring.rabbitmq.template.mandatory: true
* 2、设置确认回调ReturnCallback
*
* 3、消费端确认(保证每个消息都被正确消费,此时才可以broker删除这个消息),即手动确认
* 消费端宕机,消息也不会丢失(channel.basicAck() )
*
*/
// @PostConstruct //MyRabbitConfig对象创建完成以后,执行这个方法
public void initRabbitTemplate()
/**
* 1、只要消息抵达Broker就ack=true
* correlationData:当前消息的唯一关联数据(这个是消息的唯一id)
* ack:消息是否成功收到
* cause:失败的原因
*/
/* rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback()
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause)
System.out.println("confirm...correlationData["+correlationData+"]==>ack:["+ack+"]==>cause:["+cause+"]");
);*/
//设置确认回调
rabbitTemplate.setConfirmCallback((correlationData,ack,cause) ->
System.out.println("confirm...correlationData["+correlationData+"]==>ack:["+ack+"]==>cause:["+cause+"]");
);
/**
* 只要消息没有投递给指定的队列,就触发这个失败回调
* message:投递失败的消息详细信息
* replyCode:回复的状态码
* replyText:回复的文本内容
* exchange:当时这个消息发给哪个交换机
* routingKey:当时这个消息用哪个路邮键
*/
rabbitTemplate.setReturnCallback((message,replyCode,replyText,exchange,routingKey) ->
System.out.println("Fail Message["+message+"]==>replyCode["+replyCode+"]" +
"==>replyText["+replyText+"]==>exchange["+exchange+"]==>routingKey["+routingKey+"]");
);
4.4. RabbitMQConfig(容器中创建交换机、队列和绑定)
package com.lyh.mall.order.config;
import com.lyh.mall.order.entity.OrderEntity;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.io.IOException;
import java.util.HashMap;
/**
* MQ交换机、普通队列、死信队列和绑定
* 容器中的Queue、Exchange、Binding 会自动创建(在RabbitMQ中不存在的情况下)
**/
@Configuration
public class MyRabbitMQConfig
/**
* 测试
*/
/*@RabbitListener(queues = "order.release.order.queue")
public void listener(OrderEntity orderEntity, Channel channel, Message message) throws IOException
System.out.println("收到过期的订单消息:准备关闭订单"+orderEntity.getOrderSn());
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
*/
/**
* 使用JSON序列化机制,进行消息转换
* @return
*/
@Bean
public MessageConverter messageConverter()
return new Jackson2JsonMessageConverter();
/* 容器中的Queue、Exchange、Binding 会自动创建(在RabbitMQ中不存在的情况下) */
/**
* 死信队列
*
* @return
*/
@Bean
public Queue orderDelayQueue()
/*
Queue(String name, 队列名字
boolean durable, 是否持久化
boolean exclusive, 是否排他
boolean autoDelete, 是否自动删除
Map<String, Object> arguments) 属性
*/
HashMap<String, Object> arguments = new HashMap<>();
//死信路由
arguments.put("x-dead-letter-exchange", "order-event-exchange");
//死信路由键
arguments.put("x-dead-letter-routing-key", "order.release.order");
arguments.put("x-message-ttl", 60000); // 消息过期时间 1分钟
//创建队列
Queue queue = new Queue("order.delay.queue", true, false, false, arguments);
return queue;
/**
* 普通队列
*
* @return
*/
@Bean
public Queue orderReleaseQueue()
Queue queue = new Queue("order.release.order.queue", true, false, false);
return queue;
/**
* TopicExchange
* 创建交换机
* @return
*/
@Bean
public Exchange orderEventExchange()
/*
* String name,
* boolean durable,
* boolean autoDelete,
* Map<String, Object> arguments
* */
return new TopicExchange("order-event-exchange", true, false);
/**
* 绑定死信队列
* @return
*/
@Bean
public Binding orderCreateBinding()
/*
* String destination, 目的地(队列名或者交换机名字)
* DestinationType destinationType, 目的地类型(Queue、Exhcange)
* String exchange,
* String routingKey,
* Map<String, Object> arguments
* */
return new Binding("order.delay.queue",
Binding.DestinationType.QUEUE,
"order-event-exchange",
"order.create.order",
null);
/**
* 绑定普通队列
* @return
*/
@Bean
public Binding orderReleaseBinding()
return new Binding("order.release.order.queue",
Binding.DestinationType.QUEUE,
"order-event-exchange",
"order.release.order",
null);
/**
* 订单释放直接和库存释放进行绑定
* @return
*/
@Bean
public Binding orderReleaseOtherBinding()
return new Binding("stock.release.stock.queue",
Binding.DestinationType.QUEUE,
"order-event-exchange",
"order.release.other.#",
null);
/**
* 商品秒杀队列
* @return
*/
@Bean
public Queue orderSecKillOrrderQueue()
Queue queue = new Queue("order.seckill.order.queue", true, false, false);
return queue;
@Bean
public Binding orderSecKillOrrderQueueBinding()
//String destination, DestinationType destinationType, String exchange, String routingKey,
// Map<String, Object> arguments
Binding binding = new Binding(
"order.seckill.order.queue",
Binding.DestinationType.QUEUE,
"order-event-exchange",
"order.seckill.order",
null);
return binding;
4.5. 测试代码
- AmqpAdmin:管理组件
- RabbitTemplate:消息发送处理组件
- @RabbitListener 监听消息的方法可以有三种参数(不分数量,顺序)Object content, Message message,Channel channel
import com.lyh.mall.order.entity.OrderReturnReasonEntity;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.util.Date;
@SpringBootTest
class MallOrderApplicationTests
@Autowired
private AmqpAdmin amqpAdmin;
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 发送消息
*/
@Test
void sendMessige()
OrderReturnReasonEntity reasonEntity = new OrderReturnReasonEntity();
reasonEntity.setId(1L);
reasonEntity.setCreateTime(new Date());
reasonEntity.setName("你好!");
rabbitTemplate.convertAndSend("hello-java-exchange","hello.java",reasonEntity);
System.out.println("发送消息成功!!");
/**
* 创建交换机
*/
@Test
void createExchange()
DirectExchange directExchange = new DirectExchange("hello-java-exchange",true,false);
amqpAdmin.declareExchange(directExchange);
System.out.println("创建成功");
/**
* 创建队列
*/
@Test
void createQueue()
Queue queue = new Queue("hello-java-queue",true,false,false);
amqpAdmin.declareQueue(queue);
System.out.println("创建成功");
/**
* 创建绑定
*/
@Test
void createBinding()
//String destination【目的地】,
// DestinationType destinationType【目的地类型】,
// String exchange【交换机】,
// String routingKey【路由键】,
//@Nullable Map<String, Object> arguments【自定义参数】
Binding binding = new Binding("hello-java-queue",Binding.DestinationType.QUEUE,"hello-java-exchange","hello.java",null);
amqpAdmin.declareBinding(binding);
System.out.println("创建成功");
//service层加监听注解获取 消息数据
/**
* 监听消息
* queues 声明需要监听的所有队列
* org.springframework.amqp.core.Message
* <p>
* 参数可以写一下类型
* 1、Message essage: 原生消息详细信息。头+体
* 2、发送的消息的类型: OrderReturnReasonEntity content;
* 3、Channel channel:当前传输数据的通道
* <p>
* Queue:可以很多人都来监听,只要收到消息,队列删除消息,而且只能有一个收到此消息
* 1)、订单服务启动多个:同一个消息,只能有一个客户端收到
* 2)、只有一个消息完全处理完,方法运行结束,我们就可以接收到下一个消息
*/
@RabbitListener(queues = "hello-java-queue")
//这个类的这个方法才能接受hello-java-queue消息
//@RabbitHandler //类上加注解@RabbitListener(queues = "hello-java-queue")
public void receiveMessage(Message message, OrderReturnReasonEntity content, Channel channel)
//拿到消息体
// byte[] body = message.getBody();
//拿到消息头
// MessageProperties properties = message.getMessageProperties();
System.out.println("接收到消息:" + content);
//消息处理完 手动确认 deliveryTag在Channel内按顺序自增
long deliveryTag = message.getMessageProperties().getDeliveryTag();
System.out.println("deliveryTag->" + deliveryTag);
try
if (deliveryTag % 2 == 0)
//确认签收 队列删除该消息 false非批量模式
channel.basicAck(deliveryTag, false);
else
//拒收退货 第三个参数 -> true:重新入队 false:丢弃
channel.basicNack(deliveryTag, false, true);
catch (IOException e)
//网络中断
// @RabbitHandler
//public void receiveMessage2(OrderEntity content)
// System.out.println("接收到消息:" + content);
//
以上是关于RabbitMQ(简介概念安装和springboot整合)的主要内容,如果未能解决你的问题,请参考以下文章
RabbitMQ从概念到使用从Docker安装到RabbitMQ整合Springboot1.5w字保姆级教学
RabbitMQ简介RabbitMQ 特点AMQP 概念生产者和消费者 Broker 服务节点Queue 队列Exchange 交换器如何保证消息的可靠性?