使用 Spring Cloud Stream 将 RabbitMQ 消费者绑定到现有队列
Posted
技术标签:
【中文标题】使用 Spring Cloud Stream 将 RabbitMQ 消费者绑定到现有队列【英文标题】:Bind RabbitMQ consumer using Spring Cloud Stream to an existing queue 【发布时间】:2017-04-28 13:52:16 【问题描述】:我使用 RabbitMQ web-UI 创建了一个主题交换 TX 并绑定到交换两个队列 TX.Q1 和 TX.Q2,每个都与路由键 rk1 和 rk2 相应地绑定,并产生很少的消息到交换。
现在我想使用 Spring Cloud Stream 创建一个仅从 Q1 获取消息的消费者。 我尝试使用配置:
spring.cloud.stream.bindings.input.destination=TX
spring.cloud.stream.bindings.input.group=Q1
以及消费消息的方法的注解@StreamListner(Sink.INPUT)
。
结果我可以看到消费者创建了一个具有相同名称的队列(或绑定)TX.Q1,但新队列/绑定的 Routing-Key 是 #. 如何通过 Spring Cloud Stream 配置将使用来自预定义队列的消息的消费者(仅使用 rk1 路由的消息)。
【问题讨论】:
【参考方案1】:所以现在,Garry Russell 建议的解决方法已经为我解决了这个问题。
我以这种方式使用@RabbitListener
而不是@StreamListenet
:@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "TX.Q1", durable = "true"), exchange = @Exchange(value = "TX", type = "topic", durable = "true"), key = "rk1")
。
因此,预定义队列 TX.Q1 使用绑定密钥 rk1 绑定到交换 TX。
等待Spring Cloud Steream issue 的更新。
【讨论】:
这个解决方法已经有一段时间了,我想知道您是否提出了一个更干净的解决方案(例如,使用属性而不是注释)。【参考方案2】:我想我使用@StreamListener
找到了解决方案,而不是使用解决方法。一切都是在配置中完成的,而不是在代码中。
我使用的配置如下(在 .yml 中,但您可以在 .properties 中轻松翻译):
spring:
cloud:
stream:
bindings:
input:
binder: <binder_name>
destination: TX
group: Q1
binders:
<binder_name>:
type: rabbit
environment:
spring:
rabbitmq:
host: <host>
port: <port>
virtual-host: <vhost>
username: <username>
password: <password>
rabbit:
bindings:
input:
consumer:
binding-routing-key: rk1
exchange-name: TX
queue-name-group-only: true
bind-queue: true
exchange-durable: true
exchange-type: topic
使用这种方法,您不必编写特定代码来让 RabbitMQ 使用者连接到您的集群,这应该可以解决您的问题。
希望这会有所帮助。
【讨论】:
它没有按预期工作。例如,我确实提供了上述设置,但每次重新启动应用程序时,我注意到在 RabbitMQ 中使用路由键 # 创建了一个新绑定——即使我已经定义了另一个路由键。因此,每当我发送消息时,即使没有路由密钥 - 我不希望我的听众收到该消息 - 但是,由于也自动创建了新绑定 (#),如原始问题中所述 - 我无法发送消息特定的路由键。【参考方案3】:Spring Cloud Stream 在内部将消费者端点的路由器键设置为目标名称 (exchange
名称) 本身,或者在静态分区的情况下基于 partition
标头的路由。
我认为this github 问题可能与您的情况有关。
【讨论】:
所以您的意思是,使用 Spring 云流,我无法将消费者绑定到具有路由键的特定预定义(非匿名)队列? 我刚刚测试过,我们添加了第二个绑定;我认为这是一个错误 - 如果队列已经存在,我们不应该添加泛型(#
通配符绑定)。作为一种变通方法,您可以使用@RabbitListener
而不是@StreamListener
(除非您依赖流侦听器进行转换)。我打开了and issue for this。【参考方案4】:
鼓励在 consumer 下使用此属性以使 rabbit 能够从现有队列中消费。请注意,队列名称将仅从组属性中选择,而不是从目标中选择。
queueNameGroupOnly: 真
例子:
cloud:
stream:
# rabbit setting: https://github.com/spring-cloud/spring-cloud-stream-binder-rabbit
rabbit:
bindings:
input:
consumer:
acknowledgeMode: AUTO
bindingRoutingKey: DECISION_PERSISTENCE_KEY
declareExchange: false
bindQueue: false
queueNameGroupOnly: true
consumerTagPrefix: dpa-rabbit-consumer
bindings:
input:
binder: rabbit
group: DECISION_PERSISTENCE_QUEUE
content-type: application/json
【讨论】:
以上是关于使用 Spring Cloud Stream 将 RabbitMQ 消费者绑定到现有队列的主要内容,如果未能解决你的问题,请参考以下文章
将Kafka Streams代码迁移到Spring Cloud Stream吗?
Spring 集成和 Spring Cloud Stream