具有 2 个功能的 Apache Kafka Streams 应用程序不起作用
Posted
技术标签:
【中文标题】具有 2 个功能的 Apache Kafka Streams 应用程序不起作用【英文标题】:Apache Kafka Streams Application with 2 functions not working 【发布时间】:2021-07-10 13:57:49 【问题描述】:我有 2 个应用程序和 3 个 Kafka 主题。第一个应用程序将消息生成到pos-topic
。此应用程序正在运行,并且消息已写入主题。在另一个应用程序(不起作用)中,我有 2 个函数正在监听该主题:
hadoop:
@Bean
fun hadoop(): Function<KStream<String, PosInvoice>, KStream<String, HadoopRecord>>
return Function input: KStream<String, PosInvoice> ->
input.mapValues v -> recordBuilder.getMaskedInvoice(v)
.flatMapValues v -> recordBuilder.getHadoopRecords(v)
通知:
@Bean
fun notification(): Function<KStream<String, PosInvoice>, KStream<String, Notification>>
return Function input -> input.filter _, v -> v.customerType.equals("PRIME", ignoreCase = true)
.mapValues v -> recordBuilder.getNotification(v)
hadoop 函数应该生成到hadoop-sink-topic
,通知函数应该生成到notification-topic
。这两个函数都在 @Configuration
类中。
主类如下所示:
@SpringBootApplication
class JsonposfanoutApplication
fun main(args: Array<String>)
runApplication<JsonposfanoutApplication>(*args)
这是application.yml:
spring:
cloud:
stream:
bindings:
notification-in-0.destination: pos-topic
notification-out-0.destination: loyalty-topic
hadoop-in-0.destination: pos-topic
hadoop-out-0.destination: hadoop-sink-topic
function:
definition: notification;hadoop
kafka:
streams:
binder:
brokers: localhost:9092
configuration:
schema.registry.url: http://localhost:8081
bindings:
notification-out-0.producer.valueSerde: io.confluent.kafka.streams.serdes.json.KafkaJsonSchemaSerde
hadoop-out-0.producer.valueSerde: io.confluent.kafka.streams.serdes.json.KafkaJsonSchemaSerde
反序列化和序列化的类对 Json 友好。第一个应用程序将 Json 序列化值写入主题。所以我不需要用 Spring Cloud Streams 明确定义我的consumer.value.serdes
和我的key.serdes
对吧?
出了什么问题?
我不知道。我没有收到错误或异常。应用程序刚刚启动并在几秒钟后完成。我没有关于我的处理器的信息,所以我认为它们甚至都没有启动。
我从 Spring Boot 得到的所有可疑信息都是一个信息:
@Bean method FunctionConfiguration.po is non-static and returns an object assignable to Spring's BeanFactoryPostProcessor interface. This will result in a failure to process annotations such as @Autowired, @Resource and @PostConstruct within the method's declaring @Configuration class. Add the 'static' modifier to this method to avoid these container lifecycle issues; see @Bean javadoc for complete details.
我尝试了什么?
我在我的 application.yml 中尝试了很多,但没有任何帮助,我将所有内容恢复到第一个版本。也许我在其他地方遗漏了什么?
【问题讨论】:
试试spring.cloud.function.definition
(没有stream
)。
不幸的是,这个变化没有改变
【参考方案1】:
我认为对于 Kotlin 应用程序,您需要单独定义 Configuration
类。下面的设置对我有用,我看到这两个功能都被激活了。
@SpringBootApplication
class DemoKotlinApplication
fun main(args: Array<String>)
runApplication<DemoKotlinApplication>(*args)
和
@Configuration
class DemoKotlinConfiguration
@Bean
fun foo(): java.util.function.Function<KStream<String, String>, KStream<String, String>>
return Function
input: KStream<String, String> -> input
@Bean
fun bar(): java.util.function.Function<KStream<String, String>, KStream<String, String>>
return Function
input: KStream<String, String> -> input
application.properties
spring.cloud.stream.function.definition=foo;bar
【讨论】:
我实际上为每个功能有两个配置类。将它们合二为一也无济于事。 好的。您能否分享一个可重现的独立小示例应用程序,我们可以在哪里看到问题? 感谢您让我编写示例应用程序!在示例应用程序中,我有以下异常:org.springframework.context.ApplicationContextException: Failed to start bean 'streamsBuilderFactoryManager'; nested exception is org.springframework.kafka.KafkaException: Could not start stream: ; nested exception is java.lang.IllegalArgumentException: 'listener' cannot be null
所以我使用org.springframework.cloud:spring-cloud-stream-binder-kafka-streams:3.1.3-SNAPSHOT
而不是版本 3.1.2 并且一切正常。同样的更改也解决了其他应用程序中的问题。【参考方案2】:
将 org.springframework.cloud:spring-cloud-stream-binder-kafka-streams
(版本 3.1.2)更改为 org.springframework.cloud:spring-cloud-stream-binder-kafka-streams:3.1.3-SNAPSHOT
解决了我的问题。出于某种原因,我没有得到这个例外:
org.springframework.context.ApplicationContextException: Failed to start bean 'streamsBuilderFactoryManager'; nested exception is org.springframework.kafka.KafkaException: Could not start stream: ; nested exception is java.lang.IllegalArgumentException: 'listener' cannot be null
在我的应用程序中,但在另一个应用程序中。
这是当时有帮助的答案:https://***.com/a/66888771/15359624
【讨论】:
以上是关于具有 2 个功能的 Apache Kafka Streams 应用程序不起作用的主要内容,如果未能解决你的问题,请参考以下文章