Spring Cloud Stream绑定器架构解析与开发
Posted 炒栗子不加糖
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spring Cloud Stream绑定器架构解析与开发相关的知识,希望对你有一定的参考价值。
Spring Cloud Stream绑定器架构解析与开发
根据不同的使用场景我们通常会选择相适应的消息中间件,例如对于日志收集场景可能会选择使用Kafka,对于订单场景通常会选择RocketMQ,不同消息中间件的客户端是不同的,我们需要针对不同消息中间件写不同的代码。对于TOC的产品,我们可以根据业务场景选择适当的消息中间件,但对于TOB的产品,不同客户对于消息中间件需求是不同的,我们需要方便的切换消息中间件,并且不需要重构代码
Spring Cloud Stream能很好的帮我们解决上面的问题。通过Binder解藕具体消息中间件,屏蔽不同消息中间件使用的差异,基于Binder Spring Cloud Stream提供统一的配置及使用方式,用于发送消息、消费消息。对于使用方而言,只需要了解如何使用Spring Cloud Stream发送、接收消息无需关心使用那种消息中间件。Binder则需要负责不通消息中间件如何去发送、接收消息及其它的高级特性,例如事务等。因此我们可以看到针对不通消息中间件的Binder,例如spring-cloud-stream-binder-kafka、spring-cloud-stream-binder-rabbit等,我们自己也可以根据规范开发符合自己需求的Binder,例如对于消息量不大的用户,可以使用基于redis的消息中间件,我们可以开发一个对应的Binder,对接到Spring Cloud Stream。下面我将介绍Binder的架构及如何开发一个Binder
Binder架构介绍及源码解析
从源码角度,我们来看Spring Cloud Stream是如何与Binder交互,Binder具体又干了什么事情
项目启动
项目启动会先去扫描项目下所有依赖的绑定器,并将绑定器信息存储到内存对象中,具体而言做了如下事情:
-
扫描META-INF/spring.binders,获取绑定器类型及对应配置加载类,用于创建BinderTypeRegistry对象
-
根据BinderTypeRegistry构建BinderFactory。BinderFactory既包含绑定器的信息,也包含扫描yaml文件下配置的spring cloud steam相关的配置信息
发送消息
项目启动之后,准备工作已经完成,我们下面从发送消息流程来看,Spring Cloud Stream如果与Binder协作,完成消息发送
BinderFactory获取绑定器实例
启动项目时,我们将绑定器的信息都放到了BinderFactory中,因此发送消息的第一步就是要根据绑定器的信息,创建绑定器实例,完成绑定器的初始工作
加载绑定器配置类
Spring Cloud Stream通过DefaultBinderFactory的initializeBinderContextSimple方法,加载绑定器对应的配置类,并创建AnnotationConfigApplicationContext
上下文对象
加载绑定器对应配置类之后将触发绑定器的初始化
- 初始化Binder配置类、初始化Binding配置类
- 初始化绑定器目标提供者
- 初始化通道绑定器
获取绑定器实例
通过上步创建的上下文对象即可获取绑定器实例,将获取的绑定器实例缓存在内存中
使用绑定器将输出通道与具体消息中间件绑定
输出通道是Spring Cloud Stream的概念,我们需要将输出通道关联到具体的发送消息客户端,而绑定器实例有构造发送消息客户端的方法。因此这一步需要创建具体的发送消息客户端,并将其关联到输出通道,具体情况如下:
- 获取扩展生产者配置信息并与Spring Cloud Stream 生产者属性合并
- 获取生产者发送目标提供者
- 创建发送消息实例,并完成发送消息对象初始化
- 将输出通道与发送消息实例绑定
- 创建binding并缓存到发送通道,用于之后解除绑定等操作
通过发送通道发送消息
完成发送通道与具体发送客户端绑定之后就可以发送消息,在下面方法中将使用具体消息中间件的客户端完成消息发送
总结
通过上述源码解析,我们可以清楚的看到,Spring Cloud Stream为用户提供了统一的发送消息、消费消息、及配置参数的管理。但在具体执行发送、消费时,则是由绑定器创建的生产者实例、消费者实例完成的。这些由绑定器创建生产者实例、消费者实例是与具体消息中间件关联的,因此不通的消息中间件会对应不同的绑定器。绑定器作为了Spring Cloud Steam与具体消息中间件的桥梁
开发实例
下面将具体说明我们怎样去写一个自己的绑定器
创建spring.binders
创建resources/META-INF/spring.binders,指名绑定器类型及对应的配置类
创建配置类
配置类需要扫描我们扩展的属性类,包括绑定器的配置参数类、Binding的配置参数类,注册目标提供者对象、消息通道绑定器对象
绑定器参数类
绑定器参数类用于存放与具体消息中间件连接相关的信息,例如地址、用户名、密码、端口等信息
Binding扩展属性配置类
这里包括生产者配置属性及消费者配置属性。对于发送、消费时需要的特殊参数可以放到这里,例如使用redis作为消息中间件,我们需要指定发送、消费的
DB是多少,因此需要将此参数放到生产、消费配置类中
-
创建RedisMQExtendedBindingProperties并继承AbstractExtendedBindingProperties指定消费属性类、发送消息属性类、Binding属性类
-
创建RedisMQConsumerProperties消费消息配置类
-
创建RedisMQProducerProperties发送消息配置类
-
创建RedisMQBindingProperties配置类,需要实现BinderSpecificPropertiesProvider。其中的属性既是消费消息属性类及发送消息属性类
提供者
参数处理完成后,我们需要创建一个提供者。提供者记录了我们需要发送或消费消息的topic,以及分区。提供者需要实现ProvisioningProvider
消息通道绑定器
消息通道绑定器需要提供创建发送消息实例、消费消息实例、获取扩展生产者属性类、扩展消费者属性类及Binding属性类的Class。继承AbstractMessageChannelBinder实现ExtendedPropertiesBinder
具体生产者实现
具体生产者实现类,会完成具体的消息连接初始化工作、具体的发送消息等操作。需要继承AbstractMessageHandler,实现Lifecycle
具体消费者实现
完成具体消息消费、停止消息消费,需要继承MessageProducerSupport
用于 Spring Cloud 的 Kafka 绑定器不使用事务回滚
【中文标题】用于 Spring Cloud 的 Kafka 绑定器不使用事务回滚【英文标题】:Kafka Binders for Spring Cloud not rolling back with Transactional 【发布时间】:2021-09-19 04:00:08 【问题描述】:我正在尝试通过 Spring Cloud 在事务中向 Kafka 中的两个单独主题发送消息。在第一条和第二条消息之间抛出异常时,第一条消息仍然出现在第一个主题的消费者中,表明消息没有被回滚。这是我的代码:
@Configuration
@EnableTransactionManagement
public class KafkaChannelTester implements CommandLineRunner
ChannelHolder channelHolder;
MessageChannel messageChannel1;
MessageChannel messageChannel2;
public KafkaChannelTester(ChannelHolder channelHolder)
this.channelHolder = channelHolder;
this.messageChannel1 = channelHolder.messageChannel1();
this.messageChannel2 = channelHolder.messageChannel2();
@Override
public void run(String... args) throws Exception
transactionFail();
public void throwException() throw new RuntimeException();
@Transactional
public void transactionFail()
Message<String> message1 = MessageBuilder
.withPayload("Test-transaction-fail-"+ LocalDateTime.now())
.build();
Message<String> message2 = MessageBuilder
.withPayload("Test-transaction-fail-"+ LocalDateTime.now())
.build();
messageChannel1.send(message1);
throwException();
messageChannel2.send(message2);
@Bean
public PlatformTransactionManager transactionManager(BinderFactory binders)
ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
MessageChannel.class)).getTransactionalProducerFactory();
System.out.println(pf.transactionCapable());
System.out.println(pf.getTransactionIdPrefix());
KafkaTransactionManager tm = new KafkaTransactionManager<>(pf);
return tm;
application.yml 包含以下内容:
spring:
cloud:
stream:
bindings:
cloud-producer-1:
destination:
peter.cloud.test.1
cloud-producer-2:
destination:
peter.cloud.test.2
kafka:
binder:
brokers:
- testkbroker:9092
transaction:
transaction-id-prefix: transaction-1-
producer:
configuration:
enable.idempotence: true
retries: 1
acks: all
transactionManager 中的打印语句确认生产者工厂确实具有事务 id 前缀,并且具有事务能力。我该怎么做才能使交易正常进行?
【问题讨论】:
【参考方案1】:记录总是被写入日志,即使它们被回滚。默认情况下,消费者会看到回滚记录,您必须将消费者属性isolation.level
设置为read_committed
以避免获得回滚记录。
https://kafka.apache.org/documentation/#consumerconfigs_isolation.level
【讨论】:
即使隔离级别设置为 read_committed,消息仍然显示。 你从哪里打电话给transactionFail()
?如果您从CommandLineRunner.run()
方法调用,您将绕过事务拦截器。您需要将该方法移动到不同的 bean,以便可以拦截该方法以启动事务。或者,使用TransactionTemplate
手动启动事务。当消息到达KafkaProducerMessageHandler
时,如果不存在现有事务,则处理程序会分别发送一个已提交的本地事务。
我是从 CommandLineRunner.run() 调用它的。当我把它拿出来并在另一个类中运行 CommandLineRunner.run() 时,它开始作为事务工作。感谢您的帮助!【参考方案2】:
从不同的类运行事务方法。如果“run”方法和“Transactional”注解方法在同一个类中,CommandLineRunner.run() 会绕过事务拦截器,从而阻止事务方法作为事务运行。
【讨论】:
以上是关于Spring Cloud Stream绑定器架构解析与开发的主要内容,如果未能解决你的问题,请参考以下文章
无法解析我的Json对象,该对象在Spring Cloud流中通过绑定器接收
Spring Cloud Stream 消息从/到 JSON 转换配置