Spring Cloud Function - 不同消费者的单独路由表达式
Posted
技术标签:
【中文标题】Spring Cloud Function - 不同消费者的单独路由表达式【英文标题】:Spring Cloud Function - Separate routing-expression for different Consumer 【发布时间】:2020-07-22 22:15:31 【问题描述】:我有一个服务,它从不同的消息队列接收不同的结构化消息。有了@StreamListener conditions
,我们可以在每种消息类型中选择应该如何处理该消息。举个例子:
我们收到两种不同类型的消息,它们具有不同的标头字段和值,例如
从“订单”队列传入:
Order1: Header: catalog:groceries
Order2: Header: catalog:tools
从“装运”队列传入:
Shipment1: Header: region:Europe
Shipment2: Header: region:America
每个队列都有一个绑定,根据@StreamListener
,我可以按目录和区域不同地处理消息
例如
@StreamListener(target = OrderSink.ORDER_CHANNEL, condition = "headers['catalog'] == 'groceries'")
public void onGroceriesOrder(GroceryOder order)
...
所以问题是,如何使用新的 Spring Cloud Function 方法实现这一点?
在文档https://cloud.spring.io/spring-cloud-static/spring-cloud-stream/3.0.2.RELEASE/reference/html/spring-cloud-stream.html#_event_routing 中提到:
Also, for SpEL, the root object of the evaluation context is Message so you can do evaluation on individual headers (or message) as well ….routing-expression=headers['type']
是否可以将路由表达式添加到绑定中,例如(application.yml
)
onGroceriesOrder-in-0:
destination: order
routing-expression: "headers['catalog']==groceries"
?
第一次回答后编辑 如果这个位置的上述表达式是不可能的,那么第一个答案意味着什么,而不是我的问题如下:
据我了解,routing-expression: headers['catalog']
之类的表达式必须全局设置,因为结果映射到某些(消费者)函数。
如何控制每个队列上的 2 条不同消息将被转发到它们自己的消费者函数,例如
Order1 --> MyOrderService.onGroceriesOrder()
Order2 --> MyOrderService.onToolsOrder()
Shipment1 --> MyShipmentService.onEuropeShipment()
Shipment2 --> MyShipmentService.onAmericaShipment()
使用@StreamListener
很容易,因为每个方法在不同的条件下都有自己的@StreamListener
注释。使用新的routing-expression
设置如何实现这一点?
?
【问题讨论】:
嗨@Danny,你找到实现它的方法了吗? 【参考方案1】:除了上面不是一个有效的表达式之外,我认为你的意思是headers['catalog']==groceries
。如果是这样,您希望从评估它作为唯一的两个选项会发生什么可能是真/假。无论如何,这些都是修辞,但有助于理解问题以及如何解决它。
表达式必须产生一个函数值才能路由到 TO。所以。 . .
routing-expression: headers['catalog']
- 假定 catalog
标头的实际值是要调用的函数的名称
routing-expression: headers['catalog']==groceries ? 'processGroceries' : 'processOther'
- 将值 'groceries' 映射到 'processGroceries' 函数。
【讨论】:
谢谢,这导致了一个后续问题,请参阅“第一次回答后编辑”块 是的,要获得与我们在 StreamListener(以及更多)中的“条件”等效的内容,您可以使用routing-expression
作为消息头。有关示例,请参阅此测试 - github.com/spring-cloud/spring-cloud-stream/blob/master/…
这没什么用,因为您必须在发送(创建)消息时定义此标头。因此,您需要在消息中嵌入函数名称(架构详细信息)。如果消息来自外部系统,它对我系统的体系结构一无所知,所以它不知道要设置哪个标头。您是否建议根据标题中的内容制作函数名称?消息传递的全部力量是解耦,这在您的解决方案中不会发生。您将消息与消费者函数名称联系起来
我从未说过或建议过您刚才所说的任何内容。该表达式允许您评估包括消息头在内的任何内容,并在此基础上确定函数定义。就像在提供的示例函数echo
中一样,如果消息的内容类型是text/plain
,将被调用。它确实与遗留注释方法做同样的事情,同时保持你的实际代码与基础设施细节无关。那你指的是什么耦合???以上是关于Spring Cloud Function - 不同消费者的单独路由表达式的主要内容,如果未能解决你的问题,请参考以下文章
如何使用 Spring Cloud Function 公开多个函数端点?
如何在 Google Cloud Function 上的 Spring Cloud 函数中获取 Pub/Sub 事件的元数据
spring cloud function 函数接口返回成功/失败处理
从 Spring Cloud Function 访问 AWS Lambda 上下文