Spring Cloud Stream绑定器架构解析与开发

Posted 炒栗子不加糖

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spring Cloud Stream绑定器架构解析与开发相关的知识,希望对你有一定的参考价值。

Spring Cloud Stream绑定器架构解析与开发


根据不同的使用场景我们通常会选择相适应的消息中间件,例如对于日志收集场景可能会选择使用Kafka,对于订单场景通常会选择RocketMQ,不同消息中间件的客户端是不同的,我们需要针对不同消息中间件写不同的代码。对于TOC的产品,我们可以根据业务场景选择适当的消息中间件,但对于TOB的产品,不同客户对于消息中间件需求是不同的,我们需要方便的切换消息中间件,并且不需要重构代码

Spring Cloud Stream能很好的帮我们解决上面的问题。通过Binder解藕具体消息中间件,屏蔽不同消息中间件使用的差异,基于Binder Spring Cloud Stream提供统一的配置及使用方式,用于发送消息、消费消息。对于使用方而言,只需要了解如何使用Spring Cloud Stream发送、接收消息无需关心使用那种消息中间件。Binder则需要负责不通消息中间件如何去发送、接收消息及其它的高级特性,例如事务等。因此我们可以看到针对不通消息中间件的Binder,例如spring-cloud-stream-binder-kafka、spring-cloud-stream-binder-rabbit等,我们自己也可以根据规范开发符合自己需求的Binder,例如对于消息量不大的用户,可以使用基于redis的消息中间件,我们可以开发一个对应的Binder,对接到Spring Cloud Stream。下面我将介绍Binder的架构及如何开发一个Binder

Binder架构介绍及源码解析


从源码角度,我们来看Spring Cloud Stream是如何与Binder交互,Binder具体又干了什么事情

项目启动

项目启动会先去扫描项目下所有依赖的绑定器,并将绑定器信息存储到内存对象中,具体而言做了如下事情:

  • 扫描META-INF/spring.binders,获取绑定器类型及对应配置加载类,用于创建BinderTypeRegistry对象

  • 根据BinderTypeRegistry构建BinderFactory。BinderFactory既包含绑定器的信息,也包含扫描yaml文件下配置的spring cloud steam相关的配置信息

发送消息

项目启动之后,准备工作已经完成,我们下面从发送消息流程来看,Spring Cloud Stream如果与Binder协作,完成消息发送

BinderFactory获取绑定器实例

启动项目时,我们将绑定器的信息都放到了BinderFactory中,因此发送消息的第一步就是要根据绑定器的信息,创建绑定器实例,完成绑定器的初始工作

加载绑定器配置类

Spring Cloud Stream通过DefaultBinderFactory的initializeBinderContextSimple方法,加载绑定器对应的配置类,并创建AnnotationConfigApplicationContext
上下文对象

加载绑定器对应配置类之后将触发绑定器的初始化

  • 初始化Binder配置类、初始化Binding配置类
  • 初始化绑定器目标提供者
  • 初始化通道绑定器
获取绑定器实例

通过上步创建的上下文对象即可获取绑定器实例,将获取的绑定器实例缓存在内存中

使用绑定器将输出通道与具体消息中间件绑定

输出通道是Spring Cloud Stream的概念,我们需要将输出通道关联到具体的发送消息客户端,而绑定器实例有构造发送消息客户端的方法。因此这一步需要创建具体的发送消息客户端,并将其关联到输出通道,具体情况如下:

  • 获取扩展生产者配置信息并与Spring Cloud Stream 生产者属性合并

  • 获取生产者发送目标提供者

  • 创建发送消息实例,并完成发送消息对象初始化

  • 将输出通道与发送消息实例绑定
  • 创建binding并缓存到发送通道,用于之后解除绑定等操作

通过发送通道发送消息

完成发送通道与具体发送客户端绑定之后就可以发送消息,在下面方法中将使用具体消息中间件的客户端完成消息发送

总结

通过上述源码解析,我们可以清楚的看到,Spring Cloud Stream为用户提供了统一的发送消息、消费消息、及配置参数的管理。但在具体执行发送、消费时,则是由绑定器创建的生产者实例、消费者实例完成的。这些由绑定器创建生产者实例、消费者实例是与具体消息中间件关联的,因此不通的消息中间件会对应不同的绑定器。绑定器作为了Spring Cloud Steam与具体消息中间件的桥梁

开发实例

下面将具体说明我们怎样去写一个自己的绑定器

创建spring.binders

创建resources/META-INF/spring.binders,指名绑定器类型及对应的配置类

创建配置类

配置类需要扫描我们扩展的属性类,包括绑定器的配置参数类、Binding的配置参数类,注册目标提供者对象、消息通道绑定器对象

绑定器参数类

绑定器参数类用于存放与具体消息中间件连接相关的信息,例如地址、用户名、密码、端口等信息

Binding扩展属性配置类

这里包括生产者配置属性及消费者配置属性。对于发送、消费时需要的特殊参数可以放到这里,例如使用redis作为消息中间件,我们需要指定发送、消费的
DB是多少,因此需要将此参数放到生产、消费配置类中

  • 创建RedisMQExtendedBindingProperties并继承AbstractExtendedBindingProperties指定消费属性类、发送消息属性类、Binding属性类

  • 创建RedisMQConsumerProperties消费消息配置类

  • 创建RedisMQProducerProperties发送消息配置类

  • 创建RedisMQBindingProperties配置类,需要实现BinderSpecificPropertiesProvider。其中的属性既是消费消息属性类及发送消息属性类

提供者

参数处理完成后,我们需要创建一个提供者。提供者记录了我们需要发送或消费消息的topic,以及分区。提供者需要实现ProvisioningProvider

消息通道绑定器

消息通道绑定器需要提供创建发送消息实例、消费消息实例、获取扩展生产者属性类、扩展消费者属性类及Binding属性类的Class。继承AbstractMessageChannelBinder实现ExtendedPropertiesBinder

具体生产者实现

具体生产者实现类,会完成具体的消息连接初始化工作、具体的发送消息等操作。需要继承AbstractMessageHandler,实现Lifecycle

具体消费者实现

完成具体消息消费、停止消息消费,需要继承MessageProducerSupport

用于 Spring Cloud 的 Kafka 绑定器不使用事务回滚

【中文标题】用于 Spring Cloud 的 Kafka 绑定器不使用事务回滚【英文标题】:Kafka Binders for Spring Cloud not rolling back with Transactional 【发布时间】:2021-09-19 04:00:08 【问题描述】:

我正在尝试通过 Spring Cloud 在事务中向 Kafka 中的两个单独主题发送消息。在第一条和第二条消息之间抛出异常时,第一条消息仍然出现在第一个主题的消费者中,表明消息没有被回滚。这是我的代码:

@Configuration
@EnableTransactionManagement
public class KafkaChannelTester implements CommandLineRunner 

    ChannelHolder channelHolder;
    MessageChannel messageChannel1;
    MessageChannel messageChannel2;

    public KafkaChannelTester(ChannelHolder channelHolder) 
        this.channelHolder = channelHolder;
        this.messageChannel1 = channelHolder.messageChannel1();
        this.messageChannel2 = channelHolder.messageChannel2();
    

    @Override
    public void run(String... args) throws Exception 
        transactionFail();
    


    public void throwException() throw new RuntimeException();

    @Transactional
    public void transactionFail()
        Message<String> message1 = MessageBuilder
            .withPayload("Test-transaction-fail-"+ LocalDateTime.now())
            .build();
        Message<String> message2 = MessageBuilder
            .withPayload("Test-transaction-fail-"+ LocalDateTime.now())
            .build();
        messageChannel1.send(message1);
        throwException();
        messageChannel2.send(message2);
    
    @Bean
    public PlatformTransactionManager transactionManager(BinderFactory binders) 
        ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
            MessageChannel.class)).getTransactionalProducerFactory();
        System.out.println(pf.transactionCapable());
        System.out.println(pf.getTransactionIdPrefix());
        KafkaTransactionManager tm = new KafkaTransactionManager<>(pf);
        return tm;
    

application.yml 包含以下内容:

spring:
  cloud:
    stream:
      bindings:
        cloud-producer-1:
          destination:
            peter.cloud.test.1
        cloud-producer-2:
          destination:
            peter.cloud.test.2
      kafka:
        binder:
          brokers:
            - testkbroker:9092
          transaction:
            transaction-id-prefix: transaction-1-
            producer:
              configuration:
                enable.idempotence: true
                retries: 1
                acks: all

transactionManager 中的打印语句确认生产者工厂确实具有事务 id 前缀,并且具有事务能力。我该怎么做才能使交易正常进行?

【问题讨论】:

【参考方案1】:

记录总是被写入日志,即使它们被回滚。默认情况下,消费者会看到回滚记录,您必须将消费者属性isolation.level 设置为read_committed 以避免获得回滚记录。

https://kafka.apache.org/documentation/#consumerconfigs_isolation.level

【讨论】:

即使隔离级别设置为 read_committed,消息仍然显示。 你从哪里打电话给transactionFail()?如果您从CommandLineRunner.run() 方法调用,您将绕过事务拦截器。您需要将该方法移动到不同的 bean,以便可以拦截该方法以启动事务。或者,使用TransactionTemplate 手动启动事务。当消息到达KafkaProducerMessageHandler 时,如果不存在现有事务,则处理程序会分别发送一个已提交的本地事务。 我是从 CommandLineRunner.run() 调用它的。当我把它拿出来并在另一个类中运行 CommandLineRunner.run() 时,它开始作为事务工作。感谢您的帮助!【参考方案2】:

从不同的类运行事务方法。如果“run”方法和“Transactional”注解方法在同一个类中,CommandLineRunner.run() 会绕过事务拦截器,从而阻止事务方法作为事务运行。

【讨论】:

以上是关于Spring Cloud Stream绑定器架构解析与开发的主要内容,如果未能解决你的问题,请参考以下文章

无法解析我的Json对象,该对象在Spring Cloud流中通过绑定器接收

Spring Cloud Stream 消息从/到 JSON 转换配置

cloud stream 官方文档阅读笔记4

Spring Cloud Stream GCP如何重新排队失败的消息

Spring Cloud Azure 参考文档

Spring Cloud入门 - 微服务与消息驱动