Spring Cloud Stream 编程模型的基础知识,很多老司机都不知道
Posted jinggege795
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spring Cloud Stream 编程模型的基础知识,很多老司机都不知道相关的知识,希望对你有一定的参考价值。
高级编程模型
Spring Cloud Stream 编程模型的基础知识与点对点和发布/订阅通信的示例将一起介绍。现在来讨论一些更高级的示例功能。
制作消息
在介绍的所有示例中,我们已通过RESTful API 发送订单以进行测试。但是,开发人员也可以通过在应用程序内定义消息源来轻松创建一些测试数据。以下是一个使用@Poller的bean,它将每秒生成一条消息, 并将其发送到输出通道。
@Bean
@InboundChannelAdapter (value一Source .OUTPUT, poller - CPoller (fixedDelay -
"1000",maxMessagesPerPoll = "1"))
public MessageSource<order> ordersSource () (
Random r = new Random();
return () -> new GenericMessage<> (new order (Orderstatus . NEW, (1ong)
r.nextInt(5), Collections.singletonList( (1ong) r.nextInt(10) ) ) ) ;
}
转换
如前文所述,account-service 服务和product-service服务已经从order-service服务接收事件,然后发回响应消息。我们创建了OrderSender bean,它负责准备响应有效负载并将其发送到输出通道。事实证明,如果在方法中返回响应对象并使用@SentTo注解它,则实现可能会更简单。
@StreamListener (Processor . INPUT)
@SendTo (Processor。OUTPUT)
public Order rece iveAndSendorder (Order order) throws
JsonProcessingException {
LOGGER. info("Order received: ( }", mapper .writeValueAsstring (order));
return service.process (order) ;
}
我们甚至可以想象诸如以下形式的实现,而不使用@StreamL istener。转换器(Transformer)模式将负责更改对象的形式。在这种情况下,它会修改两个order字段一status (状态)和price (价格)。
@EnableBinding (Processor.class)
public class OrderProcessor {
@Transformer (inputChannel = Processor. INPUT, outputChannel =
Processor. OUTPUT)
public Order process (final Order order) throws JsonProcessingException (
LOGGER. info ("Order processed: ()", mapper .writeValueAsString (order));
products. forEach(p -> order . setPrice (order .getPrice() +
p.getPrice()));
if (order.getPrice() <- account .getBalance()) {
order。setStatus (OrderStatus。ACCEPTED) ;
account.setBalance (account .getBalance() - order .getPrice());
else|
order。setStatus (OrderStatus . REJECTED) ;
return order;
}
}
有条件地使用消息
假设开发人员希望以不同方式处理传入同一消息通道的消息,则可以使用条件分派。Spring Cloud Stream支持根据条件将消息分派给在输入通道上注册的多个@StreamListener方法。该条件是在@StreamListener注解的condition 属性中定义的Spring 表达式语言(Spring Expression Language, SpEL) 表达式。
public boolean send(Order order) {
Message<Order> orderMessage =
MessageBuilder . withPayload (order) .build() ;
orde rMessage . getHeaders () . put ("processor", "account") ;
return this. source. output ()。send (orderMessage) ;
}
以下就是一个示例实现,它定义了两个使用@StreamListener注解的方法,这些方法侦听同一主题。其中一个专用于从account-ervice 服务传入的消息,而第二个则专用于product-service服务。传入消息将根据带有processor名称的标头进行分派。
@SpringBootApplication
@EnableDi scoveryClient
@EnableBinding (Processor.class)
public class OrderApplication {
@StreamListener (target = Processor . INPUT, condition =
"headersI'processor']=-'account'")
public void receiveorder (Order order) throws JsonProcessingException
LOGGER. info ("Order received from account: { } ",
mapper.writeValueAsString(order) );
// ...
}
@StreamListener (target一Processor. INPUT, condition =
"headers[ 'processor']--'product'")
public void receiveOrder (Order order) throws JsonProcess ingException
LOGGER. info("Order received from product: { }",
mapper . writeValueAsString (order) );
// ...
}
}
使用Apache Kafka
在讨论Spring Cloud与消息代理的集成时,我们曾经多次提到过Apache Kafka。 但是,到目前为止,我们还没有基于该平台运行任何示例。事实上, RabbitMQ在使用SpringCloud项目时往往是首选,但是Kafka也值得我们关注。与RabbitMQ相比,它的一个优势是对分区的原生支持,而分区正是Spring Cloud Stream最重要的功能之一。
Kafka不是典型的消息代理。它是一个分布式流媒体平台。它的主要功能是允许开发人员发布和订阅记录(Record)流。它对转换或响应数据流的实时流应用程序特别有用。它通常作为由一个或多个服务器组成的集群运行,并可以在主题中存储记录流。
运行Kafka
糟糕的是,Apache Kafka没有正式的Docker镜像。但是,我们可以使用一个非官方的,如Spotify共享的镜像。与其他可用的Kafka docker镜像相比,这个镜像可以在同一容器中运行Zookeeper和Kafka。以下是启动Kafka并在端口9092.上公开它的Docker命令。在端口2181上也可以使用Zookeeper.
docker run -d --name kafka -P 2181:2181 -P 9092:9092 --env
ADVERTISED HOST=192 .168.99.100 --env ADVERTISED PORT=9092 spotify/kafka
自定义应用程序设置
要为应用程序启用Apache Kafka, 需要将spring- cloud-starter- stream-kafka启动程序包含在依赖项中。我们当前的示例非常类似于发布/订阅的示例,因为它使用了RabbitMQ发布/订阅,以及在前一篇“发布/订阅模型”中介绍过的分组和分区机制。唯一的区别在于依赖项和配置设置。
SpringCloudStream将自动检测并使用类路径中找到的绑定器。可以使用spring kafka.*属性覆盖连接设置。在这种情况下中,只需要将自动配置的Kafka客户端地址更改为Docker 机器地址192.168.99.100。 对Zookeeper 也应该执行相同的修改,因为Zookeeper将由Kafka客户端使用。
spring:
application:
name: order-service
kafka:
bootstrap-servers: 192.168.99.100:9092
cloud:
stream:
bindings:
output :
destination: orders-out
producer:
partitionKeyExpression: payload. customerId
partitionCount: 2
input:
destination: orders-in
kafka:
binder:
zkNodes: 192.168.99.100
在启动发现、网关和所有必需的微服务实例后,即可执行与先前示例相同的测试。如果一切配置正确,则应该在应用程序启动期间在日志中看到以下片段。其测试结果与基于RabbitMQ的示例完全相同。
16:58:30.008 INFO [,] Discovered coordinator 192.168. 99.100:9092
(id: 2147483647 rack: null) for group account.
16:58:30.038 INFO [,] Successfully joined group account with generation 1
16:58:30.039 INFO [,] Setting newly assigned partitions
[orders-out-0, orders-out-1] for group account
16:58:30.081 INFO [,] partitions assigned:
[orders-out-0, orders-out-1]
Kafka Streams API支持
Spring Cloud Stream Kafka可以提供专为Kafka Streams绑定设计的绑定器。使用此绑定器之后,应用程序即可利用Kafka Streams API。要为应用程序启用此类功能,需要在项目中包含以下依赖项。
<dependency>
<groupId>org . springframework. cloud</groupId>
<artifactId>spring-cloud-stream-binder- kstream</artifactId>
</dependency>
Kafka Streams API可以提供高级流DSL。可以通过声明@StreamListener方法将KStream接口作为参数来访问它。KStream为流操作提供了一些有用的方法,这些方法来自其他流API,如map、flatMap、join 或filter。还有一些 与Kafka Stream相关的其他方法,如t.(... (用于向主题发送流)或through..(与to相同,但也会从主题创建KStream的新实例) 。
@SpringBootApplication
@EnableBinding (KStreamProcessor .class)
public class AccountApplication {
@StreamListener ("input")
@SendTo ("output")
public KStream<?, order> process (KStream<?, Order> input) {
// ..
}
public static void main(String[] args) {
SpringApplication. run (AccountApplication.class, args);
}
}
配置属性
在讨论示例应用程序的实现之前,我们已经介绍了Kafka的一些 Spring Cloud配置设置。表11.5包含了一些最重要的属性,可以设置这些属性来自定义Apache Kafka绑定器。所有这些属性都以spring .cloud.stream. kafka.binder为前缀。
多个绑定器
在Spring Cloud Stream术语中,可以实现以提供与外部中间件的物理目标的连接的接口称为绑定器(Binder)。目前,有两种可用的内置绑定器实现一Kafka 和RabbitMQ。如果想要提供自定义绑定器库,那么关键接口就是Binder (这个关键接口其实就是作为将输入和输出连接到外部中间件的策略的抽象),它有两个方法一bindConsumer 和bindProducer。有关更多详细信息,请参阅Spring Cloud Stream规范。
对开发人员来说,重要的是能够在单个应用程序中使用多个绑定器。我们甚至可以混合使用不同的实现,如RabbitMQ和Kafka. Spring Cloud Stream依赖于Spring Boot在绑定过程中的自动配置。类路径上可用的实现将自动使用。如果想要同时使用默认的绑定器,请在项目中包含以下依赖项。
<dependency>
<groupId>org。spr ingframework. cloud</groupId>
<artifactId>spring-cloud-st ream-binder- rabbit</artifactId>
</ dependency>
<dependency>
<groupId>org. spr ingframework. cloud</groupId>
<artifactId> spring-cloud-stream-binder- kafka</artifactId>
</dependency>
如果在类路径中找到了多个绑定器,则应用程序必须检测应将哪个绑定器用于特定通道绑定。我们可以使用spring .cloud. stream defaultBinder属性全局配置默认绑定器,或者使用spring.cloud stream. bindings.<channelName> .binder属性为每个通道单独配置默认绑定器。现在不妨回到之前的示例,在那里配置多个绑定器。我们需要为account-service服务和order. service服务之间的直接通信定义RabbitMQ,并为order-service 服务和其他微服务之间的发布/订阅模型定义Kafka。
以下是与publish subscribe 分支( hts://github. com/piomin/sample-spring-cloud-messaging/tree/publish subscribe) 中的account-service相同的配置,但它基于两个不同的绑定器。
spring:
cloud:
stream:
bindings:
output:
destination: orders-in
binder: rabbit1
input :
consumer :
partitioned: true
destination: orders -out
binder: kafkal
group:. account
rabbit:
bindings:
output:
producer :
exchangeType: direct
rout ingKeyExpression: ' “#”'
binders:
rabbit1 :
type: rabbit
envi ronment :
spring:
rabbi tmq:
host: 192.168.99.100
kafkal :
type: kafka
envi ronment :
spring:
kafka:
bootstrap-servers: 192. 168.99.100:9092
小结
与所有其他Spring Cloud项目相比,Spring Cloud Stream可以被视为一个单独的类别。它通常与其他项目相关联,并且目前由Pivotal Spring Cloud Data Flow强力推广。这是用于构建数据集成和实时数据处理管道的工具包。当然,这也是一个巨大的主题,不是单独的一本书就可以讨论完的。
更重要的是,Spring Cloud Stream 支持异步消息传递,并且可以使用Spring注解样式轻松实现。我们认为,对于部分开发人员来说,这种服务间通信方式并不像RESTfulAPI模型那么明显。因此,我们更专注于展示使用Spring Cloud Stream进行点对点和发布/订阅通信的示例。我们还详细介绍了这两种消息传递方式之间的差异。
发布/订阅模型并不是什么新鲜事物,但是由于Spring Cloud Stream的存在,它可以很容易地包含在基于微服务的系统中。本章还介绍了一些关键概念,如使用者分组或分区。阅读完本章之后,开发人员应该能够基于消息传递模型实现微服务,并将它们与其他Spring Cloud库集成,以便提供日志记录、跟踪或仅将它们部署为现有的基于REST的微服务系统的一部分。
觉得文章不错的朋友可以转发此文关注小编,有需要的可以扫码下方获取资料。
以上是关于Spring Cloud Stream 编程模型的基础知识,很多老司机都不知道的主要内容,如果未能解决你的问题,请参考以下文章
Spring Cloud Stream Supplier 功能模型
Spring Cloud Stream 3.1.x版本,弃用@StreamListener而采用函数式编程实现RocketMQ的接入
Spring Cloud Stream 3.1.x版本,弃用@StreamListener而采用函数式编程实现RocketMQ的接入