多个 @EnableBinding 与 Kafka Spring Cloud Stream

Posted

技术标签:

【中文标题】多个 @EnableBinding 与 Kafka Spring Cloud Stream【英文标题】:Multiple @EnableBinding with Kafka Spring Cloud Stream 【发布时间】:2019-06-28 14:14:50 【问题描述】:

我正在尝试设置一个监听 Kafka 的 Spring Boot 应用程序。

我正在使用 Kafka Streams Binder。

一个简单的@EnableBinding

@EnableBinding(StreamExample.StreamProcessor.class)
public class StreamExample 

    @StreamListener(StreamProcessor.INPUT)
    @SendTo(StreamProcessor.OUTPUT)
    public KStream<String, String> process(KStream<String, String> input) 

        logger.info("Stream listening");

        return input
                .peek(((key, value) -> logger.info("key =  value = ", key, value)));
    

    interface StreamProcessor 

        String INPUT = "input_1";
        String OUTPUT = "output_1";

        @Input(INPUT)
        KStream<String, String> input();

        @Output(OUTPUT)
        KStream<String, String> outputProcessed();
    

application.yml

spring:
  cloud:
    stream:
      kafka:
        streams:
          binder:
            brokers: localhost:29092
      bindings:
         input_1:
           destination: mytopic1
           group: readgroup
         output_1:
           destination: mytopic2
         input_2:
           destination: mytopic3
           group: readgroup
         output_2:
           destination: mytopic4
  application:
    name: stream_s1000_app

一切正常。

但是如果我尝试使用其他绑定添加第二个类,则会出现以下错误:

以下订阅主题未分配给任何成员:[mytopic1]

第二个绑定示例:

@EnableBinding(StreamExampleBindingTwo.StreamProcessor.class)
public class StreamExampleBindingTwo 

    @StreamListener(StreamProcessor.INPUT)
    @SendTo(StreamProcessor.OUTPUT)
    public KStream<String, String> process(KStream<String, String> input) 

        logger.info("Stream listening binding two");

        return input
                .peek(((key, value) -> logger.info("key =  value = ", key, value)));
    

    interface StreamProcessor 

        String INPUT = "input_2";
        String OUTPUT = "output_2";

        @Input(INPUT)
        KStream<String, String> input();

        @Output(OUTPUT)
        KStream<String, String> outputProcessed();
    

我错过了什么?我不能在同一个应用程序中使用多个输入主题和多个输出吗?有什么和application.name有关的吗?

【问题讨论】:

【参考方案1】:

试试

@EnableBinding(  StreamExample.StreamProcessor.class, StreamExampleBindingTwo.StreamProcessor.class )

【讨论】:

错误依旧,我只收到 Setting new assignments [my-topic3] 哦,我没注意到你在使用KStream - 我不是很熟悉。【参考方案2】:

我刚刚尝试了一个应用程序,并且成功了。当您在同一个应用程序中有多个处理器时,您需要确保每个处理器都有自己的应用程序 ID。 请参阅下面我如何为 application.yml 中的两个输入提供 2 个不同的应用程序 ID。

我看到两个处理器都在控制台上登录。此外,还看到了关于输出主题的消息。

@SpringBootApplication
@EnableBinding(So54522918Application.StreamProcessor1.class, So54522918Application.StreamProcessor2.class)
public class So54522918Application 

    public static void main(String[] args) 
        SpringApplication.run(So54522918Application.class, args);
    

    @StreamListener(StreamProcessor1.INPUT)
    @SendTo(StreamProcessor1.OUTPUT)
    public KStream<String, String> process1(KStream<String, String> input) 

        System.out.println("Stream listening");

        return input
                .peek(((key, value) -> System.out.println("key = " + key +", value = " + value)));
    

    @StreamListener(StreamProcessor2.INPUT)
    @SendTo(StreamProcessor2.OUTPUT)
    public KStream<String, String> process2(KStream<String, String> input) 

        System.out.println("Stream listening binding two");

        return input
                .peek(((key, value) -> System.out.println("key = " + key +", value = " + value)));
    

    interface StreamProcessor1 

        String INPUT = "input_1";
        String OUTPUT = "output_1";

        @Input(INPUT)
        KStream<String, String> input();

        @Output(OUTPUT)
        KStream<String, String> outputProcessed();
    

    interface StreamProcessor2 

        String INPUT = "input_2";
        String OUTPUT = "output_2";

        @Input(INPUT)
        KStream<String, String> input();

        @Output(OUTPUT)
        KStream<String, String> outputProcessed();
    


application.yml 的相关部分

spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms: 1000
spring.cloud.stream.kafka.streams:
  binder.configuration:
    default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
    default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
  bindings.input_1.consumer.application-id: process-1
  bindings.input_2.consumer.application-id: process-2
spring.cloud.stream.bindings.input_1:
  destination: mytopic1
spring.cloud.stream.bindings.output_1:
  destination: mytopic2
spring.cloud.stream.bindings.input_2:
  destination: mytopic3
spring.cloud.stream.bindings.output_2:
  destination: mytopic4

【讨论】:

谢谢,但是文档中哪里说我们需要 2 个应用程序 ID? 请参阅本节中的applicationId:cloud.spring.io/spring-cloud-static/… 你说得对,谢谢。我认为文档应该比这更清晰...阅读 Spring Cloud Stream Docs (docs.spring.io/spring-cloud-stream/docs/current/reference/…) 没有提到...所以再次非常感谢您!

以上是关于多个 @EnableBinding 与 Kafka Spring Cloud Stream的主要内容,如果未能解决你的问题,请参考以下文章

Kafka 连接与多个消息队列的集成

Kafka架构原理

Spring Cloud Stream教程编程模型

阿里大牛实战归纳——Kafka架构原理

阿里大牛实战总结归纳—Kafka架构原理

在Kafka Connect中,如何连接多个kafka集群?