RocketMQ使用之消息保证,重复读,积压,顺序,过滤,延时,事务,死信
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ使用之消息保证,重复读,积压,顺序,过滤,延时,事务,死信相关的知识,希望对你有一定的参考价值。
参考技术A消息可能在哪些阶段丢失呢?可能会在这三个阶段发生丢失: 生产阶段 、 存储阶段 、 消费阶段
所以要从这三个阶段考虑:
在生产阶段,主要通过请求确认机制,来保证消息的可靠传递。
存储阶段,可以通过配置可靠性优先的 Broker 参数来避免因为宕机丢消息,简单说就是可靠性优先的场景都应该使用同步
从 Consumer 角度分析,如何保证消息被成功消费?
Consumer 保证消息成功消费的关键在于确认的时机,不要在收到消息后就立即发送消费确认,而是应该在执行完所有消费业务逻辑之后,再发送消费确认。因为消息队列维护了消费的位置,逻辑执行失败了,没有确认,再去队列拉取消息,就还是之前的一条。
对分布式消息队列来说,同时做到确保一定投递和不重复投递是很难的,就是所谓的 有且仅有一次 。 RocketMQ 择了确保一定投递,保证消息不丢失,但有可能造成消息重复。
处理消息重复问题,主要有业务端自己保证,主要的方式有两种: 业务幂等 和 消息去重
发生了消息积压,这时候就得想办法赶紧把积压的消息消费完,就得考虑提高消费能力,一般有两种办法:
顺序消息是指消息的 消费顺序 和 产生顺序 相同,在有些业务逻辑下,必须保证顺序,比如订单的生成、付款、发货,这个消息必须按顺序处理才行。
顺序消息分为 全局顺序消息 和 部分顺序消息 :
部分顺序消息相对比较好实现,生产端需要做到把同 ID 的消息发送到同一个 Message Queue ;在消费过程中,要做到从同一个 Message Queue 读取的消息顺序处理——消费端不能并发处理顺序消息,这样才能达到部分有序。
发送端使用 MessageQueueSelector 类来控制 把消息发往哪个 Message Queue
消费端通过使用 MessageListenerOrderly 来解决单 Message Queue 的消息被并发处理的问题
RocketMQ 默认情况下不保证顺序,比如创建一个 Topic ,默认八个写队列,八个读队列,这时候一条消息可能被写入任意一个队列里;在数据的读取过程中,可能有多个 Consumer ,每个 Consumer 也可能启动多个线程并行处理,所以消息被哪个 Consumer 消费,被消费的顺序和写人的顺序是否一致是不确定的。
要保证全局顺序消息, 需要先把 Topic 的读写队列数设置为 一,然后 Producer Consumer 的并发设置,也要是一。简单来说,为了保证整个 Topic 全局消息有序,只能消除所有的并发处理,各部分都设置成单线程处理 ,这时候就完全牺牲 RocketMQ 的高并发、高吞吐的特性了。
有两种方案:
对消息的过滤有三种方式:
电商的订单超时自动取消,就是一个典型的利用延时消息的例子,用户提交了一个订单,就可以发送一个延时消息, 1h 后去检查这个订单的状态,如果还是未付款就取消订单释放库存。
RocketMQ 是支持延时消息的,只需要在生产消息的时候设置消息的延时级别:
但是目前 RocketMQ 支持的延时级别是有限的:
那么 RocketMQ 怎么实现延时消息的
简单,八个字: 临时存储 + 定时任务
Broker 收到延时消息了,会先发送到主题( SCHEDULE_TOPIC_XXXX )的相应时间段的 Message Queue 中,然后通过一个定时任务轮询这些队列,到期后,把消息投递到目标 Topic 的队列中,然后消费者就可以正常消费这些消息。
半消息 :是指暂时还不能被 Consumer 消费的消息, Producer 成功发送到 Broker 端的消息,但是此消息被标记为 暂不可投递 状态,只有等 Producer 端执行完本地事务后经过二次确认了之后, Consumer 才能消费此条消息。
依赖半消息,可以实现分布式消息事务,其中的关键在于二次确认以及消息回查:
RocketMQ 实现消息事务 :
死信队列 用于处理无法被正常消费的消息,即 死信消息
当一条消息初次消费失败,消息队列 RocketMQ 会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列 RocketMQ 不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中,该特殊队列称为 死信队列
死信消息的特点:
死信队列的特点:
RocketMQ 控制台提供对死信消息的查询、导出和重发的功能。
NameServer 因为是无状态,且不相互通信的,所以只要集群部署就可以保证高可用。
RocketMQ 的高可用主要是在体现在 Broker 的读和写的高可用, Broker 的高可用是通过集群和主从实现的。
也就是说 Producer 只能向 Master 角色的 Broker 写入消息, Cosumer 可以从 Master 和 Slave 角色的 Broker 读取消息。
Consumer 的配置文件中,并不需要设置是从 Master 读还是从 Slave 读,当 Master 不可用或者繁忙的时候, Consumer 的读请求会被自动切换到从 Slave 。有了自动切换 Consumer 这种机制,当一个 Master 角色的机器出现故障后, Consumer 仍然可以从 Slave 读取消息,不影响 Consumer 读取消息,这就实现了读的高可用。
如何达到发送端写的高可用性
注意 : RocketMQ 目前还不支持 Broker 把 Slave 自动转成 Master ,如果机器资源不足,需要把 Slave 转成 Master ,则要手动停止 Slave 色的 Broker ,更改配置文件,用新的配置文件启动 Broker
rocketMQ之顺序消费
消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。
顺序消费的原理解析,在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。
下面用订单进行分区有序的示例。一个订单的顺序流程是:创建、付款、推送、完成。订单号相同的消息会被先后发送到同一个队列中,消费时,同一个OrderId获取到的肯定是同一个队列。
1 顺序消息生产
package org.apache.rocketmq.example.order2;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
/**
* Producer,发送顺序消息
*/
public class Producer
public static void main(String[] args) throws Exception
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
String[] tags = new String[]"TagA", "TagC", "TagD";
// 订单列表
List<OrderStep> orderList = new Producer().buildOrders();
Date date = new Date();
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String dateStr = sdf.format(date);
for (int i = 0; i < 10; i++)
// 加个时间前缀
String body = dateStr + " Hello RocketMQ " + orderList.get(i);
Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i, body.getBytes());
SendResult sendResult = producer.send(msg, new MessageQueueSelector()
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg)
Long id = (Long) arg; //根据订单id选择发送queue
long index = id % mqs.size();
return mqs.get((int) index);
, orderList.get(i).getOrderId());//订单id
System.out.println(String.format("SendResult status:%s, queueId:%d, body:%s",
sendResult.getSendStatus(),
sendResult.getMessageQueue().getQueueId(),
body));
producer.shutdown();
/**
* 订单的步骤
*/
private static class OrderStep
private long orderId;
private String desc;
public long getOrderId()
return orderId;
public void setOrderId(long orderId)
this.orderId = orderId;
public String getDesc()
return desc;
public void setDesc(String desc)
this.desc = desc;
@Override
public String toString()
return "OrderStep" +
"orderId=" + orderId +
", desc='" + desc + '\\'' +
'';
/**
* 生成模拟订单数据
*/
private List<OrderStep> buildOrders()
List<OrderStep> orderList = new ArrayList<OrderStep>();
OrderStep orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("创建");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111065L);
orderDemo.setDesc("创建");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("付款");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103117235L);
orderDemo.setDesc("创建");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111065L);
orderDemo.setDesc("付款");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103117235L);
orderDemo.setDesc("付款");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111065L);
orderDemo.setDesc("完成");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("推送");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103117235L);
orderDemo.setDesc("完成");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("完成");
orderList.add(orderDemo);
return orderList;
2 顺序消费消息
package org.apache.rocketmq.example.order2;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;
/**
* 顺序消息消费,带事务方式(应用可控制Offset什么时候提交)
*/
public class ConsumerInOrder
public static void main(String[] args) throws Exception
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");
consumer.setNamesrvAddr("127.0.0.1:9876");
/**
* 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
* 如果非第一次启动,那么按照上次消费的位置继续消费
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTest", "TagA || TagC || TagD");
consumer.registerMessageListener(new MessageListenerOrderly()
Random random = new Random();
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context)
context.setAutoCommit(true);
for (MessageExt msg : msgs)
// 可以看到每个queue有唯一的consume线程来消费, 订单对每个queue(分区)有序
System.out.println("consumeThread=" + Thread.currentThread().getName() + "queueId=" + msg.getQueueId() + ", content:" + new String(msg.getBody()));
try
//模拟业务逻辑处理中...
TimeUnit.SECONDS.sleep(random.nextInt(10));
catch (Exception e)
e.printStackTrace();
return ConsumeOrderlyStatus.SUCCESS;
);
consumer.start();
System.out.println("Consumer Started.");
以上是关于RocketMQ使用之消息保证,重复读,积压,顺序,过滤,延时,事务,死信的主要内容,如果未能解决你的问题,请参考以下文章