具有 2 个功能的 Apache Kafka Streams 应用程序不起作用

Posted

技术标签:

【中文标题】具有 2 个功能的 Apache Kafka Streams 应用程序不起作用【英文标题】:Apache Kafka Streams Application with 2 functions not working 【发布时间】:2021-07-10 13:57:49 【问题描述】:

我有 2 个应用程序和 3 个 Kafka 主题。第一个应用程序将消息生成到pos-topic。此应用程序正在运行,并且消息已写入主题。在另一个应用程序(不起作用)中,我有 2 个函数正在监听该主题:

hadoop:

@Bean
fun hadoop(): Function<KStream<String, PosInvoice>, KStream<String, HadoopRecord>> 
    return Function  input: KStream<String, PosInvoice> ->
        input.mapValues  v -> recordBuilder.getMaskedInvoice(v) 
            .flatMapValues  v -> recordBuilder.getHadoopRecords(v) 
    

通知:

@Bean
fun notification(): Function<KStream<String, PosInvoice>, KStream<String, Notification>> 
    return Function  input -> input.filter  _, v -> v.customerType.equals("PRIME", ignoreCase = true) 
        .mapValues  v -> recordBuilder.getNotification(v) 

hadoop 函数应该生成到hadoop-sink-topic,通知函数应该生成到notification-topic。这两个函数都在 @Configuration 类中。

主类如下所示:

@SpringBootApplication
class JsonposfanoutApplication
fun main(args: Array<String>) 
   runApplication<JsonposfanoutApplication>(*args)

这是application.yml

spring:
  cloud:
    stream:
      bindings:
        notification-in-0.destination: pos-topic
        notification-out-0.destination: loyalty-topic
        hadoop-in-0.destination: pos-topic
        hadoop-out-0.destination: hadoop-sink-topic
      function:
        definition: notification;hadoop
      kafka:
        streams:
          binder:
            brokers: localhost:9092
            configuration:
              schema.registry.url: http://localhost:8081
          bindings:
            notification-out-0.producer.valueSerde: io.confluent.kafka.streams.serdes.json.KafkaJsonSchemaSerde
            hadoop-out-0.producer.valueSerde: io.confluent.kafka.streams.serdes.json.KafkaJsonSchemaSerde

反序列化和序列化的类对 Json 友好。第一个应用程序将 Json 序列化值写入主题。所以我不需要用 Spring Cloud Streams 明确定义我的consumer.value.serdes 和我的key.serdes 对吧?

出了什么问题?

我不知道。我没有收到错误或异常。应用程序刚刚启动并在几秒钟后完成。我没有关于我的处理器的信息,所以我认为它们甚至都没有启动。 我从 Spring Boot 得到的所有可疑信息都是一个信息: @Bean method FunctionConfiguration.po is non-static and returns an object assignable to Spring's BeanFactoryPostProcessor interface. This will result in a failure to process annotations such as @Autowired, @Resource and @PostConstruct within the method's declaring @Configuration class. Add the 'static' modifier to this method to avoid these container lifecycle issues; see @Bean javadoc for complete details.

我尝试了什么?

我在我的 application.yml 中尝试了很多,但没有任何帮助,我将所有内容恢复到第一个版本。也许我在其他地方遗漏了什么?

【问题讨论】:

试试spring.cloud.function.definition(没有stream)。 不幸的是,这个变化没有改变 【参考方案1】:

我认为对于 Kotlin 应用程序,您需要单独定义 Configuration 类。下面的设置对我有用,我看到这两个功能都被激活了。

@SpringBootApplication
class DemoKotlinApplication

fun main(args: Array<String>) 
    runApplication<DemoKotlinApplication>(*args)

@Configuration
class DemoKotlinConfiguration 

    @Bean
    fun foo(): java.util.function.Function<KStream<String, String>, KStream<String, String>> 
        return Function  
           input: KStream<String, String> -> input
        
    

    @Bean
    fun bar(): java.util.function.Function<KStream<String, String>, KStream<String, String>> 
        return Function  
           input: KStream<String, String> -> input
        
    

application.properties

spring.cloud.stream.function.definition=foo;bar

【讨论】:

我实际上为每个功能有两个配置类。将它们合二为一也无济于事。 好的。您能否分享一个可重现的独立小示例应用程序,我们可以在哪里看到问题? 感谢您让我编写示例应用程序!在示例应用程序中,我有以下异常:org.springframework.context.ApplicationContextException: Failed to start bean 'streamsBuilderFactoryManager'; nested exception is org.springframework.kafka.KafkaException: Could not start stream: ; nested exception is java.lang.IllegalArgumentException: 'listener' cannot be null 所以我使用org.springframework.cloud:spring-cloud-stream-binder-kafka-streams:3.1.3-SNAPSHOT 而不是版本 3.1.2 并且一切正常。同样的更改也解决了其他应用程序中的问题。【参考方案2】:

org.springframework.cloud:spring-cloud-stream-binder-kafka-streams(版本 3.1.2)更​​改为 org.springframework.cloud:spring-cloud-stream-binder-kafka-streams:3.1.3-SNAPSHOT 解决了我的问题。出于某种原因,我没有得到这个例外: org.springframework.context.ApplicationContextException: Failed to start bean 'streamsBuilderFactoryManager'; nested exception is org.springframework.kafka.KafkaException: Could not start stream: ; nested exception is java.lang.IllegalArgumentException: 'listener' cannot be null 在我的应用程序中,但在另一个应用程序中。 这是当时有帮助的答案:https://***.com/a/66888771/15359624

【讨论】:

以上是关于具有 2 个功能的 Apache Kafka Streams 应用程序不起作用的主要内容,如果未能解决你的问题,请参考以下文章

3个离不开Apache Kafka的行业

Apache Kafka教程:主题复制

深入理解Apache Kafka

Stream From 整合 Kafka

kafka-sparkstreaming---学习1

具有多个分区的 Apache Kafka 消息顺序