当没有运行 kafka 代理时,如何禁用 Spring Cloud Stream 以用于开发目的?

Posted

技术标签:

【中文标题】当没有运行 kafka 代理时,如何禁用 Spring Cloud Stream 以用于开发目的?【英文标题】:How can I disable spring cloud stream for development purpose when there are not kafka broker running? 【发布时间】:2020-05-14 01:14:03 【问题描述】:

我有多个使用 kafka 代理实现 Spring Cloud Stream 的 Spring Boot 应用程序。我想知道是否可以停止或禁用 spring 云流或 kafka 代理连接以启用应用程序。

【问题讨论】:

【参考方案1】:

您可以通过在 spring boot 应用程序中禁用 kafka 绑定来做到这一点

    应用类

    import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
    
    @SpringBootApplication(exclude = KafkaAutoConfiguration.class)
    
    public class Application 
      ...
    
    

    application.yml(如果使用 yml)

    spring: 
      autoconfigure:
        exclude: org.org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration
    

    application.properties(如果使用属性)

    spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration
    

【讨论】:

您好,感谢您的回答。我试过但它失败了这条消息: 描述:org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsBinderSupportAutoConfiguration 中的方法 binderConfigurationProperties 的参数 0 需要一个 'org.springframework.boot.autoconfigure.kafka.KafkaProperties' 类型的 bean找不到。 这不适用于 RabbitMQ,因为在使用 org.springframework.cloud:spring-cloud-stream-binder-rabbit 时无法禁用 AmpqAutoConfiguration :(【参考方案2】:

即使代理不可用,应用程序也应该启动。

您可以在类路径中添加一个 noop Binder 并将其设置为默认绑定器或为您的绑定指定它。 这里有一些 Kotlin 代码:

NoOpBinder 实现类:

package com.demo

import org.slf4j.LoggerFactory
import org.springframework.cloud.stream.binder.Binder
import org.springframework.cloud.stream.binder.Binding
import org.springframework.cloud.stream.binder.ConsumerProperties
import org.springframework.cloud.stream.binder.ProducerProperties
import org.springframework.messaging.MessageChannel

class NoOpBinder : Binder<MessageChannel, ConsumerProperties, ProducerProperties> 
    val logger = LoggerFactory.getLogger(javaClass)!!
    override fun bindConsumer(
        name: String,
        group: String,
        inboundBindTarget: MessageChannel,
        consumerProperties: ConsumerProperties
    ): Binding<MessageChannel> = NoOpBinding(name).also  logger.info("bindConsumer: $it") 

    override fun bindProducer(
        name: String,
        outboundBindTarget: MessageChannel,
        producerProperties: ProducerProperties
    ): Binding<MessageChannel> = NoOpBinding(name).also  logger.info("bindProducer: $it") 

    private class NoOpBinding(val binderName: String) : Binding<MessageChannel> 
        val logger = LoggerFactory.getLogger(javaClass)!!

        override fun getName() = binderName
        override fun unbind() 
            logger.info("unbind: $this")
        

        override fun toString() = "NoOpBinding [$name]"
    

一个配置类:

package com.demo

import org.springframework.context.annotation.Bean

// Warn: this class is referenced in META-INF/spring.binders and used by spring cloud stream to instantiate binders.
class NoOpBinderServiceConfigurer 

    @Bean
    fun noOpBinder() = NoOpBinder()


//resources/META-INF/spring.binders

noop: com.demo.NoOpBinderServiceConfigurer

在配置文件 application.yml 中指定绑定器

spring:
  cloud:
    stream:
      bindings:
        my-binding:
          destination: my-destination
          group: my-group
          binder: noop

或者在你的配置文件 application.yml 中指定默认的 binder

spring:
  cloud:
    stream:
      bindings:
        defaultBinder: noop

--

参考:

https://cloud.spring.io/spring-cloud-stream/spring-cloud-stream.html#multiple-binders

【讨论】:

将默认 binder 更改为 noop 后,我还需要删除 binders 依赖项。 你能解释一下 NoOp binder 的 Kotlin 代码在做什么吗? 它只是实现了一个什么都不做的活页夹,所以你可以使用 kafka 或 rabbit binder

以上是关于当没有运行 kafka 代理时,如何禁用 Spring Cloud Stream 以用于开发目的?的主要内容,如果未能解决你的问题,请参考以下文章

如何列出集群中所有可用的 Kafka 代理?

当我重新启动我的 kafka 代理时,为啥或如何丢失一些消息?

Kafka 如何在代理之间分配主题分区

如何在没有kafka服务器的情况下运行spring boot

kafka消费者AbstractCoordinator:发现协调器Java客户端

如何在chocolatey上禁用显式代理?