消息队列中处理重复消息

Posted jfcat

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了消息队列中处理重复消息相关的知识,希望对你有一定的参考价值。

为什么消息队列中会出现消息重复现象

可能出现的场景

  • 业务层面的消息重复
    我这里有个场景,比如用户进行关注,在手机上点了一下,由于网络延迟或产品实现问题,没有马上变成已关注 or 取消关注,导致用户下意识的多点了几下。
  • 网络层面的消息重复
    这个不经常发送但是可能出现,比如生产端producer在发送消息的时候发生了网络抖动,过了一段时间后又重发了这条消息。但是服务器端真实的收到了两条消息并记录到队列中。

对于消息队列来说,只有在性能和重复性处理上进行取舍,由于重复不是一个频发的场景,消息队列为了实现高性能是允许重复消息的。三种传递消息时能够提供的服务质量标准:

  • At most once:最多一次,这种情况会消息只会被发送一次,可能丢失部分数据,一般日志收集这种对数据不严格的可以使用
  • At least once:最少一次,这种会导致一条消息重复发送,但是数据不会丢失
  • Exactly once:正好一次,一条消息只会被消费一次

RocketMQ,Rabbit MQ,Kafka都是使用的At least once,虽然消息会重复,但不会丢失。不使用Exactly once是因为每次发送前发送都要检查这条消息是否已成功发送,会大大降低了MQ的性能。

解决方案

一般都是在消费端保证幂等性来解决。

幂等:f(f(x))=f(x),执行多次和执行一次的结果是相同的,这种我们称之为幂等的。

首先,设置一个全局唯一标识,然后就可以通过一下几种办法来实现冥等操作。

  • 数据库唯一索引:
     通过在消息中带上唯一标识,比如订单ID,当用户多次发起支付时由于订单Id是唯一索引,插入多次会导致数据库报错从而避免重复执行
  • 通过cache保存唯一ID
    通过Redis的setnx添加一个key并设置过期时间,Redis中有这个key就不能重复操作
  • 数据库查询
     通过事务和唯一性处理,打开事务,然后查询有没有该订单号的流水,没有则可以插入。这样通过数据库的ACID特性来实现不重复的插入。

同时这里还有第一个场景的问题,比如用户在短时间内多次点击取消关注和关注,那么如果我们对消息进行多partition和多cosumer处理,可能导致消费顺序问题,这里就需要对消息保持顺序,方案上可以使用消息队列的队列和partition选择功能,这里以RocketMQ的官方示例说明地址

public class OrderedProducer 
    public static void main(String[] args) throws Exception 
        //Instantiate with a producer group name.
        MQProducer producer = new DefaultMQProducer("example_group_name");
        //Launch the instance.
        producer.start();
        String[] tags = new String[] "TagA", "TagB", "TagC", "TagD", "TagE";
        for (int i = 0; i < 100; i++) 
            int orderId = i % 10;
            //Create a message instance, specifying topic, tag and message body.
            Message msg = new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult sendResult = producer.send(msg, new MessageQueueSelector() 
            @Override
            public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) 
                Integer id = (Integer) arg;
                int index = id % mqs.size();
                return mqs.get(index);
            
            , orderId);

            System.out.printf("%s%n", sendResult);
        
        //server shutdown
        producer.shutdown();
    

这样同一类消息(比如同一个用户)可以找到同一个消息队列(partition),这样就可以保证消息的顺序

参考:
https://www.cnblogs.com/ITyannic/p/12241861.html
http://rocketmq.apache.org/docs/order-example/

以上是关于消息队列中处理重复消息的主要内容,如果未能解决你的问题,请参考以下文章

系统学习消息队列分享(七) 如何处理消费过程中的重复消息?

如何防止重复消息在 WebJob 处理时不插入到服务总线队列中?

消息队列漫谈:消息丢失,消息重复,消息积压一些处理方式

RabbitMQ 消息顺序消息幂等消息重复消息事务集群

消息队列消息丢失和消息重复发送的处理策略

消息队列聊一下如何避免消息的重复消费