(7)消息的确认和重发机制

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了(7)消息的确认和重发机制相关的知识,希望对你有一定的参考价值。

参考技术A 还记得我们在分析消费端的源码的时候,所讲到的 prefetchsize 吗?
可以参考上一篇的博客: https://www.jianshu.com/p/f5016f845d16

可以参考官网: http://activemq.apache.org/what-is-the-prefetch-limit-for.html

activemq 的 consumer 端也有窗口机制,通过 prefetchSize 就可以设置窗口大小。不同的类型的队列,prefetchSize 的默认值也是不一样的

通过上面的例子,我们基本上应该知道 prefetchSize 的作用了,消费端会根据prefetchSize 的大小批量获取数据,比如默认值是 1000,那么消费端会预先加载 1000 条数据到本地的内存中。

既然有批量加载,那么一定有批量确认,这样才算是彻底的优化

ActiveMQ 提供了 optimizeAcknowledge 来优化确认,它表示是否开启“优化ACK”,只有在为 true 的情况下,prefetchSize 以及optimizeAcknowledgeTimeout 参数才会有意义,优化确认一方面可以减轻 client 负担(不需要频繁的确认消息)、减少通信开销,另一方面由于延迟了确认(默认 ack 了 0.65*prefetchSize 个消息才确认),broker 再次发送消息时又可以批量发送
如果只是开启了 prefetchSize,每条消息都去确认的话,broker 在收到确认后也只是发送一条消息,并不是批量发布,当然也可以通过设置 DUPS_OK_ACK来手动延迟确认, 我们需要在 brokerUrl 指定 optimizeACK 选项

注意,如果 optimizeAcknowledge 为 true,那么 prefetchSize 必须大于 0. 当 prefetchSize=0 的时候,表示 consumer 通过 PULL 方式从 broker 获取消息

到目前为止,我们知道了 optimizeAcknowledge 和 prefetchSize 的作用,两者协同工作,通过批量获取消息、并延迟批量确认,来达到一个高效的消息消费模型。它比仅减少了客户端在获取消息时的阻塞次数,还能减少每次获取消息时的网络通信开销

需要注意的是,如果消费端的消费速度比较高,通过这两者组合是能大大提升 consumer 的性能。如果 consumer 的消费性能本身就比较慢,设置比较大的 prefetchSize 反而不能有效的达到提升消费性能的目的。因为过大的prefetchSize 不利于 consumer 端消息的负载均衡。因为通常情况下,我们都会部署多个 consumer 节点来提升消费端的消费性能。这个优化方案还会存在另外一个潜在风险,当消息被消费之后还没有来得及确认时,client 端发生故障,那么这些消息就有可能会被重新发送给其它consumer,那么这种风险就需要 client 端能够容忍“重复”消息

通过前面的源码分析,基本上已经知道了消息的消费过程,以及消息的批量获取和批量确认,那么接下来再了解下消息的确认过程
消息确认有四种 ACK_MODE,分别是:
AUTO_ACKNOWLEDGE = 1 自动确认
CLIENT_ACKNOWLEDGE = 2 客户端手动确认
DUPS_OK_ACKNOWLEDGE = 3 自动批量确认
SESSION_TRANSACTED = 0 事务提交并确认

虽然 Client 端指定了 ACK 模式,但是在 Client 与 broker 在交换 ACK 指令的时候,还需要告知 ACK_TYPE,ACK_TYPE 表示此确认指令的类型,不同的ACK_TYPE 将传递着消息的状态,broker 可以根据不同的 ACK_TYPE 对消息进行不同的操作

DELIVERED_ACK_TYPE = 0 消息"已接收",但尚未处理结束
STANDARD_ACK_TYPE = 2 "标准"类型,通常表示为消息"处理成功",broker 端可以删除消息了
POSION_ACK_TYPE = 1 消息"错误",通常表示"抛弃"此消息,比如消息重发多次后,都无法正确处理时,消息将会被删除或者 DLQ(死信队列)
REDELIVERED_ACK_TYPE = 3 消息需"重发",比如 consumer 处理消息时抛出了异常,broker 稍后会重新发送此消息
INDIVIDUAL_ACK_TYPE = 4 表示只确认"单条消息",无论在任何 ACK_MODE 下
UNMATCHED_ACK_TYPE = 5 在 Topic 中,如果一条消息在转发给“订阅者”时,发现此消息不符合 Selector 过滤条件,那么此消息将 不会转发给订阅者,消息将会被存储引擎删除(相当于在 Broker 上确认了消息)。

Client 端在不同的 ACK 模式时,将意味着在不同的时机发送 ACK 指令,每个 ACK Command 中会包含 ACK_TYPE,那么 broker 端就可以根据 ACK_TYPE 来决定此消息的后续操作

在正常情况下,有几中情况会导致消息重新发送

1.在事务性会话中,没有调用 session.commit 确认消息或者调用session.rollback 方法回滚消息
2.在非事务性会话中,ACK 模式为 CLIENT_ACKNOWLEDGE 的情况下,没有调用 acknowledge 或者调用了 recover 方法;

一个消息被 redelivedred 超过默认的最大重发次数(默认 6 次)时,消费端会给 broker 发送一个”poison ack”(ActiveMQMessageConsumer#dispatch:1460 行),表示这个消息有毒,告诉 broker 不要再发了。这个时候 broker 会把这个消息放到 DLQ(死信队列)。

ActiveMQ 中默认的死信队列是 ActiveMQ.DLQ,如果没有特别的配置,有毒的消息都会被发送到这个队列。默认情况下,如果持久消息过期以后,也会被送到 DLQ 中

死信队列配置策略 :
缺省所有队列的死信消息都被发送到同一个缺省死信队列,不便于管理,可以通过individualDeadLetterStrategy 或 sharedDeadLetterStrategy 策略来进行修改

自动丢弃过期消息 :

死信队列的再次消费:

当定位到消息不能消费的原因后,就可以在解决掉这个问题之后,再次消费死
信队列中的消息。因为死信队列仍然是一个队列

GCP 中的确认截止日期、消息保留时间、死信和重试策略

【中文标题】GCP 中的确认截止日期、消息保留时间、死信和重试策略【英文标题】:Ack Deadline, Message Retention Duration, Dead Lettering and Retry Policy in GCP 【发布时间】:2021-10-16 21:25:50 【问题描述】:

我有几个与 GCP 中的上述主题相关的问题。如果有人可以详细解释它们,那将是非常有帮助的。谢谢你。我浏览了一些文档,但找不到简明的答案。

我的理解:

    确认截止日期:例如,如果此功能设置为 10 秒,则它会在 10 秒内等待订阅者确认消息,否则在 10 秒后重新传递消息。

问题1: 在推送订阅者的情况下,pubsub 服务在等待 10 秒以使 ack 截止日期结束后再次将消息重新传递/推送给订阅者。 在pull消息的情况下,subscriber第一次尝试pull消息,一旦他pull,10secs ack截止时间时钟就会开始,所以在此期间如果subscriber再次尝试pull消息,他们会不会收到消息作为队列将关闭 10 秒?

    消息保留时间:默认设置为 7 天。所有已传递给订阅者但未得到订阅者确认的消息,经过某些重试尝试(例如 5 次)后,在 5 次重试后,它们会在主题中停留 7 天,并在 7 天后被删除。

问题 2: 但是订阅者是否会在每次拉取主题时收到这些消息,即使在最大重试次数之后?

    Dead lettering:Dead lettering 主题是您可以创建的主题,用于将主要主题中的坏/错误转发到死信主题。

问题 3: 这里的坏消息,是指无法通过 pubsub 服务传递给订阅者的消息还是订阅者无法确认的消息。但在第二种情况下,订阅者无法确认。这也可能意味着消息可能很好,但订阅者没有确认它们。在这种情况下,由于消息保留设置为 7 天,它们会留在同一个主题中,还是如果订阅创建了死信,是否由 pubsub 服务负责将消息转发到死信主题?

    Retry Policy: There are two options here 1. retry immediately: which when selected the pubsub servcie retries to deliver the message immediately to the subscriber if the subscriber doesn't ack the message before the ack deadline.第二个选项:使用指数退避重试:选择该选项后,发布订阅服务会尝试在将消息重新传递给订阅者之前给出延迟,并且它可以做的最大延迟是最大指数退避。 问题4: 让我们在这里举个例子: 假设我将确认截止日期设置为 10 秒。并将重试策略设置为最小指数退避为 30 秒,最大为 600 秒。因此,在这种情况下,如果订阅者第一次拉出消息但没有确认,则确认截止时间时钟开始并假设它结束,那么如果订阅者第二次拉出消息,pubsub 服务会再等待 30 秒(最小指数退避)在尝试重新传递消息之前?

谢谢。

【问题讨论】:

【参考方案1】:

我们建议使用我们支持的客户端库,它会自动延长确认期限。一般而言,请在下面找到答案:

所以在此期间如果订阅者再次尝试拉取消息,他们会不会收到消息,因为队列将关闭 10 秒

您不会在该时间段内再次收到相同的消息。您仍然可以从积压中提取其他消息。请注意,这是一项尽力而为的功能,有时您可能会在确认截止日期内再次收到该消息。

但订阅者是否会在每次拉取主题时收到这些消息,即使在最大重试次数之后也是如此

您在哪里配置最大重试次数? Cloud Pub/Sub 仅在死信队列中提供此最大传递尝试次数。如果您配置了死信队列,则消息将在重试一定次数后转发到死信主题并从父订阅中删除。否则,消息将保留在父订阅中,Cloud Pub/Sub 将尽可能尝试传递它。

这里的错误消息,是指无法通过 pubsub 服务传递给订阅者的消息还是订阅者无法确认的消息。但在第二种情况下,订阅者无法确认。这也可能意味着消息可能很好,但订阅者没有确认它们。在这种情况下,由于消息保留设置为 7 天,它们会留在同一个主题中,还是如果订阅创建了死信,是否由 pubsub 服务负责将消息转发到死信主题?

此上下文中的错误消息将是the messages which the subscribers are not able to ack。如果订阅者即使无法处理消息也不想死信,那么他们不应该使用死信队列。如果您同意在失败情况下从订阅中删除消息,死信队列是理想的选择。超过 7 天保留期的邮件不会系统地移至死信主题。

假设我将确认截止日期设置为 10 秒。并将重试策略设置为最小指数退避为 30 秒,最大为 600 秒。因此,在这种情况下,如果订阅者第一次拉取消息但没有确认,确认截止时间时钟开始并假设它结束,那么如果订阅者第二次拉取消息,pubsub 服务会再等待 30 秒(最小指数退避)在尝试重新传递消息之前?

没错。这个答案也可能有用:GCP PubSub retry backoff timing。

我希望这会有所帮助。

【讨论】:

这个答案非常有用。谢谢@Mahesh Gattani。但唯一有点令人困惑的是最小和最大指数退避如何为拉动订阅者工作。 是的。重试策略适用于所有订阅类型。

以上是关于(7)消息的确认和重发机制的主要内容,如果未能解决你的问题,请参考以下文章

RocketMQ之offset确认机制

activemq-重发去重

ActiveMQ 重发机制与确认机制 实践

ActiveMQ重试机制

ActiveMQ 重发机制(消息发送失败后的重新发送)

TCP可靠传输机制