RabbitMQ高级特性

Posted 你是人间五月天

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ高级特性相关的知识,希望对你有一定的参考价值。

一、生产者消息投递的可靠性机制

rabbitmq 整个消息投递的路径为:

producer--->rabbitmq broker--->exchange--->queue--->consumer

1、confirm 确认模式

消息从 producer 到 exchange 则会返回一个 confirmCallback。

(1)设置publisher-confirms="true" 开启 确认模式

(2)使用rabbitTemplate.setConfirmCallback设置回调函数。当消息发送到exchange后回调confirm方法。在方法中判断ack,如果为true,则发送成功,如果为false,则发送失败,需要处理

2、return  退回模式

消息从 exchange-->queue 投递失败则会返回一个 returnCallback。

(1)设置publisher-returns="true" 开启 退回模式

(2)使用rabbitTemplate.setReturnCallback设置退回函数,当消息从exchange路由到queue失败后,如果设置了rabbitTemplate.setMandatory(true)参数,则会将消息退回给producer。并执行回调函数returnedMessage。

二、消费者确认机制

确认方式:

1、自动确认:acknowledge="none"

当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从 RabbitMQ 的消息缓存中移除。

2、手动确认:acknowledge="manual"

在业务处理成功后,调用channel.basicAck(),手动签收,如果出现异常,则调用channel.basicNack()方法,让其自动重新发送消息。

三、消费端限流处理

消费端确认模式设置为手动模式,prefetch属性可以指定一次拉取多少消息,例如:

prefetch=“10”,表示一次消费10 条消息,必须等待10条消息都手动签收后,才会拉取新的消息。

四、消息存活时间设置

1、对整个队列设置过期时间

设置x-message-ttl的值,单位毫秒

2、对某一条消息设置过期时间

设置expiration的值,单位毫秒。当该消息在被消费时,会判断是否过期。

如果两者都进行了设置,以时间短的为准。

五、死信队列

1、如何给正常队列绑定死信队列?

设置x-dead-letter-exchange:死信队列交换机

设置x-dead-letter-routing-key:死信队列路由key

2、消息成为死信的情况

(1)队列消息长度到达限制,新产生的消息成为死信

(2)消费者拒绝消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false

(3)原队列存在消息过期设置,消息到达超时时间未被消费;

六、延迟队列

1、延迟队列 指消息进入队列后,可以被延迟一定时间,再进行消费。

2、RabbitMQ没有提供延迟队列功能,但是可以使用 : 过期时间 + 死信队列 来实现延迟队列效果

七、rabbitmq日志

RabbitMQ默认日志存放路径: /var/log/rabbitmq/rabbit@xxx.log

xxx表示主机名

八、消息补偿

为保证消息100%发送成功

1、通常会有补偿队列,然后在最后操作业务数据库的时候,通过业务的id去判断是否有重复数据

2、或者使用定时任务,定期去比较生产者数据库与消费者数据库的一致性。

九、消息幂等性解决

分两种情况:

1、新增操作

让每个消息携带一个全局的唯一ID,新增的时候先查询该ID对应的数据存不存在,不存在才操作

2、更新操作

类似乐观锁的操作,增加版本号字段,更新的时候where条件加上版本号。

十、集群搭建

RabbitMQ默认集群模式,但并不保证队列的高可用性,尽管交换机、绑定这些可以复制到集群里的任何一个节点,但是队列内容不会复制,复制队列内容到集群里的每个节点,必须要创建镜像队列。

 此外再利用HAProxy去实现负载均衡。

RabbitMq高级特性之消费端限流 通俗易懂 超详细 内含案例

RabbitMq高级特性之消费端限流

一丶首先部署SpringBoot框架

  1. 完成 SpringBoot 整合 RabbitMq 中的Topic通配符模式

二丶在 resource资源文件夹里application.yml文件中 添加配置

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manual #开启手动签收
        prefetch: 3 #一次就收三条

三、更改ProducerTest.java文件

import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import javax.annotation.Resource;

@RunWith(SpringRunner.class)
@SpringBootTest
@Slf4j
public class producerTest {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @Test
    public void test(){
    
        String routingKey = "item.insert";
    
        int count = 1;
        while (count <= 9){
            String message = "发送第"+count+"条消息";
            //log.debug("路由键:{}",routingKey);
            rabbitTemplate.convertAndSend(RabbitConfig.TOPIC_EXCHANGE_NAME,routingKey,message);
            count++;
        }
        log.debug("发送成功");
    }
}

四、更改CounmerListener.java文件

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.Map;

/**
 * 消费者 消费监听器
 */
@Component
@RabbitListener(queues = "direct_queue")
@Slf4j
public class ConsumerListener {

    @RabbitHandler
    public void accept(@Payload String message, @Headers Map map, Channel channel){

        long deliveryTag = (long) map.get(AmqpHeaders.DELIVERY_TAG);
        log.debug("deliveryTag:{}",deliveryTag);
        log.debug("message:{}",message);
        if (deliveryTag % 3 == 0) {
            try {
                //确认收到
                channel.basicAck(deliveryTag,true);
                Thread.sleep(3000);
                log.debug("休息三秒然后在接受消息");
            } catch (InterruptedException | IOException e) {
                e.printStackTrace();
            }
        }
    }

}

五、测试

先运行测试文件 创建交换机和队列

然后在运行消息监听器

本次内容运用到 RabbitMq确认消息机制

以上是关于RabbitMQ高级特性的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMq高级特性之延迟队列 通俗易懂 超详细 内含案例

RabbitMQ RabbitMQ高级特性

RabbitMQ RabbitMQ高级特性

消息队列 RabbitMq高级特性

Windows 安装 RabbitMQ

RabbitMQ高级特性