使用RabbitMQ的事件驱动微服务
Posted 分布式实验室
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了使用RabbitMQ的事件驱动微服务相关的知识,希望对你有一定的参考价值。
在微服务之间使用正确的模式进行通信有助于应用程序的伸缩以及解决大多数分布式系统的问题。我们一开始是采用直接的HTTP调用来通信的,但后来决定迁移到事件驱动系统上了。该系统改变了我们对于服务之间交互的思维方式,迫使我们采用可伸缩的模式并且提高了我们的适应能力。
我们从传统的HTTP通信迁移到基于事件的通信有几个原因,第一是对于服务的强制解耦,从我们对于HTTP的使用经验来看,我们的服务将会对于需要的每个服务都发起调用请求,这意味着原来的服务对于与其通信的每个服务都需要一个客户端库,该客户端库需要确保错误不会停止或者阻止正常的功能,并且与每个服务保持一致。
当我们扩展到超过20个服务时,维护客户端库就成为了一个长期、艰巨的过程。取代旧功能的新服务需要更新所有依赖项。由于所有的这些变动的组件,使得开发和部署的过程更长并更容易出错。
使用事件的另一个好处是服务不再需要编排功能了,消除了对于客户端的直接调用。服务可以自由地进出那些已经存在的地方,而无需更新客户端库或者添加新的HTTP调用。我们可以迅速部署那些监听事件的原型应用程序,而不用担心它们降低整个系统的可用性。
第三点,这个变化允许我们来实现全局模式,我们增加了速率限制和每个工作节点的超时时间,而不用在我们的每一个不同的客户端库(GitHub、AWS、内部服务等)分别实现。我们也能够很容易地实现一个熔断器模式,方式是通过切断一个事件的监听器,直到它健康为止,并且这只针对需要改变的工作节点,而不是所有的服务调用者。
最后,我们并不仅限于为那些长时间运行的工作节点持有一个开放的HTTP连接(可以断开连接或限制打开的套接字等)。
组成事件驱动系统的有两种不同的模式:事件和任务。
事件是当有事发生时告知已订阅的应用程序的那些通知。应用程序订阅某些事件并通过为此创建任务来响应,事件不会直接修改状态。
任务是修改状态的动作,唯一可以为一个给定的应用程序创建一个任务的是应用程序本身。这样的话,应用程序不能直接修改对方的状态。
当遇到事件和任务的命名时,严格的命名约定有助于我们保持一致性和清晰度。任务的命名以应用程序名称开始这样可以确保它们只处理预期的应用程序,接下来是模型,其状态会被任务修改,最后是一个描述性的“现在时”的动词。一个任务的例子是api.user.authorize
,根据约定我们知道这个任务是由api
服务处理的,该任务在user
对象上执行一个authorize
动作。
事件没有应用程序名称,因为它们可以被多个应用程序订阅。它们以模型名字开始,并以描述发生了什么的过去式动词结尾,一个事件的例子就是user.authorized
。
将我们的应用程序分解为任务和事件迫使我们改变了思维方式,以前,如果我们想在收到付款后发送一封电子邮件,我们会添加一个SendGrid的调用到我们的支付服务里,非常简单明了。
但是对于我们崭新的事件系统,我们的支付服务发出了一个事件org.payment.processed
。我们的email服务Pheidi,就会收到事件并创建了一个任务:pheidi.email.send
,我们现在需要按照反应而不是命令来考虑,不过如果我们需要额外的并且事件没有提供的数据(比如信用卡注册名字),我们仍然使用HTTP调用我们的账单服务。
在基于事件的解决方案伴随着优势的同时,还是有一些缺点的,因为你没有显式地调用服务,所以不能确定你发出的事件的响应是什么,这使得调试非常困难,因为系统更为复杂和难以理解了。
我们使用RabbitMQ作为我们的消息系统,负责分发事件到监听它们的服务。任务也会通过RabbitMQ分发,所以可以跨一个应用的多个实例来做负载均衡。我们选择RabbitMQ是因为它易于部署,并且里边有准备就绪的供我们使用的NPM客户端模块。

我们创建了Ponos作为我们的统一的工作节点服务器来与RabbitMQ互动,这里有一些我们用来处理队列的模式。
从一开始我们就在每个job上添加了指数退避算法,如果一个job抛出了一个可重复的错误,那么将会在过一段时间后重试。每个job都从一个最小的时间延迟开始,并且直到达到一个预定义的最大极限值(或者如果没有定义极限的话,那么就是无穷大),就增加一倍。
最初,我们希望job可以永远重试,考虑如果有什么东西被“卡住”的话,我们的警报系统将发出警告,我们中的一员将会成为拯救世界的白马骑士。这一开始会工作得很好,但随着我们添加了更多的工作,在队列中被“卡住”的条目的数量将会由于各种原因增长地很快。
为了应对日益增长的队列,我们给每个队列添加了一个最大重试限制,如果job重试达到了给定的次数,我们会阻止它重试并运行恢复功能。恢复功能会记录并更新带了一个错误的数据库。现在,我们的警报系统将触发恢复功能,使我们能够优先解决问题,而不是将我们的队列进行备份。我们发现,采用快速失败机制并给我们的用户显示错误相对于让用户为这些故障等待很长一段时间而言,是更好的选择。
预读取是RabbitMQ Channel上的一个很重要的设置选项,没有这个,你的工作节点将利用队列里的所有可用的job,举例来说,如果你的应用程序经历了一个尖峰负载并入列了10000个job,所有的这10000个job都会被发送到工作节点上并存入内存里,这通常会导致机器崩溃,预读取限制了你的工作节点可以载入内存的job的数量,这篇来自RabbitMQ的博客文章会帮助我们确定实现预读取的最佳方式。
为了实现事件和任务,我们使用以下的RabbitMQ结构。任务通过sendToQueue
API来使用一个队列,由于任务只使用一个应用程序,我们不用为它们创建一个交换机。事件是一个更复杂的设置。事件的发布者会创建一个fanout交换机,每个订阅者会创建并绑定一个队列到那台交换机,这样允许任何应用程序都可以接收任何事件,而不会影响其他应用程序。
有一样可以帮助我们调试和反躬自省事件系统的东西是事务ID(TID)。每一个发送到RabbitMQ的job都会带上一个TID前缀,如果该job是一个事件或者任务的结果,那么就使用相同的TID,如果job不是从事件或者任务创建而来的,我们就产生一个新的TID。这有助于我们追踪哪些事件引发了哪些要运行的任务。
我们的事件驱动系统加快了我们的开发速度,使我们对于失败更具有弹性,并为我们的用户提高了我们产品的响应能力,我们希望这些技术将会一如既往地有助于你们系统的可伸缩性。
本文为翻译文章,点击阅读原文链接可查看原文。
以上是关于使用RabbitMQ的事件驱动微服务的主要内容,如果未能解决你的问题,请参考以下文章
Chris Richardson微服务翻译:微服务之事件驱动的数据管理