多个 @EnableBinding 与 Kafka Spring Cloud Stream
Posted
技术标签:
【中文标题】多个 @EnableBinding 与 Kafka Spring Cloud Stream【英文标题】:Multiple @EnableBinding with Kafka Spring Cloud Stream 【发布时间】:2019-06-28 14:14:50 【问题描述】:我正在尝试设置一个监听 Kafka 的 Spring Boot 应用程序。
我正在使用 Kafka Streams Binder。
一个简单的@EnableBinding
@EnableBinding(StreamExample.StreamProcessor.class)
public class StreamExample
@StreamListener(StreamProcessor.INPUT)
@SendTo(StreamProcessor.OUTPUT)
public KStream<String, String> process(KStream<String, String> input)
logger.info("Stream listening");
return input
.peek(((key, value) -> logger.info("key = value = ", key, value)));
interface StreamProcessor
String INPUT = "input_1";
String OUTPUT = "output_1";
@Input(INPUT)
KStream<String, String> input();
@Output(OUTPUT)
KStream<String, String> outputProcessed();
在application.yml
spring:
cloud:
stream:
kafka:
streams:
binder:
brokers: localhost:29092
bindings:
input_1:
destination: mytopic1
group: readgroup
output_1:
destination: mytopic2
input_2:
destination: mytopic3
group: readgroup
output_2:
destination: mytopic4
application:
name: stream_s1000_app
一切正常。
但是如果我尝试使用其他绑定添加第二个类,则会出现以下错误:
以下订阅主题未分配给任何成员:[mytopic1]
第二个绑定示例:
@EnableBinding(StreamExampleBindingTwo.StreamProcessor.class)
public class StreamExampleBindingTwo
@StreamListener(StreamProcessor.INPUT)
@SendTo(StreamProcessor.OUTPUT)
public KStream<String, String> process(KStream<String, String> input)
logger.info("Stream listening binding two");
return input
.peek(((key, value) -> logger.info("key = value = ", key, value)));
interface StreamProcessor
String INPUT = "input_2";
String OUTPUT = "output_2";
@Input(INPUT)
KStream<String, String> input();
@Output(OUTPUT)
KStream<String, String> outputProcessed();
我错过了什么?我不能在同一个应用程序中使用多个输入主题和多个输出吗?有什么和application.name有关的吗?
【问题讨论】:
【参考方案1】:试试
@EnableBinding( StreamExample.StreamProcessor.class, StreamExampleBindingTwo.StreamProcessor.class )
【讨论】:
错误依旧,我只收到 Setting new assignments [my-topic3] 哦,我没注意到你在使用KStream
- 我不是很熟悉。【参考方案2】:
我刚刚尝试了一个应用程序,并且成功了。当您在同一个应用程序中有多个处理器时,您需要确保每个处理器都有自己的应用程序 ID。
请参阅下面我如何为 application.yml
中的两个输入提供 2 个不同的应用程序 ID。
我看到两个处理器都在控制台上登录。此外,还看到了关于输出主题的消息。
@SpringBootApplication
@EnableBinding(So54522918Application.StreamProcessor1.class, So54522918Application.StreamProcessor2.class)
public class So54522918Application
public static void main(String[] args)
SpringApplication.run(So54522918Application.class, args);
@StreamListener(StreamProcessor1.INPUT)
@SendTo(StreamProcessor1.OUTPUT)
public KStream<String, String> process1(KStream<String, String> input)
System.out.println("Stream listening");
return input
.peek(((key, value) -> System.out.println("key = " + key +", value = " + value)));
@StreamListener(StreamProcessor2.INPUT)
@SendTo(StreamProcessor2.OUTPUT)
public KStream<String, String> process2(KStream<String, String> input)
System.out.println("Stream listening binding two");
return input
.peek(((key, value) -> System.out.println("key = " + key +", value = " + value)));
interface StreamProcessor1
String INPUT = "input_1";
String OUTPUT = "output_1";
@Input(INPUT)
KStream<String, String> input();
@Output(OUTPUT)
KStream<String, String> outputProcessed();
interface StreamProcessor2
String INPUT = "input_2";
String OUTPUT = "output_2";
@Input(INPUT)
KStream<String, String> input();
@Output(OUTPUT)
KStream<String, String> outputProcessed();
application.yml 的相关部分
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms: 1000
spring.cloud.stream.kafka.streams:
binder.configuration:
default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
bindings.input_1.consumer.application-id: process-1
bindings.input_2.consumer.application-id: process-2
spring.cloud.stream.bindings.input_1:
destination: mytopic1
spring.cloud.stream.bindings.output_1:
destination: mytopic2
spring.cloud.stream.bindings.input_2:
destination: mytopic3
spring.cloud.stream.bindings.output_2:
destination: mytopic4
【讨论】:
谢谢,但是文档中哪里说我们需要 2 个应用程序 ID? 请参阅本节中的applicationId
:cloud.spring.io/spring-cloud-static/…
你说得对,谢谢。我认为文档应该比这更清晰...阅读 Spring Cloud Stream Docs (docs.spring.io/spring-cloud-stream/docs/current/reference/…) 没有提到...所以再次非常感谢您!以上是关于多个 @EnableBinding 与 Kafka Spring Cloud Stream的主要内容,如果未能解决你的问题,请参考以下文章