精通springcloud:消息驱动的微服务,了解Spring Cloud Stream
Posted jinggege795
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了精通springcloud:消息驱动的微服务,了解Spring Cloud Stream相关的知识,希望对你有一定的参考价值。
消息驱动的微服务
我们已经讨论了Spring Cloud 提供的是基于微服务架构的许多功能。但是,我们一直在考虑的其实都是基于RESTful的同步通信服务。在“微服务简介”中曾经提到过,还有其他一些流行的通信方式,如发布/订阅或异步、事件驱动的点对点消息传递等,后者就是本章将要介绍的一种微服务实现方法,它和前面章节所介绍的基于RESTful的同步通信服务有所不同。
本章还将详细讨论如何使用SpringCloudStream来构建消息驱动的微服务。
本章将要讨论的主题包括:
口与Spring Cloud Stream相关的主要术语和概念。
口使用RabbitMQ和Apache Kafka消息代理作为绑定器。
口Spring Cloud Stream编程模型。
口绑定、生成器和使用者的高级配置。
口扩展、分组和分区机制的实现。
口多个绑定器支持。
了解Spring Cloud Stream
Spring Cloud Stream构建于Spring Boot之上。它允许开发人员创建独立的、生产级的Spring应用程序,并使用Spring Integration来帮助实现与消息代理的通信。使用SpringCloud Stream创建的每个应用程序都可以通过输入和输出通道与其他微服务集成。这些通道通过与中间件相关的绑定器( Binder)实现连接到外部消息代理。有两种内置的绑定器实现一Kafka 和Rabbit MQ.
Spring. Integration 扩展了Spring 编程模型,以支持众所周知的企业集成模式(Enterprise Integration Pttens, EIP)。EIP定义了许多通常用于分布式系统中的协作的组件。读者可能已经听说过消息通道、路由器、聚合器或端点等模式。Spring Integration框架的主要目标是提供一个基于EIP构建Spring应用程序的简单模型。如果读者对有关EIP的更多详细信息感兴趣,请访问网站ht:/www enterpriseintegrationpatterms com/patterns/messaging/toc .html.
构建消息传递系统
我们认为引入主要Spring Cloud Stream功能的最合适方式是通过基于微服务的示例系统。我们将轻松修改前面章节中讨论过的系统架构。这里不妨简要回顾一下这种架构。我们的系统负责处理订单。它由4个独立的微服务组成。order-service 微服务首先与product-service服务进行通信,以便收集所选产品的详细信息,然后通过customer-service服务来检索有关客户及其账户的信息。现在,发送到order-service服务的订单将被异步处理。此时仍有一个公开的RESTful HTTP API端点用于客户端提交新订单,但应用程序不会处理它们。它只保存新订单,将其发送到消息代理,然后给客户端发送响应,表示订单已被批准处理。当前讨论的示例的主要目标是显示点对点通信,因而消息将仅由一个应用程序(account-service 服务)接收。图11.1说明了这个示例系统的架构。
收到新消息之后,account-service服务会调用product-service公开的方法以查找其价格。它从账户中提取资金,然后将响应发送回order-service服务(包含当前订单的状态)。该消息也通过消息代理发送。order-service 微服务将接收消息并更新订单状态。如果外部客户端想要检查当前订单状态,它可以调用公开find方法的端点,查找订单的详细信息。该示例应用程序的源代码可在GitHub ( htp:itb co/inin/sampl-spring- cloud-messaging.git)上获得。
启用Spring Cloud Stream
在项目中包含Spring Cloud Stream的推荐方法是使用依赖项管理系统。Spring CloudStream具有与整个Spring Cloud 框架相关的独立版本列车管理。但是,如果在dependencyManagement部分的Edgware.RELEASE版本中声明了spring-cloud-dependencies,那么就不必在porm.xml中声明任何其他内容。如果开发人员只想使用Spring Cloud Stream项目,则应定义以下部分。
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-st ream-dependencies</artifactId>
<version>Ditmars。SR2</version>
<type>pom</type>
<scope> import</scope>
</dependency>
</dependencies>
</ dependencyManagement>
下一步是将spring-cloud-steam添加到项目依赖项中。此外,建议开发人员至少包含spring- cloud-sleuth库,以提供发送消息功能和traceld,这个traceld与通过Zuul网关传入order- service服务的源请求的traceld相同。
<dependency>
<groupId>org . springframework. cloud</groupId>
<artifact Id>spring-cloud-stream</artifactId>
</ dependency>
<dependency>
<groupId>org. springframework. cloud</groupId>
cartifactId>spring-cloud-sleuth</artifactId>
</dependency>
要为应用程序启用与消息代理的连接,请使用@EnableBinding注解主类。
@EnableBinding注解可以将一个或多 个接口作为参数。可以在Spring Cloud Stream提供
的3个接口之间选择。
口Sink: 用于标记从入站通道接收消息的服务。
口Source:用于向出站频道发送消息。
口Processor:可用于需要入站通道和出站通道的情况,因为它扩展了Source 和Sink接口。由于order-service 服务发送消息以及接收消息,因而其主类已使用@EnableBinding ( Prossosrcass)进行注解。
以下是支持Spring Cloud Stream绑定的主要order-service服务类。
@SpringBootApplication
@EnableDiscoveryClient
@EnableBinding (Processor.class)
public class OrderApplication {
public static void main(String[] args) {
new
SpringApplicationBuilder (OrderApplication.class) .web(true) .run (args);
}
}
声明和绑定频道
由于使用了Spring Integration,因而该应用程序独立于项目中包含的消息代理实现。Spring Cloud Stream将自动检测并使用类路径中找到的绑定器,这意味着开发人员可以选择不同类型的中间件,并配合相同的代码使用。所有与中间件相关的设置都可以通过外部配置属性覆盖,并且采用Spring Boot 支持的形式,如应用程序参数、环境变量或application.yml文件。
如前文所述,Spring Cloud Stream为Kafka和Rabbit MQ提供了绑定器实现。要包含对Kafka的支持,请将以下依赖项添加到项目中。
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream- kafka</artifactId>
</ dependency>
就个人而言,笔者更喜欢RabbitMQ,本章将为RabbitMQ和Kafka各创建一个示例。 由于前面的章节已经讨论过RabbitMQ的功能,所以现在就先从基于RabbitMQ的示例开始。
<dependency>
<groupId>org. spr ingf ramework.cloud</groupId>
<artifactId> spring-cloud-starter -stream rabbit</artifactId>
</dependency>
启用Spring Cloud Stream并包含绑定器实现后,即可创建发送消息者(Sender) 和侦听消息者(Listener) 。现在可以从负责向代理发送新订单消息的生产者(Producer) 开始。这是通过order-service中的OrderSender 实现的,它使用Output bean发送消息。
@Service
public class OrderSender {
@Autowired
private Source source;
public boolean send (Order order)
return
this.source.output() .send (MessageBuilder .withPayload (order) .build( ) );
}
}
该bean由控制器调用,它公开允许提交新订单的HTTP方法。
@RestController
public class OrderController {
private static final Logger LOGGER =
LoggerFactory.getLogger (OrderController.class);
private ObjectMapper mapper new ObjectMapper();
@Autowired
OrderRepository repository;
@Autowired
OrderSender sender ;
@PostMapping
public order process (0RequestBody order order) throws
JsonProcess ingException (
Order O = repository.add(order);
LOGGER.info("Order saved: }", mapper .writeValueAsString (order));
boolean isSent 一sender 。send(o) ;
LOGGER. info("order sent:{ } ",
mapper.writeValueAsstring (Collections.singletonMap ("issent", isSent) ) );
return o;
}
}
包含订单信息的消息已发送到消息代理。现在,它应该通过account-service服务接收。要完成这一操作, 必须声明接收器,接收器将侦听传入队列的消息,这个消息是在消息代理上创建的。要接收带有订单数据的消息,只需要使用@Sreaml istener注解让该方法采用Order对象作为参数。
@SpringBootApplication
@EnableDiscoveryClient
@EnableBinding (Processor.class)
public class AccountApplication {
@Autowired
AccountService service;
public static void main(String[] args) {
new
SpringAppl icationBuilder (AccountApplication.class) .web (true) . run(args);
}
@Bean
astreamListener (Processor。INPUT)
public void receiveOrder (Order order) throws JsonProcessingException {
service.process (order);
}
}
现在可以启动示例应用程序。但是,这里还有一个尚未提及的重要细节。这两个应用程序都尝试连接在localhost上运行的RabbitMQ,并且它们都将相同的交换( Exchange )信息视为输入或输出。这是一个问题,因为order-service服务将消息发送到输出交换信息,而account-service服务又将侦听传入其输入交换消息。这些是不同的交换信息,但先者恒先。接下来不妨就从运行消息代理开始。
觉得文章不错的朋友可以转发此文关注小编,有需要的可以扫码下方获取;
以上是关于精通springcloud:消息驱动的微服务,了解Spring Cloud Stream的主要内容,如果未能解决你的问题,请参考以下文章
Spring Cloud构建微服务架构 消息驱动的微服务(消费分区)Dalston版
第十章 消息驱动的微服务: Spring Cloud Stream