04 RabbitMQ进阶2之集群和延迟投递
Posted IT BOY
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了04 RabbitMQ进阶2之集群和延迟投递相关的知识,希望对你有一定的参考价值。
目录
Pt2 集群与高可用
RabbitMQ本身是基于Erlang编写,Erlang语言天生具备分布式特性(通过同步Erlang集群各节点的erlang.cookie来实现)。因此,RabbitMQ天然支持集群。集群是保证可靠性的一种方式,同时可以通过水平扩展以达到增加消息吞吐量能力的目的。
在高并发场景下,单台MQ服务器处理能力总会有瓶颈,通过集群增加MQ服务器节点,可以线型的提升MQ消息处理能力。 另外,节点故障往往不可避免,单节点一旦发生故障,将会导致服务不可用,集群可以有效的避免这个问题。集群中某一节点发生故障,客户端可以连接到其它MQ服务器,不会导致服务完全不可用。
RabbitMQ集群有两种节点类型:磁盘节点(Disc Node)和内存节点(RAM Node)。
-
磁盘节点:将元数据(VHost、Exchange名称/属性/类型、Queue名称/属性、绑定关系)存放在磁盘中。如果不指定类型,默认是磁盘节点。集群中至少需要一个磁盘节点来负责持久化数据,否则当集群宕机时,节点在内存中的数据将全部丢失,将会导致丢失全部元数据。
-
内存节点:将元数据存放在内存中。内存节点读写速度快,一般用来连接客户端程序,磁盘节点则用于备份数据和恢复数据。内存节点会将磁盘节点的磁盘上,用于故障时找到磁盘上的数据进行恢复。同时,如果是持久化消息,会同时存储在内存和磁盘节点上。
RabbitMQ有两种集群模式:普通集群模式和镜像队列模式。
Pt2.1 普通集群模式
普通集群模式下,不同节点之间只会互相同步元数据(VHost,Exchange,Queue,绑定关系),不会互相同步消息。
这种模式和Redis集群很相似,如果客户端连接到节点1上发送消息,但是消息要发送的Exchange和Queue是在节点3上,消息最终会被转发到节点3上存储。
普通集群模式的优势是可以线型的增加性能和存储容量,并且可以减少存储和同步消息数据的网络开销。但是缺点也很明显,就是每个节点上的消息存储实际上还是单机的,如果节点挂了将导致数据丢失。
如果你是对数据一致性没有很高的要求,能够容忍可能偶尔发生的宕机造成的数据丢失(有补偿方案、人工差错或者丢失无所谓了),可以使用普通集群模式,性能非常高。但是如果对数据一致性要求比较高的场景,比如支付,这种模式的可靠性是达不到要求的,所以要选择镜像队列模式。
Pt2.2 镜像队列模式
镜像队列模式非常容易理解,节点间除了元数据的同步外,还会同步消息,相当于每个节点存储的都是同样的数据,如同镜像一样。
镜像节点可用性非常高,任何节点宕机或数据丢失都不会造成数据不一致,因为在其他节点存储了数据副本。但是节点间消息同步带来了存储和性能损耗。
集群模式可以通过控台、命令行或者API进行设置,这里就不介绍了,比较简单,百度下就出来了。
Pt3 Spring AMQP使用
Pt3.1 Spring RabbitMQ
Spring AMQP 是Spring基于AMQP的消息收发处理的封装,直接看案例比较直观。
(1) 代码案例
基于Spring 对AMQP封装的RabbitMQ案例。
pom.xml引入依赖
<!-- RabbitMQ相关依赖 -->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.3.6</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.9.5</version>
</dependency>
applicationContext.xml配置RabbitMQ
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!-- 包扫描根路径 -->
<context:component-scan base-package="com.example"/>
<!-- RabbitMQ配置 -->
<!-- AMQP连接工厂接口 -->
<rabbit:connection-factory id="connectionFactory" virtual-host="/" username="guest" password="guest"
host="121.4.33.15" port="5672"/>
<!-- 封装了RabbitMQ的基础管理操作,包括元数据的声明和删除等 -->
<rabbit:admin id="rabbitAdmin" connection-factory="connectionFactory"/>
<!-- 声明队列 -->
<rabbit:queue name="SPRING_RABBIT_QUEUE" durable="true" auto-delete="false" exclusive="false"
declared-by="rabbitAdmin"/>
<!-- 声明直连类型的EXCHANGE -->
<rabbit:direct-exchange name="SPRING_DIRECT_CHANGE" durable="true" auto-delete="false" declared-by="rabbitAdmin">
<rabbit:bindings>
<!-- EXCHANGE绑定QUEUE -->
<rabbit:binding queue="SPRING_RABBIT_QUEUE" key="spring"></rabbit:binding>
</rabbit:bindings>
</rabbit:direct-exchange>
<!-- 消息序列化处理 -->
<bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter"/>
<!-- RabbitTemplate是AMQPTemplate的实现,用来简化消息的收发操作 -->
<rabbit:template id="rabbitTemplate" exchange="SPRING_DIRECT_CHANGE" connection-factory="connectionFactory"
message-converter="jsonMessageConverter"/>
<bean id="messageReceiver" class="com.example.mq.rabbitmq.springfw.SpringConsumer" />
<rabbit:listener-container connection-factory="connectionFactory" message-converter="jsonMessageConverter">
<rabbit:listener ref="messageReceiver" queues="SPRING_RABBIT_QUEUE" />
</rabbit:listener-container>
</beans>
生产者代码
@Controller
public class SpringProducer
private Logger logger = LoggerFactory.getLogger(SpringProducer.class);
@Autowired
@Qualifier("rabbitTemplate")
private AmqpTemplate rabbitTemplate;
@RequestMapping(value = "/spring_rabbit_demo")
public void sendMessage(Object message)
// amqpTemplate 默认交换机 SPRING_DIRECT_CHANGE
rabbitTemplate.convertAndSend("FirstKey", "[Direct,FirstKey] " + message);
rabbitTemplate.convertAndSend("SecondKey", "[Direct,SecondKey] " + message);
消费者代码
public class SpringConsumer implements MessageListener
private Logger logger = LoggerFactory.getLogger(SpringConsumer.class);
public void onMessage(Message message)
logger.info("The first consumer received message : " + message.getBody());
(2) 核心对象
对象 | 描述 |
---|---|
ConnectionFactory | Spring AMQP 的连接工厂接口,用于创建连接。CachingConnectionFactory 是ConnectionFactory 的一个实现类。能指定Confirm Type和开启生产者确认模式和创建连接等 |
RabbitAdmin | RabbitAdmin 是 AmqpAdmin 的实现,封装了对 RabbitMQ 的基础管理操作,比如对交换机、队列、绑定的声明和删除等。 |
Message | Message 是 Spring AMQP 对消息的封装。 |
RabbitTemplate | Spring AMQP提供了一个发送和接收消息的操作模板类AmqpTemplate。 AmqpTemplate它定义包含了发送和接收消息等的一些基本的操作功能。RabbitTemplate是AmqpTemplate的一个实现。 |
MessageListener | MessageListener 是 Spring AMQP 异步消息投递的监听器接口,它只有一个方法 onMessage,作用类似于 Java API 中的 Consumer。 |
MessageListenerContainer | MessageListenerContainer可以理解为MessageListener的容器,一个Container只有一个 Listener,但是可以生成多个线程使用相同的 MessageListener 同时消费消息。 Container 可以管理 Listener 的生命周期,可以用于对于消费者进行配置。 |
MessageListenerContainerFactory | 可以在消费者上指定,当我们需要监听多个 RabbitMQ 的服务器的时候,指定不同的 MessageListenerContainerFactory。 |
MessageConvertor | 在使用RabbitTemplate的convertAndSend()方法发送消息时,会使用MessageConvertor进行消息的序列化。 AmqpTemplate 定义提供了各种发送和接收委拖给MessageConverter转化对象消息的方法。MessageConverter 本身比较简单,它提供了消息对象的转化,可将object转化成Message 对象,或者将Message 对象转化成Object对象。它提供了默认的SimpleMessageConverter实现,以及第三方的MessageConverter,如Jackson2JsonMessageConverter,MarshallingMessageConverter等,来处理消息与对象之间的转换。 |
Pt3.2 SpringBoot RabbitMQ
(1) 代码案例
pom.xml引入依赖包
<!-- SpringBoot AMQP依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置RabbitMQ
@Configuration
public class RabbitConfig
@Bean
public ConnectionFactory connectionFactory() throws Exception
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
cachingConnectionFactory.setUri("amqp://guest:guest@121.4.33.15:5673");
return cachingConnectionFactory;
@Bean
public RabbitAdmin amqpAdmin(ConnectionFactory connectionFactory)
RabbitAdmin admin = new RabbitAdmin(connectionFactory);
admin.setAutoStartup(true);
return admin;
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory)
return new RabbitTemplate(connectionFactory);
@Bean
public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory)
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setConsumerTagStrategy(new ConsumerTagStrategy()
public String createConsumerTag(String queue)
return null;
);
return container;
// 定义交换机
@Bean("springBootExchange")
public TopicExchange getSpringBootExchange()
return new TopicExchange("SPRING_BOOT_EXCHANGE");
// 定义队列
@Bean("springBootQueue")
public Queue getSpringBootQueue()
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-message-ttl", 6000);
Queue queue = new Queue("SPRING_BOOT_QUEUE", false, false, true, args);
return queue;
// 定义交换机和队列的绑定关系
@Bean
public Binding bindSecond(@Qualifier("springBootQueue") Queue queue, @Qualifier("springBootExchange") TopicExchange exchange)
return BindingBuilder.bind(queue).to(exchange).with("springboot");
定义生产者
@Controller
public class SpringBootProducer
@Autowired
RabbitTemplate rabbitTemplate;
@RequestMapping(value = "/basic")
public String send()
rabbitTemplate.convertAndSend("SPRING_BOOT_EXCHANGE", "springboot", "这是一条测试消息。");
System.out.println("消息发送成功");
return "PROCESS_OK";
定义消费者
@Component
@RabbitListener(queues = "SPRING_BOOT_QUEUE")
public class SpringBootConsumer
@RabbitHandler
public void consumer(String message) throws IOException
System.out.println("SpringBoot Consumer reveive message: " + message);
(2) 参数说明
SpringBoot中配置RabbitMQ参数如下。
-
base
-
spring.rabbitmq.host: 服务Host
-
spring.rabbitmq.port: 服务端口
-
spring.rabbitmq.username: 登陆用户名
-
spring.rabbitmq.password: 登陆密码
-
spring.rabbitmq.virtual-host: 连接到rabbitMQ的vhost
-
spring.rabbitmq.addresses: 指定client连接到的server的地址,多个以逗号分隔(优先取addresses,然后再取host)
-
spring.rabbitmq.requested-heartbeat: 指定心跳超时,单位秒,0为不指定;默认60s
-
spring.rabbitmq.publisher-confirms: 是否启用【发布确认】
-
spring.rabbitmq.publisher-returns: 是否启用【发布返回】
-
spring.rabbitmq.connection-timeout: 连接超时,单位毫秒,0表示无穷大,不超时
-
spring.rabbitmq.parsed-addresses:
-
-
ssl
-
spring.rabbitmq.ssl.enabled: 是否支持ssl
-
spring.rabbitmq.ssl.key-store: 指定持有SSL certificate的key store的路径
-
spring.rabbitmq.ssl.key-store-password: 指定访问key store的密码
-
spring.rabbitmq.ssl.trust-store: 指定持有SSL certificates的Trust store
-
spring.rabbitmq.ssl.trust-store-password: 指定访问trust store的密码
-
spring.rabbitmq.ssl.algorithm: ssl使用的算法,例如,TLSv1.1
-
-
cache
-
spring.rabbitmq.cache.channel.size: 缓存中保持的channel数量
-
spring.rabbitmq.cache.channel.checkout-timeout: 当缓存数量被设置时,从缓存中获取一个channel的超时时间,单位毫秒;如果为0,则总是创建一个新channel
-
spring.rabbitmq.cache.connection.size: 缓存的连接数,只有是CONNECTION模式时生效
-
spring.rabbitmq.cache.connection.mode: 连接工厂缓存模式:CHANNEL 和 CONNECTION
-
-
listener
-
spring.rabbitmq.listener.simple.auto-startup: 是否启动时自动启动容器
-
spring.rabbitmq.listener.simple.acknowledge-mode: 表示消息确认方式,其有三种配置方式,分别是none、manual和auto;默认auto
-
spring.rabbitmq.listener.simple.concurrency: 最小的消费者数量
-
spring.rabbitmq.listener.simple.max-concurrency: 最大的消费者数量
-
spring.rabbitmq.listener.simple.prefetch: 指定一个请求能处理多少个消息,如果有事务的话,必须大于等于transaction数量.
-
spring.rabbitmq.listener.simple.transaction-size: 指定一个事务处理的消息数量,最好是小于等于prefetch的数量.
-
spring.rabbitmq.listener.simple.default-requeue-rejected: 决定被拒绝的消息是否重新入队;默认是true(与参数acknowledge-mode有关系)
-
spring.rabbitmq.listener.simple.idle-event-interval: 多少长时间发布空闲容器时间,单位毫秒
-
spring.rabbitmq.listener.simple.retry.enabled: 监听重试是否可用
-
spring.rabbitmq.listener.simple.retry.max-attempts: 最大重试次数
-
spring.rabbitmq.listener.simple.retry.initial-interval: 第一次和第二次尝试发布或传递消息之间的间隔
-
spring.rabbitmq.listener.simple.retry.multiplier: 应用于上一重试间隔的乘数
-
spring.rabbitmq.listener.simple.retry.max-interval: 最大重试时间间隔
-
spring.rabbitmq.listener.simple.retry.stateless: 重试是有状态or无状态
-
-
template
-
spring.rabbitmq.template.mandatory: 启用强制信息;默认false
-
spring.rabbitmq.template.receive-timeout: receive() 操作的超时时间
-
spring.rabbitmq.template.reply-timeout: sendAndReceive() 操作的超时时间
-
spring.rabbitmq.template.retry.enabled: 发送重试是否可用
-
spring.rabbitmq.template.retry.max-attempts: 最大重试次数
-
spring.rabbitmq.template.retry.initial-interval: 第一次和第二次尝试发布或传递消息之间的间隔
-
spring.rabbitmq.template.retry.multiplier: 应用于上一重试间隔的乘数
-
spring.rabbitmq.template.retry.max-interval: 最大重试时间间隔
-
Pt4 使用规范
Pt4.1 资源管理
实际应用中,通常生产者和消费者可能不是一个应用,甚至不是一个团队,那这些资源应该谁来创建谁来管理呢?
基本原则就是,谁使用谁创建,谁污染谁治理。
生产者需要往Exchange发送消息,所以通常生产者负责创建Exchange。
消费者需要从Queue获取Exchange下发的消息,所以消费者负责创建Queue和绑定关系。
Pt4.2 命名规范
MQ元数据不要硬编码,尽可能放在配置中心,可以采用如下命名规范:
虚拟机:XXX_VHOST
交换机:XXX_EXCHANGE
队列:XXX_QUEUE
Pt4.3 消息持久化
发送给MQ的消息持久化到数据库中,并维护发送状态,可以实现消息的可追溯和防重控制,结合定时任务可以灵活的处理消息。
-
将消息持久化到数据库;
-
定时任务批量扫描未处理的消息,发送到MQ,更新发送状态;
-
如果消息处理失败,需要重发,则更新数据库中消息状态为未处理。
Pt4.4 连接数控制
消息处理创建的连接有不小的性能消耗,如果发送大批量的数据,可以进行消息合并处理,比如通过JSON数据类型,可以将很多消息合并到一起,减少生产者和消费者从MQ服务器处理消息时产生的连接数。
建议单条消息不要超过4M(4096KB)。
Pt5 延迟投递实现
Pt5.1 基于死信队列
场景:业务系统发起订单支付时,如果30分钟后没有收到支付结果,就自动发起关单处理。
分析:因为RabbitMQ本身不支持延迟发送消息的功能,所以要通过别的机制实现消息的延迟投递。最简单的是通过数据库处理,支付请求发送之后,同步入数据库,批处理每分钟扫描数据库中的订单,发现30分钟前的订单还没有支付时,就发起关单的MQ请求。这种方式比较简单,也能实现业务需求,但是通常支付订单非常多,每分钟扫描一次在大数据量的情况下,对数据库造成的压力也比较大,更有可能会影响其他业务操作。那能不能用RabbitMQ来实现呢。
当然可以,虽然RabbitMQ本身不支持延迟发送消息,但是可以曲线救国。我们知道,队列中的消息可以设置超时时间,超过时间没有消费就会过期,被丢弃或者被投入死信队列,那可以这样处理:
-
消息发送时指定30分钟的超时时间:有两种方式可以指定超时时间。一种是设置队列的过期时间,这个时间是对队列中所有消息生效的。另一种是设置单条消息的过期时间,这个设置只会对当前消息有效。如果同时指定了队列的过期时间和消息的过期时间,那么时间小的会生效。
-
指定队列的死信交换机DLX和死信队列DLQ:消息过期后,会被投递到死信队列,死信交换机和死信队列实际上也只是普通的交换机和队列。
-
消费死信队列的消息:消息被转发到DLQ之后,就可以配置消费者进行消费,从而达到延迟投递的效果。
案例如下。
// 生产者
public class DirectProducer
public static void main(String[] args)
// 创建消息信道
Connection connection = RabbitMQUtils.getConnection();
Channel channel = RabbitMQUtils.createChannel(connection);
try
// 创建死信交换机和死信队列。
channel.exchangeDeclare("DLX_PAY_EXCHANGE", "topic", false, false, false, null);
channel.queueDeclare("DLQ_PAY_QUEUE", false, false, false, null);
// 绑定DLX和DLQ,无条件转发。
channel.queueBind("DLQ_PAY_QUEUE", "DLX_PAY_EXCHANGE", "#");
System.out.println("死信队列和死信交换机创建成功。");
// 创建正常的交换机。
channel.exchangeDeclare("NORMAL_PAY_EXCHANGE", "topic", false, false, false, null);
// 创建正常的队列,并制定死信交换机。
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange", "DLX_PAY_EXCHANGE"); // 指定DLX
arguments.put("x-expires", 30000); // 30秒
channel.queueDeclare("NORMAL_PAY_QUEUE", true, false, false, arguments);
// 绑定交换机和队列
channel.queueBind("NORMAL_PAY_QUEUE", "NORMAL_PAY_EXCHANGE", "dopay");
System.out.println("普通队列和普通交换机创建成功。");
// 将消息信道设置为普通确认模式
channel.confirmSelect();
// 往正常交换机发送消息
channel.basicPublish("NORMAL_PAY_EXCHANGE", "dopay", MessageProperties.PERSISTENT_TEXT_PLAIN,
"发送支付请求".getBytes());
// 消息确认,但是因为NORMAL_PAY_QUEUE没有消费者,30秒之后消息会被投递到死信交换机。
if (channel.waitForConfirms())
System.out.println("消息发送成功,当前时间:" + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")).format(new Date()));
catch (Exception ex)
ex.printStackTrace();
// 消息发送异常,需要考虑重发
// 消费者
public class DirectConsumer1
public static void main(String[] args)
Connection connection = RabbitMQUtils.getConnection();
Channel channel = RabbitMQUtils.createChannel(connection);
try
// 创建消费者,绑定队列
Consumer consumer = new DefaultConsumer(channel)
@SneakyThrows @Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException
System.out.println(
"收到消息:" + new String(body, "UTF-8") + "时间:" + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"))
.format(new Date()));
;
while (true)
// 消费者直接消费死信队列的消息
channel.basicConsume("DLQ_PAY_QUEUE", true, consumer);
TimeUnit.SECONDS.sleep(1);
catch (Exception ex)
ex.printStackTrace();
但是上面这种方式,也有一些缺点:
-
如果统一用队列的TTL,当衰减梯队非常多的情况下,需要建立非常多的DLX和DLQ;
-
如果为每个消息设置单独过期时间,那么这个消息队列不可以和正常的混用,因为队列中消息等待过期会影响其他消息的投递,因为是FIFO,下一条消息要等待当前消息过期才能继续投递。
-
时间控制的不是很精准,存在时间误差。
当然,RabbitMQ也提供了更加简单和有效的办法,就是插件:rabbitmq_delayed_message_exchange插件。
Pt5.2 基于延迟插件
安装rabbitmq_delayed_message_exchange延迟消息插件,可以轻松实现延迟消息的处理。
插件的具体安装和使用,这里不做详细说明,网上案例很多。
Pt5.3 死信队列
通常在以下场景,消息会进入死信队列:
-
消息消费不及时,过期进入死信队列。
-
消息被消费者拒绝,并且未设置重回队列。
-
队列达到最大长度,超过了MaxLength或者MaxLengthBytes,最先入队的消息会被发送到死信队列。
Pt5.4 流量控制
如果MQ生产者的速度远远大于消费者速度,就会产生大量消息堆积,占用内存等系统资源,导致服务端处理性能下降,直至MQ崩溃。为了防止这种情况,需要对消息进行限流,限流可以基于服务端和客户端分别设置。
(1) 服务端限流
队列长度限制
通过限制队列长度控制消息数量。
-
x-max-length:队列中最大存储的消息数量,超过这个数据,队头的消息会被丢弃。
-
x-max-length-bytes:队列中存储的最大消息容量(单位bytes),超过这个容量,队头的消息会被丢弃。
通过限制队列长度,在消息有堆积的情况下有意义,不过会删除先入队的消息,跟传统限流的理解不一样。
内存限制
RabbitMQ会在启动时监测机器的物理内存数值。默认当MQ占用40%以上内存时,会主动抛出一个内存警告并阻塞所有连接,可以通过配置文件修改这个比例。
磁盘限制
磁盘限制和内存一样,当磁盘剩余可用空间低于指定数值时,会触发流控措施,默认50MB。可以通过参数配置修改数值。
(2) 消费端限流
在push模式下, 消息会被主动推送到消费者,如果消费者处理比较慢,消息就会在应用中大量堆积,最终打爆我们的应用程序。这是我们最不想看到的,所以除了服务端限流,还要进行客户端限流。
设置prefetch count,当超过这个数值的消息未被确认,RabbitMQ会停止投递新的消息给消费端,直到有消息被确认后继续投递。
本章到此结束,更多关于RabbitMQ的文章,请点击如下专栏连接。
以上是关于04 RabbitMQ进阶2之集群和延迟投递的主要内容,如果未能解决你的问题,请参考以下文章