如何在spring cloud stream和kafka中从同一主题发送和接收

Posted

技术标签:

【中文标题】如何在spring cloud stream和kafka中从同一主题发送和接收【英文标题】:how to send and receive from the same topic within spring cloud stream and kafka 【发布时间】:2016-11-24 10:49:07 【问题描述】:

我有一个带有 kafka 绑定的 spring-cloud-stream 应用程序。我想从同一个可执行文件(jar)中发送和接收来自同一个主题的消息。我的频道定义如下:- public interface ChannelDefinition @Input("forum") public SubscriableChannel readMessage(); @Output("forum") public MessageChannel postMessage();

我使用@StreamListener 接收消息。我收到各种意外错误。有时,我会收到

    未找到所有其他消息的 unknown.message.channel 的调度程序 如果我将命令行 kafka 订阅者附加到上述论坛主题,它会收到所有其他消息。 我的应用程序接收所有其他消息,这是来自命令行订阅者的专有消息集。我已确保我的应用程序以特定的组名订阅。

是否有上述用例的工作示例?

【问题讨论】:

【参考方案1】:

对我来说,从“输入”消费是行不通的。我需要在@Streamlistener 上使用方法名,并且需要使用@EnableBinding,如下所示:

@Slf4j
@RequiredArgsConstructor
@EnableBinding(value = Channels.class)
public class Consumer 
    
    @StreamListener("readMessage")
    public void retrieve(Something req) 
        log.info("Received ", req);
    


【讨论】:

【参考方案2】:

除了上述 Marius Bogoevici 的回答之外,这里还有一个如何收听该输入的示例。

@StreamListener
public void handleNewOrder(@Input("input") SubscribableChannel input) 
    logger.info("Subscribing...");
    input.subscribe((message) -> 
        logger.info("Received new message: ", message);
    );

【讨论】:

【参考方案3】:

这是定义可绑定通道的错误方法(因为两者都使用了forum 名称)。我们应该更加彻底并且快速失败,但是您将输入和输出绑定到同一个通道并在您的应用程序中创建一个竞争消费者。这也解释了您与备用消息有关的其他问题。

你应该做的是:

public interface ChannelDefinition  

   @Input
   public MessageChannel readMessage();

   @Output
   public MessageChannel postMessage();

然后使用应用程序属性将您的频道绑定到同一个队列:

spring.cloud.stream.bindings.readMessage.destination=forum
spring.cloud.stream.bindings.postMessage.destination=forum

【讨论】:

我在下面添加了一个答案,其中包含有关如何根据此答案订阅输入的代码:***.com/questions/43128803/…。

以上是关于如何在spring cloud stream和kafka中从同一主题发送和接收的主要内容,如果未能解决你的问题,请参考以下文章

`spring-cloud-starter-eureka-server`和`spring-cloud-starter-netflix-eureka-server`之间的区别

spring-cloud-starter-eureka-server 和 spring-cloud-starter-netflix-eureka-server的区别

随手记录关于spring-cloud-starter-eureka-server 和 spring-cloud-starter-netflix-eureka-server

如何在spring cloud stream和kafka中从同一主题发送和接收

如何构建 Spring Cloud Stream JMS ActiveMQ

Spring Cloud Stream如何消费自己生产的消息