Spring Cloud Stream Kafka Streams Binder KafkaException:无法启动流:“侦听器”不能为空

Posted

技术标签:

【中文标题】Spring Cloud Stream Kafka Streams Binder KafkaException:无法启动流:“侦听器”不能为空【英文标题】:Spring Cloud Stream Kafka Streams Binder KafkaException: Could not start stream: 'listener' cannot be null 【发布时间】:2021-06-27 01:49:39 【问题描述】:

我是 Kafka Streams 和 Spring Cloud Stream 的新手,但在将集成相关代码移动到属性文件方面阅读了有关它的好东西,因此开发人员可以主要关注事物的业务逻辑方面。

这里有我的简单应用程序类。

package com.some.events.consumer

import com.some.events.SomeEvent
import org.apache.kafka.streams.kstream.KStream
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication
import org.springframework.context.annotation.Bean
import java.util.function.Consumer

@SpringBootApplication
class ConsumerApplication 
    @Bean
    fun consume(): Consumer<KStream<String, SomeEvent>> 
        return Consumer  input -> input.foreach  key, value -> println("Key: $key, value: $value")  
    


fun main(args: Array<String>) 
    runApplication<ConsumerApplication>(*args)


我的application.yml文件如下。

spring:
  cloud:
    function:
      definition: consume
    stream:
      bindings:
        consume-in-0:
          destination: "some-event"
          group: "some-event"

我在build.gradle.kts 中的依赖定义如下(这里只包含相关的)。

extra["springCloudVersion"] = "2020.0.2"

dependencies 
    implementation("org.jetbrains.kotlin:kotlin-reflect")
    implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
    implementation("org.springframework.cloud:spring-cloud-stream")
    implementation("org.springframework.cloud:spring-cloud-stream-binder-kafka-streams")
    testImplementation("org.springframework.boot:spring-boot-starter-test")


dependencyManagement 
    imports 
        mavenBom("org.springframework.cloud:spring-cloud-dependencies:$property("springCloudVersion")")
    

当我运行应用程序时,出现以下异常。

org.springframework.context.ApplicationContextException: Failed to start bean 'streamsBuilderFactoryManager'; nested exception is org.springframework.kafka.KafkaException: Could not start stream: ; nested exception is java.lang.IllegalArgumentException: 'listener' cannot be null
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:181) ~[spring-context-5.3.5.jar:5.3.5]
    at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:54) ~[spring-context-5.3.5.jar:5.3.5]
    at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:356) ~[spring-context-5.3.5.jar:5.3.5]
    at java.base/java.lang.Iterable.forEach(Iterable.java:75) ~[na:na]
    at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:155) ~[spring-context-5.3.5.jar:5.3.5]
    at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:123) ~[spring-context-5.3.5.jar:5.3.5]
    at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:935) ~[spring-context-5.3.5.jar:5.3.5]
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:586) ~[spring-context-5.3.5.jar:5.3.5]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:769) ~[spring-boot-2.4.4.jar:2.4.4]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:761) ~[spring-boot-2.4.4.jar:2.4.4]
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:426) ~[spring-boot-2.4.4.jar:2.4.4]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:326) ~[spring-boot-2.4.4.jar:2.4.4]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1313) ~[spring-boot-2.4.4.jar:2.4.4]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1302) ~[spring-boot-2.4.4.jar:2.4.4]
    at com.some.events.consumer.ConsumerApplicationKt.main(ConsumerApplication.kt:22) ~[main/:na]
Caused by: org.springframework.kafka.KafkaException: Could not start stream: ; nested exception is java.lang.IllegalArgumentException: 'listener' cannot be null
    at org.springframework.cloud.stream.binder.kafka.streams.StreamsBuilderFactoryManager.start(StreamsBuilderFactoryManager.java:94) ~[spring-cloud-stream-binder-kafka-streams-3.1.2.jar:3.1.2]
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:178) ~[spring-context-5.3.5.jar:5.3.5]
    ... 14 common frames omitted
Caused by: java.lang.IllegalArgumentException: 'listener' cannot be null
    at org.springframework.util.Assert.notNull(Assert.java:201) ~[spring-core-5.3.5.jar:5.3.5]
    at org.springframework.kafka.config.StreamsBuilderFactoryBean.addListener(StreamsBuilderFactoryBean.java:268) ~[spring-kafka-2.6.7.jar:2.6.7]
    at org.springframework.cloud.stream.binder.kafka.streams.StreamsBuilderFactoryManager.start(StreamsBuilderFactoryManager.java:84) ~[spring-cloud-stream-binder-kafka-streams-3.1.2.jar:3.1.2]
    ... 15 common frames omitted


Process finished with exit code 1

请注意,我知道我需要配置 Serde 和 Avro 相关的东西(我将 Avro 用于事件模式),但问题是,流甚至不会运行。

有人能指出我正确的方向吗?我尝试用谷歌搜索,但没有人发布由“listener”引起的问题不能为空。谢谢!

【问题讨论】:

【参考方案1】:

destination: "some-event" 应该指向一个 kafka 主题。喜欢destination: "some-event-topic"

然后你必须为监听器consume-in-0创建一个接口。使用 spring 注释将使项目加载此侦听器,并且不再为 null。

import org.apache.kafka.streams.kstream.KStream;
import org.springframework.cloud.stream.annotation.Input;

public interface KafkaListenerBinding 
    @Input("consume-in-0")
    KStream<String, String> inputStream();

然后你创建一个@Service 来处理来自监听器@StreamListener("consume-in-0") 的消息。

import lombok.extern.log4j.Log4j2;
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.stereotype.Service;

@Log4j2
@Service
@EnableBinding(KafkaListenerBinding.class)
public class KafkaListenerService 

    @StreamListener("consume-in-0")
    public void process(KStream<String, String> input) 
        input.foreach((k,v) -> log.info(String.format("Key: %s, Value: %s",k,v)));
    

注意:尽管@Gary Russel 说有错误,但我将用实现 Spring 服务的功能方式来完成我的答案。 函数式风格可以通过在application.yml文件中定义函数来实现。有一个内部约定使用函数的名称和 posfix in-0out-0 进行绑定。定义绑定时必须使用它。更多详情here.

spring:
  cloud:
    stream:
      function:
        definition: transformToUpperCase
      bindings:
        transformToUpperCase-in-0:
          destination: input-func-topic
        transformToUpperCase-out-0:
          destination: output-func-topic

然后您使用@Configuration@EnableAutoConfiguration 注释您的类,并确保lambda 方法与您在application.yml 文件中为function.definition 定义的方法相同。

@Configuration
@EnableAutoConfiguration
public class KafkaListenerFunctionalService 

    @Bean
    public Function<KStream<String, String>, KStream<String, String>> transformToUpperCase() 
        return input -> input
                .peek((k, v) -> log.info("Functional received Input: ", v))
                .mapValues(i -> i.toUpperCase());
    

【讨论】:

我想使用函数式编程风格,即FunctionConsumer等,而不是命令式风格,例如EnableBindingStreamListener 等。有什么想法吗? 我也在学习函数式风格。现在我只擅长命令式风格。当我为此开发一个 hello-world 示例时,我完成了这个答案...... @billydh 尽管存在 Gary 所说的错误,但我还是添加了功能示例。希望对您有所帮助... 很好地解决了这个错误。让我试试这个解决方案,如果可行,我会查看您的答案。 其实我在这里很困惑。您将KafkaListenerService 中的名称命名为consume-in-0,但在您的application.yml 中却没有列出。然后你定义了一个不同的函数,它是transformToUpperCase,这是application.yml 上的函数。可以请教一下吗?【参考方案2】:

这是一个错误;它已在 3.1.3-SNAPSHOT 中修复

https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/commit/f25dbff2b7fc0d0c742dd674a9e392057a34c86d

https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1030#issuecomment-804039087

我不确定那里的评论;将千分尺添加到类路径应该可以解决它。

【讨论】:

谢谢加里。我认为那里的评论说Anyway, the test still fails after adding Micrometer, so I'm attaching the simple project, just unpack + run mvn test kstream-component-beans-test.zip所以它不能通过添加千分尺来解决它,对吗?我可能需要等待下一个版本。 否;我的意思是看代码,只需添加千分尺就足够了。我问@sobychacko 这个问题和他的回答:&gt;Oh, I think that comment was talking about the fact that the basic issue of the test failure remained (at the time) which we fixed in that commit along with the null check. He was stating that the null error went away after adding micrometer, but the test still failed due to other issues. 哦,哎呀。知道了。那么将这个人 io.micrometer:micrometer-core 添加到我的依赖项中应该可以正常工作吗? 我不在办公桌前。您可能还需要添加一个 SimpleMeterRegistry @Bean。【参考方案3】:

我有同样的问题,所以首先我添加 io.micrometer 依赖项(从maven安装最新版本)

第二次从 SimpleMeterRegistry 创建 Bean 就解决了问题

@Bean
SimpleMeterRegistry simpleMeterRegistry() 
    return new SimpleMeterRegistry();

【讨论】:

以上是关于Spring Cloud Stream Kafka Streams Binder KafkaException:无法启动流:“侦听器”不能为空的主要内容,如果未能解决你的问题,请参考以下文章

Spring cloud kafka stream pitfalls

Spring cloud kafka stream pitfalls

spring-cloud-stream kafka 消费者并发

spring-cloud-stream-kafka 在应用程序启动后仅使用最新消息

多个 @EnableBinding 与 Kafka Spring Cloud Stream

spring.cloud.stream.kafka.bindings.<channelName>.producer.configuration 未应用