Spring Cloud Stream RabbitMQ
Posted
技术标签:
【中文标题】Spring Cloud Stream RabbitMQ【英文标题】: 【发布时间】:2018-01-24 05:05:42 【问题描述】:我试图理解为什么我想将 Spring 云流与 RabbitMQ 一起使用。我看过 RabbitMQ Spring 教程 4 (https://www.rabbitmq.com/tutorials/tutorial-four-spring-amqp.html),这基本上是我想做的。它创建一个带有 2 个附加队列的直接交换,并根据路由键将消息路由到 Q1 或 Q2。
如果您查看教程,整个过程非常简单,创建所有部分,将它们绑定在一起,然后就可以开始了。
我想知道使用 Sing Cloud Stream 会获得什么好处,如果这甚至是它的用例。创建一个简单的交换很容易,甚至定义目的地和组也很容易使用流。所以我想为什么不更进一步,尝试用流处理教程案例。
我看到 Stream 有一个 BinderAwareChannelResolver
似乎做同样的事情。但是我正在努力将它们放在一起以实现与 RabbitMQ Spring 教程中相同的效果。我不确定这是否是一个依赖问题,但我似乎从根本上误解了一些东西,我想是这样的:
spring.cloud.stream.bindings.output.destination=myDestination
spring.cloud.stream.bindings.output.group=consumerGroup
spring.cloud.stream.rabbit.bindings.output.producer.routing-key-expression='key'
应该是诀窍。
有没有人有一个源和接收器的最小示例,它基本上创建一个直接交换,将 2 个队列绑定到它,并取决于将关键路由路由到这两个队列中的任何一个,例如https://www.rabbitmq.com/tutorials/tutorial-four-spring-amqp.html?
编辑:
下面是一组最小的代码,它演示了如何按照我的要求进行操作。我没有附上build.gradle
,因为它是直截了当的(但如果有人感兴趣,请告诉我)
application.properties
:设置生产者
spring.cloud.stream.bindings.output.destination=tut.direct
spring.cloud.stream.rabbit.bindings.output.producer.exchangeType=direct
spring.cloud.stream.rabbit.bindings.output.producer.routing-key-expression=headers.type
Sources.class
: 设置生产者频道
public interface Sources
String OUTPUT = "output";
@Output(Sources.OUTPUT)
MessageChannel output();
StatusController.class
:使用特定路由键响应休息呼叫并发送消息
/**
* Status endpoint for the health-check service.
*/
@RestController
@EnableBinding(Sources.class)
public class StatusController
private int index;
private int count;
private final String[] keys = "orange", "black", "green";
private Sources sources;
private StatusService status;
@Autowired
public StatusController(Sources sources, StatusService status)
this.sources = sources;
this.status = status;
/**
* Service available, service returns "OK"'.
* @return The Status of the service.
*/
@RequestMapping("/status")
public String status()
String status = this.status.getStatus();
StringBuilder builder = new StringBuilder("Hello to ");
if (++this.index == 3)
this.index = 0;
String key = keys[this.index];
builder.append(key).append(' ');
builder.append(Integer.toString(++this.count));
String payload = builder.toString();
log.info(payload);
// add kv pair - routingkeyexpression (which matches 'type') will then evaluate
// and add the value as routing key
Message<String> msg = new GenericMessage<>(payload, Collections.singletonMap("type", key));
sources.output().send(msg);
// return rest call
return status;
消费者方面的东西,属性:
spring.cloud.stream.bindings.input.destination=tut.direct
spring.cloud.stream.rabbit.bindings.input.consumer.exchangeType=direct
spring.cloud.stream.rabbit.bindings.input.consumer.bindingRoutingKey=orange
spring.cloud.stream.bindings.inputer.destination=tut.direct
spring.cloud.stream.rabbit.bindings.inputer.consumer.exchangeType=direct
spring.cloud.stream.rabbit.bindings.inputer.consumer.bindingRoutingKey=black
Sinks.class
:
public interface Sinks
String INPUT = "input";
@Input(Sinks.INPUT)
SubscribableChannel input();
String INPUTER = "inputer";
@Input(Sinks.INPUTER)
SubscribableChannel inputer();
ReceiveStatus.class
:接收状态:
@EnableBinding(Sinks.class)
public class ReceiveStatus
@StreamListener(Sinks.INPUT)
public void receiveStatusOrange(String msg)
log.info("I received a message. It was orange number: ", msg);
@StreamListener(Sinks.INPUTER)
public void receiveStatusBlack(String msg)
log.info("I received a message. It was black number: ", msg);
【问题讨论】:
【参考方案1】:Spring Cloud Stream 允许您使用 Spring Cloud Stream Binder 实现(Kafka、RabbitMQ、JMS 绑定器等)使应用程序连接(通过@EnableBinding
)到外部消息传递系统,从而开发事件驱动的微服务应用程序。显然,Spring Cloud Stream 使用 Spring AMQP 来实现 RabbitMQ binder。
BinderAwareChannelResolver
适用于对生产者的动态绑定支持,我认为在您的情况下,它是关于配置交换并将消费者绑定到该交换。
例如,您需要有 2 个根据您的条件设置了适当的 bindingRoutingKey
的消费者和一个具有您上面提到的属性(路由键表达式、目的地)的单个生产者(组除外)。我注意到您已经为出站通道配置了group
。 group
属性仅适用于消费者(因此是入站)。
您可能还想查看这个:https://github.com/spring-cloud/spring-cloud-stream-binder-rabbit/issues/57,因为我看到一些关于使用 routing-key-expression
的讨论。具体来说,检查this 一项关于使用表达式值。
【讨论】:
感谢您的回答。我已经看过提到的问题,它们是我实际提出这个 *** 问题的原因。对于其他人来说,一个人必须做什么似乎很清楚,但对我来说却不是。您对BinderAwareChannelResolver
的解释验证了我的理解,即我到达了正确的角落:)。但是,我尝试设置路由键表达式,但它不起作用。似乎是 gradle 的依赖问题,但我没有让它工作。所以这就是为什么我要一个示例项目。
似乎我让消费者端通过:spring.cloud.stream.bindings.input.destination=tut.direct spring.cloud.stream.rabbit.bindings.input.consumer.exchangeType=direct spring.cloud.stream.rabbit.bindings.input.consumer.bindingRoutingKey=orange spring.cloud.stream.bindings.inputer.destination=tut.direct spring.cloud.stream.rabbit.bindings.inputer.consumer.exchangeType=direct spring.cloud.stream.rabbit.bindings.inputer.consumer.bindingRoutingKey=black
只在生产者端动态设置路由键。以上是关于Spring Cloud Stream RabbitMQ的主要内容,如果未能解决你的问题,请参考以下文章
SpringCloud stream连接RabbitMQ收发信息
spring cloud-stream 和 spring cloud-bus 有啥区别?