RabbitMQ高级特性
Posted 你是人间五月天
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ高级特性相关的知识,希望对你有一定的参考价值。
一、生产者消息投递的可靠性机制
rabbitmq 整个消息投递的路径为:
producer--->rabbitmq broker--->exchange--->queue--->consumer
1、confirm 确认模式
消息从 producer 到 exchange 则会返回一个 confirmCallback。
(1)设置publisher-confirms="true" 开启 确认模式
(2)使用rabbitTemplate.setConfirmCallback设置回调函数。当消息发送到exchange后回调confirm方法。在方法中判断ack,如果为true,则发送成功,如果为false,则发送失败,需要处理
2、return 退回模式
消息从 exchange-->queue 投递失败则会返回一个 returnCallback。
(1)设置publisher-returns="true" 开启 退回模式
(2)使用rabbitTemplate.setReturnCallback设置退回函数,当消息从exchange路由到queue失败后,如果设置了rabbitTemplate.setMandatory(true)参数,则会将消息退回给producer。并执行回调函数returnedMessage。
二、消费者确认机制
确认方式:
1、自动确认:acknowledge="none"
当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从 RabbitMQ 的消息缓存中移除。
2、手动确认:acknowledge="manual"
在业务处理成功后,调用channel.basicAck(),手动签收,如果出现异常,则调用channel.basicNack()方法,让其自动重新发送消息。
三、消费端限流处理
消费端确认模式设置为手动模式,prefetch属性可以指定一次拉取多少消息,例如:
prefetch=“10”,表示一次消费10 条消息,必须等待10条消息都手动签收后,才会拉取新的消息。
四、消息存活时间设置
1、对整个队列设置过期时间
设置x-message-ttl的值,单位毫秒
2、对某一条消息设置过期时间
设置expiration的值,单位毫秒。当该消息在被消费时,会判断是否过期。
如果两者都进行了设置,以时间短的为准。
五、死信队列
1、如何给正常队列绑定死信队列?
设置x-dead-letter-exchange:死信队列交换机
设置x-dead-letter-routing-key:死信队列路由key
2、消息成为死信的情况
(1)队列消息长度到达限制,新产生的消息成为死信
(2)消费者拒绝消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false;
(3)原队列存在消息过期设置,消息到达超时时间未被消费;
六、延迟队列
1、延迟队列 指消息进入队列后,可以被延迟一定时间,再进行消费。
2、RabbitMQ没有提供延迟队列功能,但是可以使用 : 过期时间 + 死信队列 来实现延迟队列效果
七、rabbitmq日志
RabbitMQ默认日志存放路径: /var/log/rabbitmq/rabbit@xxx.log
xxx表示主机名
八、消息补偿
为保证消息100%发送成功
1、通常会有补偿队列,然后在最后操作业务数据库的时候,通过业务的id去判断是否有重复数据
2、或者使用定时任务,定期去比较生产者数据库与消费者数据库的一致性。
九、消息幂等性解决
分两种情况:
1、新增操作
让每个消息携带一个全局的唯一ID,新增的时候先查询该ID对应的数据存不存在,不存在才操作
2、更新操作
类似乐观锁的操作,增加版本号字段,更新的时候where条件加上版本号。
十、集群搭建
RabbitMQ默认集群模式,但并不保证队列的高可用性,尽管交换机、绑定这些可以复制到集群里的任何一个节点,但是队列内容不会复制,复制队列内容到集群里的每个节点,必须要创建镜像队列。
此外再利用HAProxy去实现负载均衡。
RabbitMq高级特性之消费端限流 通俗易懂 超详细 内含案例
RabbitMq高级特性之消费端限流
一丶首先部署SpringBoot框架
- 完成 SpringBoot 整合 RabbitMq 中的Topic通配符模式
二丶在 resource资源文件夹里application.yml文件中 添加配置
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual #开启手动签收
prefetch: 3 #一次就收三条
三、更改ProducerTest.java文件
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import javax.annotation.Resource;
@RunWith(SpringRunner.class)
@SpringBootTest
@Slf4j
public class producerTest {
@Resource
private RabbitTemplate rabbitTemplate;
@Test
public void test(){
String routingKey = "item.insert";
int count = 1;
while (count <= 9){
String message = "发送第"+count+"条消息";
//log.debug("路由键:{}",routingKey);
rabbitTemplate.convertAndSend(RabbitConfig.TOPIC_EXCHANGE_NAME,routingKey,message);
count++;
}
log.debug("发送成功");
}
}
四、更改CounmerListener.java文件
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Map;
/**
* 消费者 消费监听器
*/
@Component
@RabbitListener(queues = "direct_queue")
@Slf4j
public class ConsumerListener {
@RabbitHandler
public void accept(@Payload String message, @Headers Map map, Channel channel){
long deliveryTag = (long) map.get(AmqpHeaders.DELIVERY_TAG);
log.debug("deliveryTag:{}",deliveryTag);
log.debug("message:{}",message);
if (deliveryTag % 3 == 0) {
try {
//确认收到
channel.basicAck(deliveryTag,true);
Thread.sleep(3000);
log.debug("休息三秒然后在接受消息");
} catch (InterruptedException | IOException e) {
e.printStackTrace();
}
}
}
}
五、测试
先运行测试文件 创建交换机和队列
然后在运行消息监听器
本次内容运用到 RabbitMq确认消息机制
以上是关于RabbitMQ高级特性的主要内容,如果未能解决你的问题,请参考以下文章