如何设计一个可靠的分布式消息队列?

Posted IoT物联网开发社区

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何设计一个可靠的分布式消息队列?相关的知识,希望对你有一定的参考价值。

要回答这个问题,先得弄清楚下面这些问题:

     1 .什么是分布式系统,一个分布式系统要解决哪些问题?

     2.一个分布式的消息队列需要共享哪些元数据,需要同步哪些状态?

     3.怎么定义可靠?为了达到可靠,需要解决哪些问题?


一.什么是分布式系统,一个分布式系统要解决哪些问题?

分布式系统是由一系列分散自治组件通过互联网并行并发协作,从而组成的一个coherent软件系统。它具备资源共享,横向扩展,并行并发,可靠容错,透明开放等特性个人觉得分布式系统难点在于:如何在网络不可靠的情况下,依然能够让依靠网络通信的节点之间达成共识。

下面介绍一些分布式系统相关的基础理论,以便于了解分布式系统要解决那些问题 


1.事务的定义

       事务分本地事务和跨机事务,这里只讨论跨机事务(又叫分布式事务)。一个分布式事务包含一个或者一组状态,需要同时在一个或者多个分开的节点上更新数据。


事务的四个特性:原子性,一致性,.隔离性,永久性.

事务的原子性:状态要么在所有节点都更新成功,失败或者回滚。事务的一致性:仅仅更新成功还不行,还必须保证更新完成之后不能破坏原有的一致性状态。比如说转账,收款人收到的钱必须与扣款人被扣的钱数一样。事务的隔离性:一个事务的执行不能影响另一个事务的执行事务的永久性:事务的执行必须是持久的。事务执行完成之后,在t1时间发现它成功了。结果在没有其他事务的作用下,在t2时间去查,发现事务又被回滚了,这是不被允许的。


2.2PC

解决事务通常有2pc或者3pc两种办法。不论是两阶段提交还是三阶段提交,都必须引入一个协调者(coordinator)。原因是必须得有一个节点,能够知道各个执行节点对于该事务的执行结果,成功或者失败,而这个节点就是协调者。协调者负责裁决本次事务执行的最终结果是成功还是失败,并且负责通知给每一个执行节点,这个才是解决事务的最核心的思想,至于2pc或者3pc只是解决问题的方法不一样罢了。

两阶段就是将事务的提交过程分成两个阶段来执行。阶段一:提交事务请求,各个执行节点将事务写入redo日志,但是并不真正执行。阶段二:执行事务请求,根据redo日志做最终的commit或者undo回滚执行节点服从协调节点的裁决结果,本文章不讨论3阶段提交。


3.CAP理论

CAP理论(提出者Eric Brewer)是指在网络发生分区的情况下,系统设计者必须考虑要么满足可用性,要么满足一致性,不能同时满足两个。网络上很多对cap理论的解释和说明都是错误的或者令人迷惑的,(文末有推荐的一篇讲解cap的文章)


对于CAP理论需要做下面几点解释

1)一致性:是指顺序一致性,每一个节点必须能够看见一致的最近写入的状态,每一个节点必须能够看到相同的数据状态。


2)可用性:每一个没有死掉的节点,必须能够在合理的时间内处理所有的读写请求。之所以强调在合理的时间内响应,是因为在网络通信的情况下,必然涉及到通信是否成功的问题,而网络慢或者应用处理失败都有可能导致通信失败。调用方就必然要设置一个超时时间,以此来区分成功与否的阀值,不然双方很有可能陷入无限期的等待。


3)分区容忍性:这里的分区指的是网络分区,不是数据分区(分片)。网络分区是指原本是一个系统里面的节点,由于没法正常通信(网络不能分发消息)导致,系统内的节点从网络上形成了分裂的现象。网络分区是不可避免的,是一个分布式系统的设计者必须要去考虑的。为了保证系统正常运行,关键是分区之后该怎么取舍。


网络上有说实现AP的的分布式系统,我觉得这是错误的说法。实现AP的系统实际上就是不考虑网络分区,可是真有100%可靠可信的网络吗?不考虑分区不就是单机系统吗?单机系统还谈什么分布式系统,也就不在cap理论的讨论范畴以内了。


我觉得cap理论最大的贡献在于提醒系统设计者,必须考虑到网络分区之后,该怎么取舍才能保证系统的可用性和一致性的最大化。假如有两个节点组成一个分布式系统,想要保证两个节点之间的一致性,就必须同时写两个节点,两个节点都写成功了才算成功。这就要求两个节点之间必须能够正常通信,但是网络是不可靠的,分区是一种从概率上来讲必须发生的和要考虑的事情。假如网络发生分区了,两个节点之间不能正常通信,怎么办?这个时候如果要确保一致性,那么只能两个节点都不可用,因为没法保证两个节点之间状态的同步。如果要保证可用性,那就让两个节点独自接收请求,这样就没法保证一致性。


4.BASE理论

base理论应该是一个可落地的理论了,是由ebay架构师提出来的(具体是谁我就不知道了)。

  1. basic available

  2. sofe-state

  3. eventual consistency

简单概括是基本可用和最终一致性,这是一种我认为最可落地的解决方案了。对可用性和一致性做了最大化的权衡,既不追求完全的可用性,也不追求强一致性,两者都不走极端,也都不放弃。因为对于很多分布式系统,放弃可用性,是不可接受的。当然对于某些系统,追求强一致性也是可以的,但是追求最终一致性会有更好的性能和扩展性。


5.paxos算法

最早由1990 年由 Leslie Lamport 在论文《The Part-time Parliament》中提出的 Paxos 共识算法。这个论文是出了名的难懂,所以作者在2001年又写了Paxos made simple论文做了更清晰明了的解释。(见文末连接)

还有一个算法叫Raft算法,比paxos 好懂,但是我读完之后感觉差不多。由于下面要着重讨论zookeeper的ZAB协议,而ZAB协议又是基于Paxos协议做了扩展,所以这里就不详细讲解paxos算法了。


6.Zookeeper之ZAB协议

很多分布式系统自身不具备分布式的能力,通常会选择把状态数据通过zookeeper来在集群内协调一致,来间接实现分布式的能力,比如dubbo,haddop等。所以说zookeeper本身应该是一个非常成功的可被用于生产环境的分布式协调者,个人认为它是分布式理论的集大成者也不为过。要想完全理解ZAB协议必须先具备上面了的事务,2pc,cap,base,paxos等理论的基础知识。


官网对zookeeper的定义是:zookeeper是一个维护配置信息,维护名称信息,用作同步锁或者集群管理的中心化服务。这个定义趋向于说明作用,但是其本质是一个协调服务,上面说的哪几项只是具体场景下的应用。官网 https://cwiki.apache.org/confluence/display/ZOOKEEPER/Index


zookeeper起源于雅虎,是Hadoop项目下的子项目,是雅虎工程师根据Google的Chubby论文来实现的。之所以会想起启动这个一个项目,是因为雅虎工程师发现,分布式系统都需要处理集群状态协调一致,容错和可用性的问题。有的服务已经单独实现了,有的服务还未实现。那为什么不把这些能力单独抽象出来,作为独立的一个服务,对外暴露简单的api接口,让其他的分布式系统只专注于自己的业务呢?于是zookeeper项目启动了,zookeeper这个名字起的也挺好,动物园管理员。如果把各个微服务比喻成一个动物(事实上很多开源项目都是以动物命名的),zookeeper可不就是一个动物园管理员嘛。



这里就个人的理解做几点说明:

  1. zookeeper的节点分为leader,follower,observer。所有的写请求都会被路由到leader节点,由leader节点提出提案,和最终提交提案。follower节点具备投票和竞争leader的权利,但是一旦竞争失败服从leader的提案。observer节点是不具备选举和投票权的(群众,属于被代表的哪种),但是它可以用来提高读的吞吐量,因为leader,follower,observer都可以独自的处理读请求。

  2. zookeeper是基于两阶段提交的,即leader先提出议案,广播给所有的节点,当有过半follower响应成功的时候,就通知调用方写入成功,然后再由leader广播给其他的节点。

  3. zookeeper的是顺序一致性并非强一致性,而是最终一致性,因为paxos算法就没有保证强一致性,所以如果写入成功之后,leader挨个通知follower和observer改变状态,那么必然有一个中间状态,各个节点数据是不一致的。所以zookeeper是符合base理论的,即最终一致性,既然最终一致那么必然是基本可用了。

  4. zookeer的数据结构类似于unix的文件系统,是一个树形结构,但是不同的是它的每一个节点(znode)都可以用来存放数据,哪怕这个点有子节点。这就好比说unix的目录也能和文件一样存放数据。所以树形节点的层次,同一层级的节点个数,节点数据的大小等都会影响读写的性能,

  5. zookeeper适用于读多写少的场景,因为zookeeper写入的时候需要过半返回ack,才算写入成功,而且需要落盘,所以性能相对于读来说差很多。而且随着zookeeper的follower节点数据越多,写的性能越差。

  6. zookeeper不适用于存储大量的数据,特别是当网络不太稳定的时候,产生的后果更严重,因为对于分区的节点,如果又恢复了网络通信,这个时候leader会负责同步给这个分区的节点在失去联系期间所有的事务,如果需要同步的事务量比较大,这个也是不小的开销。如果有这种需求,通常只用zookeeper做事件的同步,即数据还是存储各个服务内部,但是利用zookeeper的通知机制来实现各个服务内部状态的一致性,这种用法就是集群管理。

  7. zookeeper有一个事务id(zxid)的概念,每次写请求都会加1,保证唯一。也就是说zookeeper对于每一个写请求都是一个事务,它能够满足原子性,也能满足最终一致性,同时能够满足隔离性,以及持久性。zookeeper对于写请求是能够保证顺序一致的。即两个请求A和B,如果A在B前,那么各个依赖于zookeeper的其他服务节点看到的顺序一定是A在前,B在后。zookeeper的每个znode的变更一定是一个原子操作,即如果想改变znode里面的数据,对于zookeeper来说一定是先删除旧的,再添加新的。所有的节点只能接收比自己本地zxid大的提案,且不能恢复到之前的提案。

  8. zookeeper又一个epoch的概念,是指每次选举之后,这个epoch加1,之所以需要这个,是因为怕由于网络原因,导致leader与follower之间的心跳失败了,让follower以为leader挂掉了,然后follower开始重新选举,选举了新的leader,可是这个时候如果旧的leader又活过来了,怎么办?因为新选举的leader和follower本地的epoch比旧的leader大,不好意思,旧的leader必须得降级为follower了

  9. 为什么zookeeper的quorum数量必须要求过半,即假如集群内的数量是2N+1,那么quorum必须得大于N+1。我觉得这有几点考虑。首先是可用性和一致性的平衡。如果quorum是集群内所有的节点,那么一致性是保证了,但是可用性就非常差了,因为只要集群内有一个节点与其他节点失去联系,就意味着必须返回调用方失败,这个是非常不划算的,因为假如集群内有5个节点,挂一个整个集群就不可用,那可用性也太差了。那为什么是N+1呢?为什么不是N呢?我觉得是为了保证最大程度的一致性,同时也能防止脑裂。假设集群内有个6个节点,由于网络分区,分成了两块,每一块各自三个节点。如果quorum是N,那么这两块就都是可用的,会各自选举出一个leader。对于依赖zookeeper的服务来说就会出现两个leader,一致性就失去了。quorum是N+1能够解决这个问题,因为不论怎么分区,都只会形成一个可用分区,最终都只会只有一个leader。

  10. 为什么zookeeper集群几点通常是奇数。

          T=5,quorum=3>(5/2+1),那么能够容忍2个失败

          T=6,quorum=4(6/2+1), 那么能够容忍2个失败

        既然5和6集群的可用性是一样的,那么为什么还要浪费一个节点呢? 3和4能够容忍失败的节点个数都是1。


二.一个分布式的消息队列需要共享哪些元数据?如何共享这些数据呢?


作为一个pub/sub模式的消息队列,如果是分布式的话,必然有一些数据需要在各个broker之间共享的。


  1. topic信息,因为每个broker都必须知道全量的topic ,以避免在每个broker内部维护了相同的topic

  2. topic下的队列分区信息,因为分布式的消息队列通常是支持横向扩展的。对于集群内加入了一个节点,只意味着,又可以加入加个队列了。所以必须得知道,这个topic的存储队列是分布在集群内部哪几个broker的。比如说集群有5个节点,2个topic,很有可能topicA的存储队列(分区)是在其中3个节点,topicB在另外2个几点。

  3. 消息消费者的信息。因为为了减少锁的竞争,以及保证消息的有序性,最好是一个队列只会由一个消息者来消费。这就涉及到到底哪个消费者消费哪个队列的问题了。如果不把消费者的信息集中起来,怎么知道当前有多少个消费者呢?

  4. master-slave或者说leader-follower的管理。分布式系统通常为了实现高可用,会对分区进行备份。为了保证master-slave数据的一致性,通常由master接受写请求,写完以后异步同步给slave.slave通常在master挂了之后节点读请求保证消息不丢。rocketmq就是这么实现的。而leader和follower的模式区别在于,follower其实是能够接受读请求的。follower不仅仅是一个备份的作用,还能起到读负载均衡的作用,kafka就是这么干的。也就是说对一个分区的master-slave的信息是需要共享的。这样才能对写请求和读请求做分发。


如何共享这些数呢?其实最简单的方式就是通过zookeeper,通过zookeeper来做集群管理和元数据存储。用来解决producer,broker,consumer之间的元数据共享,负载均衡,失败容错的问题,上面那些数据放入zookeeper里面存储之后,能起到什么作用呢?

1.producer需要从zookeeper获取topic下的分区(队列)信息,做负载均衡,比如说一个topicA的分区分布在brokera和brokerb上,每个broker4个分区,producer会从这个8个分区里面随机选择一个发送消息。有了新的分区之后,可以选择推送给provider也可以由provider定时来拉取。

2.consumer需要从zookeeper上获取topic下的分区信息,这里面负载均衡可以由broker来做也可以由consumer自己来做,consumer来做的话,需要consumer知道所有的consumer列表和topic下的分区列表,然后分别对这两个列表做排序,每个consumer采用相同的算法选取自己要消费的分区,rocketmq就是这么做的。如果是broker来做,则是根据topic下的分区列表和consumer列表做好负载均衡后推送给consumer。

     3.broker的分区高可用如果用的是leader-follower模式,还需要依赖zookeeper的选举功能选举出leader。kafka早期是依赖zookeeper来实现的这个功能,目前是自己实现的一个算法,叫ISR,简单讲就是允许设置一个topic下的每个分区(partition),在写入的时候同时若干个几个replica,只有都写入成功才算成功,这几个replica叫做in-sync-replica,简称ISR,不是ISR的replica是异步写入的,可是这里面同样面临CAP理论提出的问题,这个ISR多少合适呢?假如有5个副本,同时写5个当然数据的可靠性和一致性能保证,可是可用性就太差了(响应时间慢其实也可以叫做可用性差),假如有5个replica,ISR=3,那么在写入的返回给producer成功之后,3个ISR全部挂掉了怎么办?(虽然概率小,但是有可能)这个时候如果保可用性,当然是让另外两个replica接着选举,接着提供服务,但是数据的一致性就没有了,因为这两个副本里面的状态很有可能不是最新的。如果保一致性,那就只能等待ISR里面的某一个重新活过来,可是如果一直都不活过来呢,那么这个分区就全部不可用了,Kafka默认是保一致性,但是支持用户配置保证可用性,也就是说,不论你采用什么算法,终归是要面对cap的,这才是cap理论的最大贡献。所以掌握上面说的那些分布式的基本理论,对于设计一个分布式系统是非常有用的。rocketmq则是自己实现了一个简单的nameserver,不依赖于zookeeper,当然rocktmq其实没有用到选举功能的,因为它是master-slave模式。kafka作为一款开源的非常优秀的分布式消息队列早期就是完全通过zookeeper来做集群管理的。后期为了减少zookeeper的依赖,有一部分功能自己实现了,比如说各个分区的leader-follower算法。


总结一下:一个分布式系统通常都是对数据进行分区(partition)或者分片(shard),然后每个分区或者分片都有其副本。而分区或者分片,leader-follower的信息是需要统一管理的,协调一致的,这个时候就需要一个分布式协调服务来承担这部分工作了。但是对于一个开源的产品,别人在依赖这个开源产品的同时还需要依赖另一个开源产品,这个其实是不太友好的,所以很多分布式系统都选择自己是实现类似zookeeper的功能,比如redis,mongodb他们也有分片和副本的概念,但是却并没有依赖一个类似zookeeper这样的协调服务,而是自己内部实现了一套这个逻辑。比如说kafka,就是使用kafka的应用在依赖Kafka的同时,还需要依赖zookeeper,对rocketmq来说则是必须依赖nameserver。但是这里有一些共性的东西就是,不论是依赖zookeeper还是自己实现类似的功能,都必须得有一个协调者(coordinator)这个角色,同时也必须要解决,分布式系统共有的问题,cap,base,事务,高可用,横向扩展,数据可靠性等问题。


三.怎么定义可靠?为了达到可靠,需要解决哪些问题?


消息队列作为堆积和分发消息的一个系统,首要保证就是消息的可靠性。简单讲就是消息不能丢,不能重的问题。那么如何保证消息不丢呢?

  1. 消息从producer到broker,这个过程中,可以选择同步也可以异步,同步和异步都可以做重试,也都需要broker返回ack。但是同步相对异步的好处是,同步的话调用方能够里面知道结果,失败了调用方能够立即知道,也就是把失败的情况的处理任务交给了调用方,比如下单,如果是同步,可以直接返回给用户下单失败,请稍后重试等。异步的话,通常是把消息发给了消息队列client(嵌入到了producer进程里面)里面的一个buffer缓存起来,由client异步发送给broker。但是这里确实是有可能丢消息的,比如这个时候producer挂了,或者buffer满了呢。也就是同步相对于异步来说更能保证消息的可靠性,特别是在配置了同步双写或者同步多写的情况下(写入replica或者slave)但是有可能会阻塞应用的线程,所以同步双写可以保证消息投递过程中的可靠性。

  2. broker 拿到消息之后,如果是立即把消息放入磁盘,肯定消息的可靠性就会高很多,但是考虑到性能(可用性)通常不这么做,通常是把消息写入OS pagecache里面,再定时flush到磁盘,因为这样能够避免很多小数据量的IO/write,而且能够保证顺序写,很多存储系统都利用了pagecache来提升读写的性能,但是假如在数据还没有flush到磁盘之前,进程死掉了或者停电了怎么办?这部分数据就丢掉了,所以如果想保证数据的绝对可保,同步刷盘可以保证存储过程中的消息的可靠性。

  3. consumer从broker拉取消息消费。这里面假如consumer从broker拉取消息之后,broker就把消息删掉了,但是broker又消费失败了,那么消息就丢掉了,所以consumer从broker拉取完消息之后需要给broker一个ack,可是仅仅消息到了成功到了consumer端就行了吗?还有可能消息已经到了consumer,但是consumer消费失败,这个时候就需要把消息再次投递到broker,后面进行重试。所以consumer还得告诉broker这个消息被成功消费了,broker才能删除这个消息。kafka和rocketmq都支持回溯消息,也就是哪怕你告诉我成功消费了这个消息我也不删除,直到用户主动删除,或者到了约定的存储时间,或者磁盘空间不足了。通过这两个过程保证了,消费过程中的消息的可靠性。


总结一下:通过同步双写---->同步刷盘----->消费重试的机制可以最大程度的保证消息的可靠性。


至于保证消息不重,我一直的观点都是,这个在分布式,并发情况下,真的很难实现。比如说,同步投递给broker的过程中,假如某个broker处理超时了,但是其实还是把消息存储下来了,这个时候client肯定会进行重试,一旦重试很有可能请求到了另外一台broker,那么另外一台broker也存储了相同的消息,这个时候就会重复了。


这篇文章并没有讨论一个消息队列的具体的实现方式,只是讨论了设计一个可靠的分布式消息队列需要解决的问题,但是对于理解kafka和rocketmq等成熟的消息队列的设计思想是有帮助的。


1.分布式系统定义

http://www.hpcs.cs.tsukuba.ac.jp/~tatebe/lecture/h23/dsys/dsd-tutorial.html

2.事务定义

https://docs.oracle.com/cd/B28359_01/server.111/b28310/ds_txns001.htm#ADMIN12213  oracle官方文档

3.cap

https://dzone.com/articles/understanding-the-cap-theorem

4.Paxos made simple

https://lamport.azurewebsites.net/pubs/paxos-simple.pdf

5.zab

https://www.datadoghq.com/pdf/zab.totally-ordered-broadcast-protocol.2008.pdf










以上是关于如何设计一个可靠的分布式消息队列?的主要内容,如果未能解决你的问题,请参考以下文章

面试官:消息队列中,消息可靠性重复消息消息积压利用消息实现分布式事务如何实现...

消息队列:消息可靠性重复消息消息积压利用消息实现分布式事务

一文弄懂消息队列相关面试问题:消息可靠性重复消息消息积压利用消息实现分布式事务

58分布式消息队列WMB设计与实践

Mark | 分布式之消息队列

分布式之消息队列复习精讲