RocketMQ的了解

Posted jianqiang111

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ的了解相关的知识,希望对你有一定的参考价值。

MQ背景&选型
消息队列作为⾼并发系统的核⼼组件之⼀,能够帮助业务系统解构提升开发效率和系统稳定性。主要具
有以下优势:
  • 削峰填⾕(主要解决瞬时写压⼒⼤于应⽤服务能⼒导致消息丢失、系统奔溃等问题)
  • 系统解耦(解决不同重要程度、不同能⼒级别系统之间依赖导致⼀死全死)
  • 提升性能(当存在⼀对多调⽤时,可以发⼀条消息给消息系统,让消息系统通知相关系统)
  • 蓄流压测(线上有些链路不好压测,可以通过堆积⼀定量消息再放开来压测)
⽬前主流的MQ主要是Rocketmq、kafka、Rabbitmq,Rocketmq相⽐于Rabbitmq、kafka具有主要优
势特性有:
  •  ⽀持事务型消息(消息发送和DB操作保持两⽅的最终⼀致性,rabbitmq和kafka不⽀持)
  •  ⽀持结合rocketmq的多个系统之间数据最终⼀致性(多⽅事务,⼆⽅事务是前提)
  •  ⽀持延迟消息(rabbitmq和kafka不⽀持)
  •  ⽀持指定次数和时间间隔的失败消息重发(kafka不⽀持,rabbitmq需要⼿动确认)
  •  ⽀持consumer端tag过滤,减少不必要的⽹络传输(rabbitmq和kafka不⽀持)
  •  ⽀持重复消费(rabbitmq不⽀持,kafka⽀持)

集群部署

1) Name Server
Name Server是⼀个⼏乎⽆状态节点,可集群部署,节点之间⽆任何信息同步。
2) Broker
Broker部署相对复杂,Broker分为Master与Slave,⼀个Master可以对应多个Slave,但是⼀个Slave只能对应⼀个Master,Master与Slave的对应关系通过指定相同的Broker id,不同的Broker Id来定义,BrokerId为0表示Master,⾮0表示Slave。每个Broker与Name Server集群中的所有节点建⽴⻓连接,定时(每隔30s)注册Topic信息到所有NameServer。Name Server定时(每隔10s)扫描所有存活broker的连接,如果Name Server超过2分钟没有收到⼼跳,则Name Server断开与Broker的连接。
3) Producer
Producer与Name Server集群中的其中⼀个节点(随机选择)建⽴⻓连接,定期从Name Server取Topic路由信息,并向提供Topic服务的Master建⽴⻓连接,且定时向Master发送⼼跳。Producer完全⽆状态,可集群部署。
Producer每隔30s(由ClientConfifig的pollNameServerInterval)从Name server获取所有topic队列的最新情况,这意味着如果Broker不可⽤,Producer最多30s能够感知,在此期间内发往Broker的所有消息都会失败。
Producer每隔30s(由ClientConfifig中heartbeatBrokerInterval决定)向所有关联的broker发送⼼跳,
Broker每隔10s中扫描所有存活的连接,如果Broker在2分钟内没有收到⼼跳数据,则关闭与Producer的连接。
4)Producer Group
生产者组,简单来说就是多个发送同一类消息的生产者称之为一个生产者组。在这里可以不用关心,只要知道有这么一个概念即可。
5) Consumer
Consumer与Name Server集群中的其中⼀个节点(随机选择)建⽴⻓连接,定期从Name Server取Topic路由信息,并向提供Topic服务的Master、Slave建⽴⻓连接,且定时向Master、Slave发送⼼跳。
Consumer既可以从Master订阅消息,也可以从Slave订阅消息,订阅规则由Broker配置决定。
Consumer每隔30s从Name server获取topic的最新队列情况,这意味着Broker不可⽤时,Consumer最多最需要30s才能感知。
Consumer每隔30s(由ClientConfifig中heartbeatBrokerInterval决定)向所有关联的broker发送⼼跳,Broker每隔10s扫描所有存活的连接,若某个连接2分钟内没有发送⼼跳数据,则关闭连接;并向该Consumer Group的所有Consumer发出通知,Group内的Consumer重新分配队列,然后继续消费。
当Consumer得到master宕机通知后,转向slave消费,slave不能保证master的消息100%都同步过来了,因此会有少量的消息丢失。但是⼀旦master恢复,未同步过去的消息会被最终消费掉。
消费者对列是消费者连接之后(或者之前有连接过)才创建的。我们将原⽣的消费者标识由 {IP}@{消费者group}扩展为 {IP}@{消费者group}{topic}{tag},(例如xxx.xxx.xxx.xxx@mqtest_producergroup_2m2sTest_tag-zyk)。任何⼀个元素不同,都认为是不同的消费端,每个消费端会拥有⼀份⾃⼰消费对列(默认是broker对列数量*broker数量)。
6)Consumer Group
消费者组,和生产者类似,消费同一类消息的多个 consumer 实例组成一个消费者组。
7)Tag
标签可以被认为是对 Topic 进一步细化。一般在相同业务模块中通过引入标签来标记不同用途的消息。
8)Message
Message 是消息的载体。一个 Message 必须指定 topic,相当于寄信的地址。Message 还有一个可选的 tag 设置,以便消费端可以基于 tag 进行过滤消息。也可以添加额外的键值对,例如你需要一个业务 key 来查找 broker 上的消息,方便在开发过程中诊断问题。
  
 
关键特性机器实现原理
顺序消费
消息有序指的是可以按照消息的发送顺序来消费
例如:⼀笔订单产⽣了3条消息,分别是订单创建、订单付款、订单完成。消费时,必须按照顺序消费才有意义,与此同时多笔订单之间⼜是可以并⾏消费的。
例如:⽣产者差⽣了2条消息:M1、M2,要保证这两条消息的顺序。我们可能会这么做。
但是这个模型存在的问题是:如果M1和M2分别发送到Server上,就不能保证M1先到达集群,也不能保证M1先被消费。换个角度看,如果M2与M1到达MQ集群,甚至M2被消费后,M1才到达消费端,这时候消息就乱序了,说明以上模型是不能保证消息的顺序消费的。
也有一种简单的方式就是将M1、M2发送到同一个MQ Server上。
根据先到达先被消费的原则,M1会先于M2被消费,这样就保证了消息的顺序。
但是这个模型也仅仅是在理论上可以保证消息的顺序,在实际场景中可能会遇到下面的问题:网络延迟问题。只要将消息从⼀台服务器发往另⼀台服务器,就会存在⽹络延迟问题,如上图所示,如果发送M1耗时⼤于发送M2耗时,那么仍然存在可能M2被先消费,仍然不能保证消息的顺序,即使M1和M2同时到达消费端,由于不清楚消费端1和消费端2的负载情况,仍然可能出现M2先于M1被消费的情况。那如何解决这个问题呢?
将M1和M2发往同⼀个消费者,且发送M1后,需要消费端响应成功后才能发送M2。但是这⾥⼜会存在另外的问题:如果M1被发送到消费端后,消费端1没有响应,那么是继续发送M2呢,还是重新发送M1?⼀般来说为了保证消息⼀定被消费,肯定会选择重发M1到另外⼀个消费端2,如下图,保证消息顺序的正确⽅式:
但是仍然会有问题:消费端1没有响应Server时,有两种情况,⼀种是M1确实没有到达(数据可能在⽹络传输中丢失),另⼀种是消费端已经消费M1并且已经发回响应消息,但是MQ Server没有收到。如果是第⼆种情况,会导致M1被重复消费。
最好的解决方法是:我们在消费端,业务层面来保证消息的顺序,这样能最大程度的提高吞吐量。我们最终的目标是高容错性和高吞吐量,如果能忽略吞吐量,那么为什么要使用消息队列呢。
 
消息重复
造成消息重复的根本原因是:网络不可达。  
只要通过⽹络交换数据,就⽆法避免这个问题。所以解决这个问题的办法是绕过这个问题。那么问题就变成了:如果消费端收到两条⼀样的消息,应该怎样处理?
  1. 消费端处理消息的业务逻辑要保持幂等性。
  2. 保证每条数据都有唯⼀编号,且保证消息处理成功与去重表的⽇志同时出现。
第1条解决⽅案,很明显应该在消费端实现,不属于消息系统要实现的功能。
第2条可以由消息系统实现,也可以由业务端实现。正常情况下出现重复消息的概率其实很⼩,如果由消息系统来实现的话,肯定会对消息系统的吞吐量和⾼可⽤有影响,所以最好还是由业务端⾃⼰处理消息重复的问题,这也是RocketMQ不解决消息重复问题的原因。RocketMQ不保证消息不重复,如果你的业务系统需要保证严格的不重复消息,需要你⾃⼰在业务端去重。
 
事务消息
RocketMQ除了⽀持普通消息,顺序消息,另外还⽀持事务消息。举个例子A向B转账520元,在单机环境下,执行事务的情况大概是这样子的:
  1. 锁定A账户
  2. 锁定B账户
  3. 检查A账户是否有520元
  4. A账户减去520元
  5. B账户加上520元
  6. 解锁A账户
  7. 解锁B账户
这样将整个流程都是放在一个事务里面就很安全,但是当用户增长到一定的程度,A和B各自的账户和余额信息不再同一台服务器上了,那么上面的流程就会发生了改变:
  1. 锁定A账户
  2. 通过网络锁定B账户
  3. 检查A账户是否有520元
  4. A账户减去520元
  5. 通过网络B账户增加520元
  6. 通过网络解锁B账户
  7. 解锁A账户
在集群的环境下,因为是需要通过网络去锁定账户,所以耗时会成倍的增长,这肯定会导致效率低的,我们可以使用
大事务 = 小事务 + 异步
将大事务拆分成多个小事务这样基本上能够将跨机事务的执行效率优化到单机一致。
将发信息放到A扣款的事务中去,如果发送失败,抛出异常,事务回滚。但是这样会有一个问题(很小概率)就是A账户扣款,但是消费者消费消息失败,导致A账户扣款成功,B账户并没有到账。
这种情况阿里提供给我们的解决方法是“人工解决”,因为如果要回滚整个整个流程的话,系统复杂度会大大提升,而且很容易出现BUG,工作量也会大大增加,我们在设计的时候就需要去衡量是否需要花费这么大的代价来解决这样一个出现概率非常小的问题。
 
消息存储
RocketMQ的消息存储是由comsume queue 和 cimmit log配合完成的。
consumeQueue是消息的逻辑队列,相当于字典的⽬录,⽤来指定消息在物理⽂件commit log上的位置。
CommitLog
要想知道RocketMQ如何存储消息,我们先看看CommitLog。在RocketMQ中,所有topic的消息都存储在⼀个称为CommitLog的⽂件中,该⽂件默认最⼤为1GB,超过1GB后会轮到下⼀个CommitLog⽂件。通过CommitLog,RocketMQ将所有消息存储在⼀起,以顺序IO的⽅式写⼊磁盘,充分利⽤了磁盘顺序写减少了IO争⽤提⾼数据存储的性能,消息在CommitLog中的存储格式如下:
ConsumeQueue
⼀个ConsumeQueue表示⼀个topic的⼀个queue,rocketmq在消息存储上与kafka有着⾮常⼤的不同,RocketMQ的ConsumeQueue中不存储具体的消息,具体的消息由CommitLog存储,ConsumeQueue中只存储路由到该queue中的消息在CommitLog中的offffset,消息的⼤⼩以及消息所属的tag的hash(tagCode),⼀共只占20个字节,整个数据包如下:
消息存储方式
RocketMQ的消息存储由CommitLog和ConsumeQueue两部分组成,其中CommitLog⽤于存储原始的消息,⽽ConsumeQueue⽤于存储投递到某⼀个queue中的消息的位置信息,消费者在读取消息时,先读取ConsumeQueue,再通过ConsumeQueue中的位置信息读取CommitLog,得到原始的消息。
消息订阅
RocketMQ消息订阅有两种模式:
  1. push模式,即MQServer主动向消费端推送。
  2. Pull模式,即消费端在需要时,主动到MQServer拉取。
但是再具体实现时,Push和Pull模式都是采⽤消费端主动拉取的⽅式。
⾸先看下消费端的负载均衡:
消费端会通过RebalanceService线程,10s做⼀次基于Topic下的所有队列负载:
  1. 遍历Consumer下所有的Topic,然后根据Topic订阅所有的消息
  2. 获取同⼀Topic和Consume Group下的所有Consumer
  3. 然后根据具体的分配策略来分配消费队列,分配的策略包含:平均分配、消费端配置等
如上图所示,如果有5个队列,2个Consumer,那么第⼀个Consumer消费3个队列,第⼆个Consumer消费2个队列。这⾥采⽤的就是平均分配策略。它类似于分⻚的过程,Topic下⾯所有的Queue就是记录,Consumer的个数就相当于总的⻚数,那么每⻚有多少条记录,就类似于Consumer会消费哪些队列。
通过这样的策略来达到⼤体上的平均消费,这样的设计也可以很⽅便地⽔平扩展来提⾼Consumer的消费能⼒。
消费端的Push模式是通过⻓轮询的模式来实现的,就如同下图:(Push模式示意图)

 

 

Consumer端每隔⼀段时间主动向broker发送拉消息请求,broker在收到Pull请求后,如果有消息就⽴即返回数据,Consumer端收到返回的消息后,再回调消费者设置的Listener⽅法。如果broker在收到Pull请求时,消息队列⾥没有数据,broker端会阻塞请求指导有数据传递或超时才返回。
RocketMq的其他特性
  1. 定时消息
  2. 消息的刷盘策略
  3. 主动同步策略:同步双写、异步复制
  4. 海量消息堆积能⼒
  5. ⾼效通信

RocketMq的最佳实践

Producer

  1 import com.alibaba.fastjson.JSON;
  2 import com.alibaba.fastjson.JSONArray;
  3 import org.apache.rocketmq.client.exception.MQClientException;
  4 import org.apache.rocketmq.client.producer.DefaultMQProducer;
  5 import org.apache.rocketmq.client.producer.SendCallback;
  6 import org.apache.rocketmq.client.producer.SendResult;
  7 import org.apache.rocketmq.common.message.Message;
  8 
  9 import java.io.ByteArrayInputStream;
 10 import java.io.ObjectInputStream;
 11 import java.io.Serializable;
 12 import java.util.ArrayList;
 13 import java.util.HashMap;
 14 import java.util.List;
 15 import java.util.Map;
 16 
 17 /**
 18 * @Author 18011618
 19 * @Date 10:41 2018/7/17
 20 * @Function 消息生产者
 21  *
 22  * Producer最佳最佳实践
 23  * 1.⼀个应⽤尽可能⽤⼀个Topic,消息⼦类型⽤tags来标识,tags可以由应⽤⾃由设置。只有发送消息设置了tags,消费⽅在订阅消息时,才可以利⽤tags在broker做消息过滤。(完成)
 24  * 2.每个消息在业务层⾯的唯⼀标识码,要设置到keys字段,⽅便将来定位消息丢失问题。(完成)
 25  * 3.消息发送成功或者失败,要打印消息⽇志,务必打印sendResult和key字段(完成)
 26  * 4.对于消息不可丢失应⽤,务必要有消息重发机制。例如:消息发送失败,存储到数据库,能有定时程序尝试重发或者⼈⼯触发重发。
 27  * 5.某些应⽤如果不关注消息是否发送成功,请直接使⽤sendOneWay⽅法发送消息。
 28 */
 29 public class Producer {
 30     public static final List<Map<String,Object>> memList=new ArrayList<>();
 31     public static void main(String[] args) throws MQClientException {
 32         DefaultMQProducer producer = new DefaultMQProducer("test-group");
 33         producer.setNamesrvAddr("192.168.83.128:9876");
 34         producer.setInstanceName("rmq-instance");
 35 
 36         producer.start();
 37         try {
 38             for (int i = 0; i < 100; i++) {
 39                 User user = new User();
 40                 user.setId((int)(Math.random()*100));
 41                 user.setLoginName("abc" + i);
 42                 user.setPwd(String.valueOf(i));
 43                 Message message = new Message("log-topic", "user-tag", JSON.toJSONString(user).getBytes());
 44                 //System.out.println("生产者发送消息:" + JSON.toJSONString(user));
 45                 //异步发送
 46                 producer.send(message, new SendCallback() {
 47                     @Override
 48                     public void onSuccess(SendResult sendResult) {
 49                         System.out.println("发送消息成功!result is : " + user.toString());
 50                     }
 51 
 52                     @Override
 53                     public void onException(Throwable throwable) {
 54                         throwable.printStackTrace();
 55                         //消息发送失败,存进数据库以便日后手动重新发送(模拟)
 56                         Map<String,Object> memMap=new HashMap();
 57                         memMap.put("topic","log-topic");
 58                         memMap.put("tag","user-tag");
 59                         memMap.put("body",JSON.toJSONString(user).getBytes());
 60                         memList.add(memMap);
 61                         System.out.println("发送消息失败!result is : " + JSON.toJSONString(new String(message.getBody())));
 62                     }
 63                 });
 64             }
 65         } catch (Exception e) {
 66             e.printStackTrace();
 67         }
 68         producer.shutdown();
 69     }
 70 
 71     /**
 72      * byte转对象
 73      * @param bytes
 74      * @return
 75      */
 76     private static Object ByteToObject(byte[] bytes) {
 77         Object obj = null;
 78         try {
 79             // bytearray to object
 80             ByteArrayInputStream bi = new ByteArrayInputStream(bytes);
 81             ObjectInputStream oi = new ObjectInputStream(bi);
 82 
 83 
 84             obj = oi.readObject();
 85             bi.close();
 86             oi.close();
 87         } catch (Exception e) {
 88             System.out.println("translation" + e.getMessage());
 89             e.printStackTrace();
 90         }
 91         return obj;
 92     }
 93     /**
 94      * 发送用户消息
 95      */
 96     static class User implements Serializable {
 97         private String loginName;
 98         private String pwd;
 99         private int id;
100 
101         public int getId() {
102             return id;
103         }
104 
105         public void setId(int id) {
106             this.id = id;
107         }
108 
109         public String getLoginName() {
110             return loginName;
111         }
112 
113         public void setLoginName(String loginName) {
114             this.loginName = loginName;
115         }
116 
117         public String getPwd() {
118             return pwd;
119         }
120 
121         public void setPwd(String pwd) {
122             this.pwd = pwd;
123         }
124 
125         @Override
126         public String toString() {
127             return "User{" +
128                     "loginName=\'" + loginName + \'\\\'\' +
129                     ", pwd=\'" + pwd + \'\\\'\' +
130                     ", id=" + id +
131                     \'}\';
132         }
133     }
134 }

Consumer

 1 import com.alibaba.fastjson.JSONObject;
 2 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
 3 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
 4 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
 5 import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
 6 import org.apache.rocketmq.client.exception.MQClientException;
 7 import org.apache.rocketmq.common.message.MessageExt;
 8 
 9 import java.util.HashSet;
10 import java.util.List;
11 import java.util.Set;
12 
13 /**
14  * Consumer最佳实践
15  * 1.消费过程要做到幂等
16  * 2.尽量使⽤批量⽅式消费,可以很⼤程度上提⾼消费吞吐量。
17  * 3.优化每条消息的消费过程
18  */
19 public class Consumer {
20 
21     public static void main(String[] args) throws MQClientException {
22         Set<Integer> set=new HashSet<>();
23         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test-group");
24  
25         consumer.setNamesrvAddr("192.168.83.128:9876");
26         consumer.setInstanceName("rmq-instance");
27         //设置topic和Tag
28         consumer.subscribe("log-topic", "user-tag");
29         //从RocketMq拿到数据后执行的操作
30         consumer.registerMessageListener(new MessageListenerConcurrently() {
31             public ConsumeConcurrentlyStatus consumeMessage(
32                     List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
33                 System.out.println("msgs.size为:"+msgs.size());
34                 for (MessageExt msg : msgs) {
35                     JSONObject jsonObject = JSONObject.parseObject(new String(msg.getBody()));
36                     boolean addResult = set.add(jsonObject.getIntValue("id"));
37                     if(!addResult){
38                         System.out.println("重复数据,id为"+jsonObject.getIntValue("id"));
39                     }
40                     System.out.println("消费者消费数据:"+new String(msg.getBody()));
41                 }
42                 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
43             }
44         });
45         consumer.start();
46      }
47 }

 

以上是关于RocketMQ的了解的主要内容,如果未能解决你的问题,请参考以下文章

消息队列学习 -- RocketMQ概念了解

了解MQ

图文并茂!深入了解RocketMQ的过期删除机制

十分钟了解RocketMQ架构

RocketMQ - 如何用死信队列解决消费者异常

你懂RocketMQ 的架构原理吗?