Spring Cloud Function - 不同消费者的单独路由表达式

Posted

技术标签:

【中文标题】Spring Cloud Function - 不同消费者的单独路由表达式【英文标题】:Spring Cloud Function - Separate routing-expression for different Consumer 【发布时间】:2020-07-22 22:15:31 【问题描述】:

我有一个服务,它从不同的消息队列接收不同的结构化消息。有了@StreamListener conditions,我们可以在每种消息类型中选择应该如何处理该消息。举个例子:

我们收到两种不同类型的消息,它们具有不同的标头字段和值,例如

从“订单”队列传入:

Order1:  Header: catalog:groceries 
Order2:  Header: catalog:tools 

从“装运”队列传入:

Shipment1:  Header: region:Europe 
Shipment2:  Header: region:America 

每个队列都有一个绑定,根据@StreamListener,我可以按目录和区域不同地处理消息

例如

@StreamListener(target = OrderSink.ORDER_CHANNEL, condition = "headers['catalog'] == 'groceries'")
public void onGroceriesOrder(GroceryOder order)
...

所以问题是,如何使用新的 Spring Cloud Function 方法实现这一点?

在文档https://cloud.spring.io/spring-cloud-static/spring-cloud-stream/3.0.2.RELEASE/reference/html/spring-cloud-stream.html#_event_routing 中提到:

Also, for SpEL, the root object of the evaluation context is Message so you can do evaluation on individual headers (or message) as well …​.routing-expression=headers['type']

是否可以将路由表达式添加到绑定中,例如(application.yml

onGroceriesOrder-in-0:
  destination: order
  routing-expression: "headers['catalog']==groceries"

?

第一次回答后编辑 如果这个位置的上述表达式是不可能的,那么第一个答案意味着什么,而不是我的问题如下:

据我了解,routing-expression: headers['catalog'] 之类的表达式必须全局设置,因为结果映射到某些(消费者)函数。

如何控制每个队列上的 2 条不同消息将被转发到它们自己的消费者函数,例如

Order1 --> MyOrderService.onGroceriesOrder()
Order2 --> MyOrderService.onToolsOrder()
Shipment1 --> MyShipmentService.onEuropeShipment()
Shipment2 --> MyShipmentService.onAmericaShipment()

使用@StreamListener 很容易,因为每个方法在不同的条件下都有自己的@StreamListener 注释。使用新的routing-expression 设置如何实现这一点? ?

【问题讨论】:

嗨@Danny,你找到实现它的方法了吗? 【参考方案1】:

除了上面不是一个有效的表达式之外,我认为你的意思是headers['catalog']==groceries。如果是这样,您希望从评估它作为唯一的两个选项会发生什么可能是真/假。无论如何,这些都是修辞,但有助于理解问题以及如何解决它。

表达式必须产生一个函数值才能路由到 TO。所以。 . .

routing-expression: headers['catalog'] - 假定 catalog 标头的实际值是要调用的函数的名称

routing-expression: headers['catalog']==groceries ? 'processGroceries' : 'processOther' - 将值 'groceries' 映射到 'processGroceries' 函数。

【讨论】:

谢谢,这导致了一个后续问题,请参阅“第一次回答后编辑” 是的,要获得与我们在 StreamListener(以及更多)中的“条件”等效的内容,您可以使用 routing-expression 作为消息头。有关示例,请参阅此测试 - github.com/spring-cloud/spring-cloud-stream/blob/master/… 这没什么用,因为您必须在发送(创建)消息时定义此标头。因此,您需要在消息中嵌入函数名称(架构详细信息)。如果消息来自外部系统,它对我系统的体系结构一无所知,所以它不知道要设置哪个标头。您是否建议根据标题中的内容制作函数名称?消息传递的全部力量是解耦,这在您的解决方案中不会发生。您将消息与消费者函数名称联系起来 我从未说过或建议过您刚才所说的任何内容。该表达式允许您评估包括消息头在内的任何内容,并在此基础上确定函数定义。就像在提供的示例函数echo 中一样,如果消息的内容类型是text/plain,将被调用。它确实与遗留注释方法做同样的事情,同时保持你的实际代码与基础设施细节无关。那你指的是什么耦合???

以上是关于Spring Cloud Function - 不同消费者的单独路由表达式的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 Spring Cloud Function 公开多个函数端点?

如何在 Google Cloud Function 上的 Spring Cloud 函数中获取 Pub/Sub 事件的元数据

spring cloud function 函数接口返回成功/失败处理

从 Spring Cloud Function 访问 AWS Lambda 上下文

Spring Cloud Function - 不同消费者的单独路由表达式

spring cloud stream 3.1.2 源码搭配rocketmq学习