深入讲解ActiveMQ5.X消息的持久性

Posted IT技术精选文摘

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了深入讲解ActiveMQ5.X消息的持久性相关的知识,希望对你有一定的参考价值。

我经常被问到一些基本的关于解释消息存储在ActiveMQ中是如何工作的问题。在这里我将做一个高层面的解释。注意,上下文环境是它是在JMS范围内。如果你使用的是ActiveMQ的非JMS客户端(e.g.,STOMP,AMQP,MQTT,等),那么它的行为在一些案例中会有所不同。

ActiveMQ

JMS的持久性保证对于被标记为“持久的”而不能丢失的消息而言是非常强大的. 让我们看下它在ActiveMQ中是如何被运用的.

主题

主题使用了一个广播机制. 它允许我们在JMS领域使用发布订阅语义模型. 但当我们将一条消息标记为“持久的”而它并没有订阅者时会如何? 对于任何一个正常的广播而言 (就如我去市中心大声宣扬ActiveMQ的优势), 如果它没有订阅者 (好比在凌晨3点时根本就没有任何人能听到我) 时会怎么样? 什么也不会发生. 如果消息没有任何的订阅者(无活跃的或可持久的订阅者),那么当消息被发布后(持久或非持久的),ActiveMQ 对此消息不会做任何的事情.

如果消息有可持久的订阅者(活跃或非活跃),那么ActiveMQ只是会存储这些消息. 对于一个非活跃的可持久订阅, ActiveMQ 会将标记为“持久的”消息做持久存储并等待订阅者重新加入订阅,到那时它将会尝试投递消息.

队列

ActiveMQ的队列, 使用“持久的”消息做为一个默认的协议. 基本上我们会阻塞生产者线程并等待实际获得消息的broker的确认:

生产者:

  • 生产者发送消息

  • 生产者阻塞并等待broker的ACK

    • 如果ACK成功,那么生产者会继续发送消息

    • 如果NACK或者超时或者失败,那么会重试

Broker:

  • 接收消息

  • 将消息存储到磁盘

  • 回送ACK

对于 “非持久的”的消息发送, 流程是不一样的. 我们会使用“发送并忘记” 的模式. 主生产者线程不会被阻塞,任何的ACK或其它的响应在ActiveMQ连接传输线程上都是异步的:

  • 生产者发送消息

  • 生产者在线程内继续发送消息而不被阻塞

  • 生产者最终在一个独立的线程而不是主生产者线程中获得ACK

深入讲解ActiveMQ5.X消息的持久性

事务性的发送?

我们可以通过一次将多条消息合并发送到broker来提高性能. 这样将对网络和broker存储的使用更加的高效. 当做事务性发送的时候,有一个你需要知道且非常重要的差别, 那就是事务会话的开启和关闭 (回滚/提交) 与broker的交互都是同步的, 但是, 在事务窗口内发送的每条消息却是异步的. 如果一切都顺利那就没有任何问题,因为broker对这些消息进行了批处理. 但是如果有事务错误时会发生什么? 或者broker在保存这些消息时发生没有可用磁盘空间时会怎样?

这时我们需要在发送时设置一个ExceptionListener来监控异常. 当broker无资源可用时,我们也应该设置一个在客户端发送的 “生产者窗口”来允许我们加强对生产者流程的控制. 关于这块更多请查阅ActiveMQ生产者流程控制(http://activemq.apache.org/producer-flow-control.html).

深入讲解ActiveMQ5.X消息的持久性

改变默认值

我们可以改变生产者的设置行为:

  • useAsyncSend - 经常异步等待ACK, 甚至在持久性的发送和提交中

  • alwaysSyncSend – 强制所有的发送 (非持久的或事务性的发送也包括在内) 必须等待broker的ACK

使用默认通常是我们所需要的.

存储

对于生产环境使用ActiveMQ, 我建议使用共享存储方式. 这里我们需要了解在理解ActiveMQ保证的时候存储层会发生什么.

ActiveMQ默认会实现 JMS可持久性的需求,最基本的要求是当应用crash了也要有能力将消息从存储中恢复出来. 对于这一点, 我们默认会在文件系统上做一次 “fsync”操作. 这个操作在每个系统上会发生什么取决于每个系统所使用的OS、网络、存储控制器、存储设备等。 这跟你犹如期望使用任何的数据库来存储消息是类似的.

当我们需要将消息写入到事务日志时,我们会要求操作系统通过fsync调用将日志刷到磁盘上. 基本上我们会强制OS使用缓存文件通道将页文件写回到存储介质上并允许存储介质在“存储” 数据到磁盘上时做它所需要做的事情(取决于实现):

有一些存储控制器有一些自身需要刷新的缓存, 磁盘驱动也有自身的缓存. 这里有些缓存是靠电池来支持的,需要定时写回. 要理解ActiveMQ对消息的持久存储, 你就需要理解存储层.

消费者

最后一个谜团是我们如何将消息分发或投递到消费者,且消费者是如何确认消息的。 ActiveMQ 的JMS 库为你做好了一切, 所以你不需要担心你是否会丢失消息.

消息被分发到消费者取决于消费者的“预取”缓冲设置。可通过使用消费者可用的缓存来加速对消息的处理并在处理完后将缓存还回. 在ActiveMQ中,这些预取的消息在控制台里用的是“在飞行中”来代表. 它取决于消费者对消息的处理和确认(这取决于消息的确认模式… 默认模式是自动确认,即当消费者收到消息就会发送ACK.. 对更重要的消息处理你可能希望使用“客户端”确认,也即客户端明确的指示什么时候确认消息, 例如, 在完成一些处理后).

如果消费者因某些原因对消息处理失败,那么任何非确认的消息将会被投递到另一个消费者(如果有),然后执行上面同样的处理方式。broker在未得到ACK前不会将消息从索引中移除。所以这里包含了消费者层和网络层上的失败. 如果在消费者“成功处理”(注意,这里的“成功处理”的因用例的不同含义有所不同)后这两层上有任何一层发生失败 , 且broker没有得到确认, 那么broker有可能需要重新发送消息. 在这种情况下,你可以实现一个幂等的消费者以在消费者端收到重复的消息来结束对消息的成功处理. 在扩展消息的生产者/消费者时,你将会希望有幂等的消费者.

最后需要注意的是: 在没有使用XA事务时,JMS不会保证一次且仅且一次的消息处理. JMS会保证一次且仅且一次的消息投递,在这个范围内它会将消息标记为“可被重复投递”并让消费者来检查,消费者会负责它允许被处理多少次(或使用幂等的消费者来做过滤).

以上是关于深入讲解ActiveMQ5.X消息的持久性的主要内容,如果未能解决你的问题,请参考以下文章

ActiveMQ 5.x 和 DurableConsumer

深入浅出 RabbitMQ

深入理解Kafka

深入理解Kafka

深入理解Kafka

Kafka原理讲解