Rabbitmq消费者保证幂等性

Posted WontCode

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Rabbitmq消费者保证幂等性相关的知识,希望对你有一定的参考价值。

如图:

 示例代码:

pom文件添加依赖

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

application.yml配置rabbitmq连接

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: /guoVirtualHost

    listener:
      simple:
        retry:
#          开启消费者(出现异常)会进行重试
          enabled: true
#          默认重试无限次,所以需指定重试次数
          max-attempts: 5
#          重试间隔时间
          initial-interval: 3000
#          保证消费者会消费消息,手动确认
        acknowledge-mode: manual

#定义交换机、路由、队列名称
guoguo:
  order:
    exchange: guoguo_order_exchange
    routingKey: guoguo.order
    queue: guoguo_order_queue

定义RabbitmqConfig配置类,Exchange,Routingkey,Queue

@Component
public class OrderConfig 
    
    //交换机名称 
    @Value("$guoguo.order.exchange")
    private String orderExchange;
    //路由key
    @Value("$guoguo.order.routingKey")
    private String orderRoutingKey;
    //队列名称
    @Value("$guoguo.order.queue")
    private String orderQueue;

    @Bean
    public DirectExchange orderExchange()
       return new DirectExchange(orderExchange);
    
    @Bean
    public Queue orderQueue()
        return new Queue(orderQueue,true,false,false);
    
    @Bean
    public Binding orderBinding(Queue orderQueue,DirectExchange orderExchange)
        //方式1:Queue orderQueue,DirectExchange orderExchange 会去ioc容器中找到Bean
        return BindingBuilder.bind(orderQueue).to(orderExchange).with(orderRoutingKey);
        //方式2:直接调用方法
   //return BindingBuilder.bind(orderQueue()).to( orderExchange()).with(orderRoutingKey);
    

定义发送消息工具类:

@Component
@Slf4j
public class SendMessageUtlis 

    @Autowired
    RabbitTemplate amqpTemplate;
    
    @Value("$guoguo.order.exchange")
    private String orderExchange;

    @Value("$guoguo.order.routingKey")
    private String orderRoutingKey;

    public void sendMessage(Order order)
        amqpTemplate.convertAndSend(orderExchange,orderRoutingKey,order,message -> 
            return message;
        );
    

发送消息:

  @RequestMapping("/addOrder")
    public String addOrder()
        String quanjuId = System.currentTimeMillis()+ "";
        //生产者投递消息时携带全局id发送到mq服务器端
        Order order = new Order(1, "mq演示", 66, "男",quanjuId);
        log.info("全局id:"+quanjuId);
        sendMessage.sendMessage(order );
        //返回全局id给客户端
        return quanjuId;
    

消费者监听消息:

    @Autowired
    OrderDao orderDao;

    @RabbitListener(queues = "guoguo_order_queue")
    public void process(Order order, Message message,Channel channel) throws IOException 
        if (StringUtils.isEmpty(order.getQuanjuId()))
            return;
        
        //拿到全局id
        Order orders = orderDao.getQuanjuId(order.getQuanjuId());
        if (order!= null)
            log.info("已经被消费");
            //如果不为null,则已经被消费过,也需要告诉mq服务器端,将该消息删除
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
            return;
        
        int result = orderDao.saveOrder(order);
        log.info("插入成功");
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    

以上是关于Rabbitmq消费者保证幂等性的主要内容,如果未能解决你的问题,请参考以下文章

rabbitmq 怎么保证幂等性,数据一致性问题

如何保证消息不被重复消费?(如何保证消息消费时的幂等性)

如何保证消息不被重复消费?或者说,如何保证消息消费的幂等性?

RabbitMQ消息中间件技术精讲10 高级篇三 幂等性保障不重复消费

如何保障消息中间件100%消息投递成功?如何保证消息幂等性?

213期如何保障消息中间件100%消息投递成功?如何保证消息幂等性?