译:基于Spring Cloud Stream构建和测试 message-driven 微服务

Posted softidea

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了译:基于Spring Cloud Stream构建和测试 message-driven 微服务相关的知识,希望对你有一定的参考价值。

原文链接:https://piotrminkowski.wordpress.com/2018/06/15/building-and-testing-message-driven-microservices-using-spring-cloud-stream/ 作者: Piotr Mińkowski 译者: helloworldtang img Spring Boot和Spring Cloud为您提供了一个利用不同的通信方式快速构建微服务的解决方案。您可以基于Spring Cloud Netflix库创建同步REST微服务,正如我在之前的一篇文章中所展示的那样 使用Spring Boot 2.0, Eureka and Spring Cloud快速搭建微服务指南。您可以使用Spring WebFlux项目在Netty上创建异步的、响应式的微服务,并将其与一些Spring Cloud库相结合,如我的文章所示 使用Spring WebFlux and Spring Cloud搭建响应式微服务。最后,您可以使用Spring Cloud Stream和类似Apache Kafka或RabbitMQ这样的broker来实现基于发布/订阅模型的message-driven微服务。构建微服务的最后一种方法是本文的主要主题。我将向您展示如何在RabbitMQ broker的基础上有效地构建、扩展、运行和测试消息传递微服务。 体系结构 为了演示Spring Cloud Stream的特性,我们将设计一个示例系统,该系统使用发布/订阅模型进行跨服务通信。我们有三个微服务:order-service、product-service和account-service。应用程序order-service暴露了负责处理发送到我们系统的订单的HTTP endpoint。所有传入的订单都是异步处理的——order-service准备并发送消息到RabbitMQ exchange,然后就对调用的客户端进行响应,不需要等到消息被消费后再响应。应用程序的account-service和product-service正在侦听进入该RabbitMQ exchange的订单消息。微服务account-service负责检查客户账户是否有足够的资金来支付该订单需要的金额,如果有就从该账户扣款。微服务product-service检查是否有足够的库存,并在处理订单后改变可用产品的数量。account-service 和 product-service 都通过RabbitMQ exchange(这一次是使用direct exchange的一对一通信)发送带有操作状态的异步响应。微服务 order-service根据接收到的响应消息来更新订单状态,并通过REST endpoint GET /order/{id}提供给外部客户端。 如果您觉得我们的示例描述有点难以理解,这里有一个用于澄清的架构图。 stream-1 启用 Spring Cloud Stream 在项目中使用Spring Cloud Stream的推荐方法是使用依赖管理系统。Spring Cloud Stream有一个与整个Spring Cloud framework相关,并且独立发布的依赖管理。然而,如果我们已经在Elmhurst.RELEASE版本的dependencyManagement部分声明了spring-cloud-dependencies,就不需要在pom.xml中声明任何其他内容。如果您喜欢只使用Spring Cloud Stream项目,那么您应该定义以下部分。 下一步是将spring-cloud-streamartifact添加到项目依赖项中。我还建议您至少包括spring-cloud-sleuth 库,以提供作为源请求进入order-service 的发送消息用的traceId。 Spring Cloud Stream 编程模型 为了使您的应用程序能够连接到一个message broker,请在主类上使用@EnableBinding注解。 @EnableBinding注解将一个或多个接口作为参数。您可以在Spring Cloud Stream提供的三个接口之间进行选择: Sink:这是用来标记从入站通道接收消息的服务。 Source: 这是用来向出站通道发送消息的。 Processor:当你需要一个入站通道和一个出站通道时,它可以被使用,因为它继承了Source and Sink接口。因为order-service发送消息,并接收它们,它的主类已经使用了@EnableBinding(Processor.class)注解。 下面是order-service项目中启用了Spring Cloud Stream binding的主类。 @SpringBootApplication @EnableBinding(Processor.class) public class OrderApplication { ... public static void main(String[] args) { new SpringApplicationBuilder(OrderApplication.class).web(true).run(args); } ... } 增加 message broker 在Spring Cloud Stream术语中,负责与特定message broker集成的实现称为binder。默认情况下,Spring Cloud Stream为 Kafka and RabbitMQ提供了binder实现。它能够自动检测和在类路径上查找binder。任何特定于中间件的设置都可以通过Spring Boot支持的外部配置属性来覆盖,譬如应用程序参数、环境变量,或者仅仅是application.yml文件。为了包含对RabbitMQ的支持,RabbitMQ将这篇文章用作message broker,您应该向项目添加以下依赖项。 现在,我们的应用程序需要连接RabbitMQ broker的一个共享实例。这就是为什么我使用RabbitMQ在默认的5672端口上运行Docker镜像。它还可以在地址http://192.168.99.100:15672(http://192.168.99.100:15672/)下启动web仪表板。 我们需要通过设置属性 spring.rabbitmq.host为Docker机器IP 192.168.99.100 ,来覆盖Spring Boot application的中的默认设置。 实现消息驱动的微服务 Spring Cloud Stream是在Spring Integration项目之上构建的。Spring Integration扩展了Spring编程模型,以支持众所周知的企业集成模式(EIP)。EIP定义了许多在分布式系统中经常使用的经典组件。您可能已经听说过诸如消息通道、路由器、聚合器或endpoints之类的模式。让我们回到上面的例子。让我们从order-service开始,它负责接收订单,将它们发布在shared topic上,然后从下游服务收集异步响应。下面是@service,它使用Sourcebean来构建消息并将其发布到远程topic。 这个 @Service 是由controller调用,controller暴露提交新订单和通过 id获得订单状态的HTTP endpoints。 现在,让我们更仔细地看看消费端。来自order-service的OrderSender bean所发送的消息是由 account-service和product-service接收。为了从 topic exchange中接收消息,我们只需要在入参为Order的方法上添加 @StreamListener注解。我们还必须为监听器定义目标通道——在这种情况下,它是Processor.INPUT。譬如: @StreamListener(Processor.INPUT) public void receiveOrder(Order order) throws JsonProcessingException { LOGGER.info("Order received: {}", mapper.writeValueAsString(order)); service.process(order); } 接收订单由AccountServicebean处理。account-service会根据客户账户上是否有足够的资金来实现订单接受或拒绝订单。验收状态的响应通过OrderSenderbean调用的输出通道发回order-service 。 最后一步是配置。它是在 application.yml中提供的。我们必须正确地定义通道的destination。而order-service则将orders-outdestination分配给输出通道,而orders-indestination则是输入通道,account-service和 product-service则恰恰相反。这是合乎逻辑的,因为通过其输出destination通过 order-service发送的消息是通过其输入destination接收的服务接收的。但在shared broker’s exchange中,它仍然是相同的destination。下面是 order-service的配置设置。 spring: application: name: order-service rabbitmq: host: 192.168.99.100 port: 5672 cloud: stream: bindings: output: destination: orders-out producer: partitionKeyExpression: payload.customerId partitionCount: 2 input: destination: orders-in rabbit: bindings: input: consumer: exchangeType: direct 这是为 account-service和product-service提供的配置。 最后,您可以运行上面示例中的微服务。现在,我们只需要运行每个微服务的单个实例。您可以通过运行JUnit测试类OrderControllerTest来轻松地生成一些测试请求,这是在我的源代码库中提供的 order-service中提供的。这种情况下很简单。在下一篇文章中,我们将学习更高级的示例,其中包含多个正在运行的消费服务实例。 扩展 为了扩展我们的Spring Cloud Stream应用程序,我们只需要启动每个微服务的附加实例。他们仍然会侦听与当前正在运行的实例相同的 topic exchange 中的传入消息。在添加了一个 account-service和 product-service的实例之后,我们可以发送一个测试订单。这个测试的结果对我们来说是不令人满意的… 为什么?每个微服务运行的所有实例都接收到了这个订单。这正是 topic exchanges 的工作方式——发送到topic的消息被所有的消费者接收,他们正在侦听这个topic。幸运的是,Spring Cloud Stream能够通过提供称为 consumer group的解决方案来解决这个问题。它负责保证一个消息只被一个实例处理,如果它们被放置在一个相互竞争的消费者关系中。在运行多项服务实例时,对consumer group机制的转换已经在下图中可视化了。 stream-2 一个 consumer group 机制的配置不是很困难。我们只需要设定group参数,并给出给定destination的组名。下面是account-service的当前binding配置。orders-indestination地是一个为直接与order-service通信而创建的队列,因此只有orders-out被分组使用spring.cloud.stream.bindings..group属性。 Consumer group机制是Apache Kafka的一个概念,它也在Spring Cloud Stream中实现,也适用于RabbitMQ broker,它本身并不支持它。因此,我认为它在RabbitMQ上的配置非常有趣。如果您在destination运行两个服务实例,而没有在destination设置组名,那么就会有两个为单个交易所创建的bindings(每个实例一个bindings),如下图所示。因为有两个应用程序在这个exchange中监听,总共有四个binding分配给那个exchange。 stream-3 如果您为选定的destination Spring Cloud Stream设置组名,则将为给定服务的所有运行实例创建单一binding。binding的名称将以组名为后缀。 B08597_11_06 因为,我们已经在项目依赖项中包含了 spring-cloud-starter-sleuth ,在实现 order-service POST endpoint的单个请求时,在交换的所有异步请求之间发送相同的 traceId 头部。由于这个原因,我们可以使用Elastic Stack (Kibana)轻松地将所有日志关联起来。 B08597_11_05 自动化测试 您可以轻松地测试您的微服务,而不需要连接到message broker。要实现它,您需要将 spring-cloud-stream-test-support包含到您的项目依赖项中。它包含 TestSupportBinderbean,它允许您与绑定通道进行交互,并检查应用程序发送和接收的任何消息。 在测试类中,我们需要声明 MessageCollectorbean,它负责接收由TestSupportBinder保留的消息。这是我的account-service测试类。使用Processorbean,我将测试订单发送到输入通道。然后,MessageCollector接收到通过输出通道发送回 order-service 的消息。测试方法的 testAccepted创建了应该被帐户服务接受的顺序,而testRejected方法则设置了过高的订单价格,从而导致拒绝订单。 总结 当您不需要来自API的同步响应时,Message-driven的微服务是一个不错的选择。在本文中,我展示了在您的微服务之间的跨服务通信中发布/订阅模型的示例用例。源代码在GitHub上是常见的(https://github.com/helloworldtang/sample-message-driven-microservices.git【原文源码maven不能运行,这个项目fork原代码并修复了错误】)。对于使用Spring Cloud Stream库、Apache Kafka的更有趣的例子,您可以参考我的书中第11章, Mastering Spring Cloud(https://www.packtpub.com/application-development/mastering-spring-cloud)。 关注社区公号,加入社区纯技术微信群

以上是关于译:基于Spring Cloud Stream构建和测试 message-driven 微服务的主要内容,如果未能解决你的问题,请参考以下文章

一文了解Spring Cloud Stream体系

Spring Cloud Stream

Spring Cloud Stream 进行服务之间的通讯

Spring Cloud Stream集成

Spring Cloud Stream集成

Spring Cloud Stream集成