如何处理消费过程中的重复消息?

Posted wang-yi-shan

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何处理消费过程中的重复消息?相关的知识,希望对你有一定的参考价值。

    上一节的内容最后,我们提出了一个思考。如果消息确认响应在网络传输过程中丢失了,那么消息队列的服务端、客户端会认为消息没有被正确传递,从而尝试进行下一次发送。这时候就会产生一些重复消息,那么应该怎么处理这个问题呢?

    一. 重复消息一定存在吗?

    在MQTT协议中,给出了三种消息传递过程中能够提供的服务质量的三种标准

  1. At most once:至多一次。消息传递时,最多会送达一次消息。也就是说不保证消息的可靠性传输,允许丢失消息的存在。一般都是对消息可靠性要求不太高的监控场景使用,比如饮料销售柜每分钟发送一次心跳。

  2. At least once:至少一次。消息传递时,最少会送达一次消息。也就是说保证了消息的可靠性传输,允许重复消息的存在。

  3. Exactly once:正好一次。消息传递时,只能有一次消息送达。既不允许丢失消息也不允许重复消息。

     

    消息队列也适用于这些标准,大多数的消息队列提供的服务器质量都是At least once,包括RabbitMQ、RocketMQ、Kafka。所以,在适用这些中间件时重复消息时一定会存在的。

    二. 如何消除重复消息的影响?

    一般处理重复消息的方法是,在消费端,让消费消息的操作具备幂等性。

    幂等是数学上的一个概念,它是这样定义的:f(x) = f(f(x))。通俗来讲就是,一个函数使用同样的参数,对它调用一次的结果和多次调用的结果是一样的。

    对消费端来说,如果消费操作是幂等操作,那么就算有重复消息传进来,它也会得到相同的结果,从而消除重复消息对系统的影响。即At least once + 幂等消费 = Exactly once。

    三. 实现幂等操作的方法

    1. 利用数据库唯一性约束实现幂等

        如果操作是在数据库insert一条和操作能关联起来的具有唯一性的数据,并根据是否插入成功判断这条消息是否被消费国,就可以实现幂等操作。

        这种方式是利用了数据库中的 insert unit 特性

    2. 在更新前增加前置判断条件

        如果能在数据库操作时添加一个前置条件,如果条件成立就更新,如果条件不成立就不更新,也能实现幂等操作。

        这种方法利用了数据库中的 update where 特性

    3. 记录并检查记录

        前面两种方法都有一定的情景局限性,还有一种通用性最强的方法:记录并检查记录 ,也叫做Token机制或者GUID机制。

        具体实现思路是,在发送消息时,给这条消息生成一个全局唯一ID,在消费时先去判断这个ID有没有被消费过,如果没有消费过才进行消费。

    ?    ?不过这种方式在分布式系统中,很难去实现。首先生成一个唯一的标识就是一件不那么简单的事,方法有很多,但是都不能同时满足简单、高性能、高可用的要求,或多或少会有一些牺牲。其次更麻烦的是,消费时检查、更新数据、更新消费状态这三个操作一定要是原子性的操作,才能保证真正的幂等。

?    ?    ?对于这个问题,我们可以用事务实现,也可以用锁实现,但是无论是分布式事务还是分布式锁都是一个比较难解决的问题。

以上是关于如何处理消费过程中的重复消息?的主要内容,如果未能解决你的问题,请参考以下文章

#yyds干货盘点# 如何处理消费过程中的重复消息?

系统学习消息队列分享(七) 如何处理消费过程中的重复消息?

如何处理消息队列消费过程中的重复消息&如何实现幂等性

RocketMQ 死信队列 | 消费者出现异常如何处理?

真实生产案例消息中间件如何处理消费失败的消息?

消息队列如何处理重复消息