决战圣地玛丽乔亚Day51---消息队列RocketMQ

Posted Dva清流

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了决战圣地玛丽乔亚Day51---消息队列RocketMQ相关的知识,希望对你有一定的参考价值。

主要组成:

https://www.cnblogs.com/dwj-ngu/p/17087863.html

为什么吞吐量高?

commitlog统一写入

cnblogs.com/dwj-ngu/p/17100284.html

0拷贝技术:

搞清楚拷贝的流程,以及0拷贝技术的改进。

https://www.cnblogs.com/dwj-ngu/p/17120488.html

https://www.cnblogs.com/dwj-ngu/p/17125257.html

 

如何做到信息不丢失,消息不被重复消费?
消息的三个处理阶段:

  • Producer发送消息阶段。

  • Broker处理消息阶段。

  • Consumer消费消息阶段。

我们对其进行控制即可:

Producer发送消息阶段
发送消息阶段涉及到Producer到broker的网络通信,因此丢失消息的几率一定会有,那RocketMQ在此阶段用了哪些手段保证消息不丢失了(或者说降低丢失的可能性)。

手段一:提供SYNC的发送消息方式,等待broker处理结果。

RocketMQ提供了3种发送消息方式,分别是:

同步发送:Producer 向 broker 发送消息,阻塞当前线程等待 broker 响应 发送结果。

异步发送:Producer 首先构建一个向 broker 发送消息的任务,把该任务提交给线程池,等执行完该任务时,回调用户自定义的回调函数,执行处理结果。

Oneway发送:Oneway 方式只负责发送请求,不等待应答,Producer只负责把请求发出去,而不处理响应结果。

我们在调用producer.send方法时,不指定回调方法,则默认采用同步发送消息的方式,这也是丢失几率最小的一种发送方式。

手段二:发送消息如果失败或者超时,则重新发送。

发送重试源码如下,本质其实就是一个for循环,当发送消息发生异常的时候重新循环发送。默认重试3次,重试次数可以通过producer指定。

手段三:broker提供多master模式,即使某台broker宕机了,保证消息可以投递到另外一台正常的broker上。

如果broker只有一个节点,则broker宕机了,即使producer有重试机制,也没用,因此利用多主模式,当某台broker宕机了,换一台broker进行投递。

总结

producer消息发送方式虽然有3种,但为了减小丢失消息的可能性尽量采用同步的发送方式,同步等待发送结果,利用同步发送+重试机制+多个master节点,尽可能减小消息丢失的可能性。

Broker处理消息阶段

手段四:提供同步刷盘的策略

public enum FlushDiskType
SYNC_FLUSH, //同步刷盘
ASYNC_FLUSH//异步刷盘(默认)

我们知道,当消息投递到broker之后,会先存到page cache,然后根据broker设置的刷盘策略是否立即刷盘,也就是如果刷盘策略为异步,broker并不会等待消息落盘就会返回producer成功,也就是说当broker所在的服务器突然宕机,则会丢失部分页的消息。

手段五:提供主从模式,同时主从支持同步双写

即使broker设置了同步刷盘,如果主broker磁盘损坏,也是会导致消息丢失。

因此可以给broker指定slave,同时设置master为SYNC_MASTER,然后将slave设置为同步刷盘策略。

此模式下,producer每发送一条消息,都会等消息投递到master和slave都落盘成功了,broker才会当作消息投递成功,保证休息不丢失。

总结:在broker端,消息丢失的可能性主要在于刷盘策略和同步机制。RocketMQ默认broker的刷盘策略为异步刷盘,如果有主从,同步策略也默认的是异步同步,这样子可以提高broker处理消息的效率,但是会有丢失的可能性。因此可以通过同步刷盘策略+同步slave策略+主从的方式解决丢失消息的可能。

Consumer消费消息阶段

手段六:consumer默认提供的是At least Once机制

从producer投递消息到broker,即使前面这些过程保证了消息正常持久化,但如果consumer消费消息没有消费到也不能理解为消息绝对的可靠。因此RockerMQ默认提供了At least Once机制保证消息可靠消费。

何为At least Once?

Consumer先pull 消息到本地,消费完成后,才向服务器返回ack。

通常消费消息的ack机制一般分为两种思路:

先提交后消费;

先消费,消费成功后再提交;

思路一可以解决重复消费的问题但是会丢失消息,因此Rocketmq默认实现的是思路二,由各自consumer业务方保证幂等来解决重复消费问题。

手段七:消费消息重试机制

当消费消息失败了,如果不提供重试消息的能力,则也不能算完全的可靠消费,因此RocketMQ本身提供了重新消费消息的能力。

总结:

consumer端要保证消费消息的可靠性,主要通过At least Once+消费重试机制保证。

 

 

如何保证消息不被重复消费?

回答这个问题,首先你别听到重复消息这个事儿,就一无所知吧,你先大概说一说可能会有哪些重复消费的问题。

首先,比如 RabbitMQ、RocketMQ、Kafka,都有可能会出现消息重复消费的问题,正常。因为这问题通常不是 MQ 自己保证的,是由我们开发来保证的。挑一个 Kafka 来举个例子,说说怎么重复消费吧。

Kafka 实际上有个 offset 的概念,就是每个消息写进去,都有一个 offset,代表消息的序号,然后 consumer 消费了数据之后,每隔一段时间(定时定期),会把自己消费过的消息的 offset 提交一下,表示“我已经消费过了,下次我要是重启啥的,你就让我继续从上次消费到的 offset 来继续消费吧”。

但是凡事总有意外,比如我们之前生产经常遇到的,就是你有时候重启系统,看你怎么重启了,如果碰到点着急的,直接 kill 进程了,再重启。这会导致 consumer 有些消息处理了,但是没来得及提交 offset,尴尬了。重启之后,少数消息会再次消费一次。

有这么个场景。数据 1/2/3 依次进入 kafka,kafka 会给这三条数据每条分配一个 offset,代表这条数据的序号,我们就假设分配的 offset 依次是 152/153/154。消费者从 kafka 去消费的时候,也是按照这个顺序去消费。假如当消费者消费了 offset=153 的这条数据,刚准备去提交 offset 到 zookeeper,此时消费者进程被重启了。

那么此时消费过的数据 1/2 的 offset 并没有提交,kafka 也就不知道你已经消费了 offset=153 这条数据。那么重启之后,消费者会找 kafka 说,嘿,哥儿们,你给我接着把上次我消费到的那个地方后面的数据继续给我传递过来。由于之前的 offset 没有提交成功,那么数据 1/2 会再次传过来,如果此时消费者没有去重的话,那么就会导致重复消费。

 

如果消费者干的事儿是拿一条数据就往数据库里写一条,会导致说,你可能就把数据 1/2 在数据库里插入了 2 次,那么数据就错啦。

其实重复消费不可怕,可怕的是你没考虑到重复消费之后,怎么保证幂等性。

举个例子吧。假设你有个系统,消费一条消息就往数据库里插入一条数据,要是你一个消息重复两次,你不就插入了两条,这数据不就错了?但是你要是消费到第二次的时候,自己判断一下是否已经消费过了,若是就直接扔了,这样不就保留了一条数据,从而保证了数据的正确性。

一条数据重复出现两次,数据库里就只有一条数据,这就保证了系统的幂等性。

幂等性,通俗点说,就一个数据,或者一个请求,给你重复来多次,你得确保对应的数据是不会改变的,不能出错。

所以第二个问题来了,怎么保证消息队列消费的幂等性?

其实还是得结合业务来思考,我这里给几个思路:

比如你拿个数据要写库,你先根据主键查一下,如果这数据都有了,你就别插入了,update 一下好吧。

比如你是写 Redis,那没问题了,反正每次都是 set,天然幂等性。

比如你不是上面两个场景,那做的稍微复杂一点,你需要让生产者发送每条数据的时候,里面加一个全局唯一的 id,类似订单 id 之类的东西,然后你这里消费到了之后,先根据这个 id 去比如 Redis 里查一下,之前消费过吗?如果没有消费过,你就处理,然后这个 id 写 Redis。如果消费过了,那你就别处理了,保证别重复处理相同的消息即可。

比如基于数据库的唯一键来保证重复数据不会重复插入多条。因为有唯一键约束了,重复数据插入只会报错,不会导致数据库中出现脏数据。

 

 

主从结构

 

 

 

扩容:

高可用:

 

决战圣地玛丽乔亚Day38---JVM相关

JVM的内存结构:

1.程序计数器:线程私有,保存执行指令地址。

2.java虚拟机栈(线程创建,并存方法调用的相关参数):

每个线程在创建时候都会被分配一个虚拟机栈。当线程调用方法时,会创建一个栈帧,入栈,方法执行完毕栈帧出栈。

栈帧会在调用方法的时候把存局部变量表,操作数栈,动态连接,方法出口等信息存进去,然后压入java虚拟机栈。

3.本地方法栈

其他变成语言的接口。

4.java堆

所有线程共享的一块区域,优化GC主要优化的部分。

用来存一些实例对象(数组/普通对象)。

我们new Entity()的时候会分配一块空间存这个Entity。

 

5.方法区

用于存储已被虚拟机加载的类信息、常量、静态变量、即时编译器编译后的代码等数据

1)类信息:java程序运行,java虚拟机会把类的信息存在方法区。包括类的名称、父类的名称、类的修饰符、类的字段、方法等信息

2)  运行时常量池:方法区的一部分,存一些字面量和符号引用。其中包括字符串常量、类和接口的全限定名、字段和方法的名称和描述符等信息。

3)静态变量:由于静态变量是类级别的变量,因为这个是类的属性不属于对象的属性,所以存方法区。

4)即时编译后的代码

7.直接内存

与Java堆不同,直接内存并不是虚拟机运行时数据区的一部分,也不是Java虚拟机规范中定义的内存区域。直接内存的分配不受Java堆大小的限制,但是会受到本机总内存大小以及处理器寻址空间的限制

 

常量池的作用?为什么是-128~127?

1.常量池把所有的常量存储在一起,避免了重复分配的问题,如果用到就要重新分配,太浪费内存空间。

2.使用频繁,用常量池避免查找和创建的消耗。

3.动态解析:符号引用是通过类名、方法名、字段名等符号来引用类、方法、字段等元素的。在程序运行时,Java虚拟机需要将这些符号引用解析成实际的内存地址,才能进行方法调用、字段访问等操作

这个过程称为符号引用的动态解析。常量池存符号引用进行动态解析,符合java语言的动态语言特性。

-128~127:

这个范围的数字使用频率高,java虚拟机把这个范围作为常量池缓存而不是每次都重新创建。这个范围是整数常数,像Boolean、String、Double等没有常量池缓存限制。

 

永久代1,.8为什么会被移出到堆外?

之前永久代在java堆内的特殊区域,存元数据,静态变量,常量池等等。但是由于永久代大小有限,经常出现内存溢出。

所以1.8之后把元数据、静态变量、常量池等存储到本地内存(Native Memory),称之为元空间。

元空间相比之前的永久代的好处是:

元空间的大小是动态的,根据策略进行动态调整防止OOM。

避免了java堆的内存碎片化问题,提高内存使用率。

减少了FULL GC,提高程序响应速度和性能。

 

栈和堆:

栈主要存方法调用的东西。

堆主要存对象、数组等动态分配的东西,和类息息相关。对象创建时,java虚拟机在堆分配一个连续的空间并返回引用来存对象,对象不再被引用的时候,会通过垃圾回收算法对其进行回收操作。

栈溢出的情况:

1.递归调用过深。递归自身或其他方法,栈帧被大量创建打满java栈的大小。

2.方法调用层数过多

3.局部变量过多,方法的局部变量过多打满java虚拟机栈大小

 

如何进行垃圾回收:

1).标记阶段:Java虚拟机从根对象(如线程、静态变量等)出发,递归遍历所有可达对象,将它们标记为“存活”。

2).清除阶段:Java虚拟机清除所有未被标记的对象,释放它们所占用的内存空间。

3).整理阶段:Java虚拟机将所有存活对象向一端移动,从而使剩余的内存空间变成一块连续的空间,便于后续的对象分配。

首先,标记阶段有一个问题,如何判断对象是否可达?

引用计数器法:引用计数法是一种简单的判断对象是否可达的方法。它的原理是在每个对象中记录一个引用计数器,每当有一个对象引用它时,引用计数器加1,每当一个对象取消对它的引用时,

引用计数器减1。当某个对象的引用计数器为0时,就可以判断它已经不再被引用,可以被回收了。但是,引用计数法无法解决循环引用的问题,即两个或多个对象相互引用,导致它们的引用计数器永远不为0,无法被回收。

可达性分析:可达性分析法是一种更加可靠的判断对象是否可达的方法。它的原理是从一组根对象出发,遍历所有对象,将所有可达的对象标记为“存活”,所有不可达的对象标记为“死亡”,

从而回收它们所占用的内存空间。在可达性分析法中,通过一系列算法和数据结构,可以高效地判断对象是否可达,避免循环引用的问题。

1,从一组根对象(如线程、静态变量等)出发,将它们作为起始点,遍历所有与之相连的对象,将它们标记为“存活”。(根搜索)

2.将所有可达的对象标记为“存活”,不可达的对象标记为“死亡”。(对象标记)

3.清除所有被标记为“死亡”的对象,回收它们所占用的内存空间。(清除死亡对象)

4.将所有存活的对象向一端移动,从而使剩余的内存空间变成一块连续的空间,便于后续的对象分配(压缩存活对象)

具体的实现方法:

标记-清除算法:

最简单的方法。

1.标记所有可达对象存活,不可达对象死亡。

2.清除死亡对象回收空间

优缺点:内存碎片,回收效率低

标记-复制算法:

1.标记所有可达对象存活,不可达对象死亡。

2.在复制阶段,将所有存活的对象复制到另一个内存区域中,保证存活对象之间的空间是连续的。在清除阶段,将原来的内存空间清空,回收它们所占用的内存空间

加大内存负担,用更多空间保证内存碎片问题。

 

三色标记法

白色:没有被标记过,一开始都是白色。(没有引用)
灰色:已经被标记过,但是该对象下的属性没有全被标记完(由引用,但是没有处理完引用关系)
黑色:已经标记过,并且该对象下的属性全部标记完毕。(有引用,不用清除)

1.从一组根对象出发,将它们标记为灰色,加入待处理队列

2.从待处理队列中取出一个灰色对象,将它标记为黑色,遍历所有与之相连的对象,将它们标记为灰色,加入待处理队列。

重复步骤2,直到待处理队列为空

3.清除所有被标记为白色的对象,回收它们所占用的内存空间。

结束后,白色里仍然没有标记的,是不可达,进行回收。

并发标记带来的问题:

1.并发修改导致标记结果不准确。 加锁/CAS等性能损失换原子性

2.漏标情况,应用程序动态产生新对象,漏标,内存泄漏。可以使用增量标记优化。

3.并发标记过程中,应用程序可能会持续地分配内存空间,导致垃圾回收器无法回收内存空间,从而导致内存空间的不足。为了解决这个问题,需要使用内存分配器的优化策略,如分代回收TLAB等,以提高内存分配的效率和减少内存空间的浪费

 


如果并发标记,标记期间对象间的引用可能发生变化,就会出现多标漏标情况。
多标:
黑色,灰色变成白色,但是之前标记成了黑色,变成了不会被发现的浮动垃圾,交给下次gc处理即可,影响不大。
漏标:
并发标记的过程中,白色断开引用成为垃圾,黑色引用白色对象。但是黑色已经被完全扫描过不会重新扫描,导致对象需要被引用但是又面临被回收。CMS和G1都对此进行了优化

CMS:

对于CMS垃圾回收器而言,它采用增量标记的方式来解决漏标问题。增量标记是指在垃圾回收过程中,将标记过程分解成多个阶段,每个阶段只标记一部分对象,然后让应用程序继续执行一段时间,再继续标记下一部分对象。这样可以避免垃圾回收线程一次性占用太多CPU资源,同时也减少了漏标的可能性。


G1:

即使用 remembered set 来解决漏标问题。remembered set 是一种数据结构,用于记录从年轻代到老年代的引用关系,当年轻代中的对象被回收时,G1垃圾回收器会扫描 remembered set 中的引用关系,从而标记所有与之相关的对象。这样可以避免漏标问题,并且也减少了垃圾回收的开销。

 

以上是关于决战圣地玛丽乔亚Day51---消息队列RocketMQ的主要内容,如果未能解决你的问题,请参考以下文章

决战圣地玛丽乔亚Day41 Spring其余部分

决战圣地玛丽乔亚Day37----JDK

决战圣地玛丽乔亚Day38---JVM相关

决战圣地玛丽乔亚Day39 -----GC内存模型类加载

2020-12-25:MQ中,如何保证消息的顺序性?

day41——多进程的消息队列消息队列pipe