使用 Spring Cloud Stream 将 RabbitMQ 消费者绑定到现有队列

Posted

技术标签:

【中文标题】使用 Spring Cloud Stream 将 RabbitMQ 消费者绑定到现有队列【英文标题】:Bind RabbitMQ consumer using Spring Cloud Stream to an existing queue 【发布时间】:2017-04-28 13:52:16 【问题描述】:

我使用 RabbitMQ web-UI 创建了一个主题交换 TX 并绑定到交换两个队列 TX.Q1TX.Q2,每个都与路由键 rk1rk2 相应地绑定,并产生很少的消息到交换。

现在我想使用 Spring Cloud Stream 创建一个仅从 Q1 获取消息的消费者。 我尝试使用配置:

spring.cloud.stream.bindings.input.destination=TX
spring.cloud.stream.bindings.input.group=Q1

以及消费消息的方法的注解@StreamListner(Sink.INPUT)

结果我可以看到消费者创建了一个具有相同名称的队列(或绑定)TX.Q1,但新队列/绑定的 Routing-Key 是 #. 如何通过 Spring Cloud Stream 配置将使用来自预定义队列的消息的消费者(仅使用 rk1 路由的消息)。

【问题讨论】:

【参考方案1】:

所以现在,Garry Russell 建议的解决方法已经为我解决了这个问题。

我以这种方式使用@RabbitListener 而不是@StreamListenet@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "TX.Q1", durable = "true"), exchange = @Exchange(value = "TX", type = "topic", durable = "true"), key = "rk1")

因此,预定义队列 TX.Q1 使用绑定密钥 rk1 绑定到交换 TX

等待Spring Cloud Steream issue 的更新。

【讨论】:

这个解决方法已经有一段时间了,我想知道您是否提出了一个更干净的解决方案(例如,使用属性而不是注释)。【参考方案2】:

我想我使用@StreamListener 找到了解决方案,而不是使用解决方法。一切都是在配置中完成的,而不是在代码中。

我使用的配置如下(在 .yml 中,但您可以在 .properties 中轻松翻译):

spring:
  cloud:
    stream:
      bindings:
        input:
          binder: <binder_name>
          destination: TX
          group: Q1
      binders:
        <binder_name>:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: <host>
                port: <port>
                virtual-host: <vhost>
                username: <username>
                password: <password>
      rabbit:
        bindings:
          input:
            consumer:
              binding-routing-key: rk1
              exchange-name: TX
              queue-name-group-only: true
              bind-queue: true
              exchange-durable: true
              exchange-type: topic

使用这种方法,您不必编写特定代码来让 RabbitMQ 使用者连接到您的集群,这应该可以解决您的问题。

希望这会有所帮助。

【讨论】:

它没有按预期工作。例如,我确实提供了上述设置,但每次重新启动应用程序时,我注意到在 RabbitMQ 中使用路由键 # 创建了一个新绑定——即使我已经定义了另一个路由键。因此,每当我发送消息时,即使没有路由密钥 - 我不希望我的听众收到该消息 - 但是,由于也自动创建了新绑定 (#),如原始问题中所述 - 我无法发送消息特定的路由键。【参考方案3】:

Spring Cloud Stream 在内部将消费者端点的路由器键设置为目标名称 (exchange 名称) 本身,或者在静态分区的情况下基于 partition 标头的路由。

我认为this github 问题可能与您的情况有关。

【讨论】:

所以您的意思是,使用 Spring 云流,我无法将消费者绑定到具有路由键的特定预定义(非匿名)队列? 我刚刚测试过,我们添加了第二个绑定;我认为这是一个错误 - 如果队列已经存在,我们不应该添加泛型(# 通配符绑定)。作为一种变通方法,您可以使用@RabbitListener 而不是@StreamListener(除非您依赖流侦听器进行转换)。我打开了and issue for this。【参考方案4】:

鼓励在 consumer 下使用此属性以使 rabbit 能够从现有队列中消费。请注意,队列名称将仅从组属性中选择,而不是从目标中选择。

queueNameGroupOnly: 真

例子:

cloud:
stream:
  # rabbit setting: https://github.com/spring-cloud/spring-cloud-stream-binder-rabbit
  rabbit:
    bindings:
      input:
        consumer:
          acknowledgeMode: AUTO
          bindingRoutingKey: DECISION_PERSISTENCE_KEY
          declareExchange: false
          bindQueue: false
          queueNameGroupOnly: true
          consumerTagPrefix: dpa-rabbit-consumer
  bindings:
    input:
      binder: rabbit
      group: DECISION_PERSISTENCE_QUEUE
      content-type: application/json

【讨论】:

以上是关于使用 Spring Cloud Stream 将 RabbitMQ 消费者绑定到现有队列的主要内容,如果未能解决你的问题,请参考以下文章

将Kafka Streams代码迁移到Spring Cloud Stream吗?

Spring Cloud Stream RabbitMQ

Spring 集成和 Spring Cloud Stream

Spring cloud stream消息分区

Spring Cloud Stream Kafka 消费者模式

spring-cloud-stream 请求-回复消息模式