将事件发布到单个微服务实例

Posted

技术标签:

【中文标题】将事件发布到单个微服务实例【英文标题】:Publishing an Event to a Single Microservice Instance 【发布时间】:2019-12-27 10:32:27 【问题描述】:

我正在开发一个项目,该项目具有多个事件驱动的微服务,并且还使用 Kubernetes 进行负载平衡。所有的服务都是发布者和听众。当微服务发布事件时,所有侦听器都在捕获事件(如果它侦听该特定事件)并完成他们的工作。在此之前,此流程没有问题:

假设我有一个负责发送电子邮件的微服务。由于高负载,此服务被负载均衡器复制了 2 次。现在我们有 3 个电子邮件服务实例。当发布“sendMail”事件时,所有 3 个实例都在捕获该事件并为自己发送一封电子邮件。在一天结束时,正在发送 3 封电子邮件。

我的问题是,我可以配置一个云总线,允许我为这两种场景发布事件。我想对一个事件说“当一个听众抓住你时,消失”或“去找每个在外面等着的听众”。

例如;

微服务:A、B、C

被负载均衡器复制:A1、A2、A3、B1...

案例 1:我想为所有服务的实例发布一个事件。

案例 2:我想为服务 A 实例发布事件。

案例 3:我想为 A 的单个实例发布事件(不要 关心哪个)。

我试过了; 为事件提供目的地,但所有实例都具有相同的总线名称,因为它们重复相同。如果我给出/知道单个实例总线名称,我不会使用它,因为那个 pod 可能会死掉。

事件发布;

applicationContext().publishEvent(
        new customEvent(
                this,  // Source
                busProperties().getId(),  // Origin Service
                null  // Destination Service (null: all)
        )
);

事件监听器;

@Component
public class Listener implements ApplicationListener<CustomEvent> 
    @Override
    public void onApplicationEvent(CustomEvent event) 
        Foo();
    

【问题讨论】:

这个bus你用什么? KubeMQ? RabbitMQ 但我不想像 RabbitMQ 那样解决这个问题,我正在寻找框架级别的解决方案。 我明白了,您可以包含一个辅助消息确认主题,其中任何侦听器都会立即使用某个消息 ID 发布自己的侦听发生事件。虽然这是有风险的,但会有竞争条件,并且如果某些服务重新启动,它会继续读取过去的消息,您可能需要将这些读取的消息 id 存储在某个缓存中?替换所有经纪人中已经存在的功能变得非常混乱(我认为) 或者您可以将listener 模块提取为单独的single 服务,并将读取的消息提供给A、B、C也许是循环方式? 我觉得必须有一个属性/配置来指定当单个侦听器使用它们时要删除的事件。那将很容易解决我的问题。但如果我找不到它,我不想要任何混乱的解决方案。我已经可以使用自动连接的 RabbitTemplate 将事件发布为 P2P,我想我可能会使用我不想要但也不混乱的解决方案。 【参考方案1】:

我现在明白了,这是我之前发布的Image Link。我相信你已经知道了。这是一个常见问题,最好的方法是使用 redis 作为“blocking”机制。由于消息队列异步向所有消费者发送请求,我们将不知道谁在接收和处理请求,但我们通过使用阻塞来确保它不会被所有人处理。在执行任何操作之前,请检查该块,如果它不存在,则处理请求。

【讨论】:

【参考方案2】:

我相信这可以通过设置您的 consumerGroupId 来解决。您的应用程序的每个实例(电子邮件服务的每个微服务)都需要有一个唯一的 groupId,以便rabbitmq 将请求发送到任何一个可用的唯一组。

Here is how it works

【讨论】:

首先,您分享的图片不可用,您可以查看一下吗? (我不知道它可能被我公司的网络政策阻止了)。我在最后一句话中提到了这个解决方案。假设我们有 3 个 A 实例(A1、A2、A3),​​每个实例都有自己的 groupId。您说我必须通过提供特定的 groupId 来发布事件。例如 A1,但是如果 A1 服务 pod 死了会发生什么?如果 A1 死了,A2 或 A3 应该抓住事件并做必要的工作,这就是为什么我不能给出一个固定的总线名称。由于某些问题或版本更新,Pod 不断/最终死亡,重新启动。【参考方案3】:

我已经找到并实现了一个部分解决方案,将 Spring Cloud Stream 与 Spring Cloud Bus 结合使用。我将 Cloud Stream 与单个事件的自定义队列和组一起使用(第一个示例为 sentMail 事件)。当我通过 Cloud Stream 发送消息时,如果一个微服务的多个实例正在侦听该消息,它们会像这样一一接收:

发送消息:M1、M2、M3

监听A微服务实例:A1、A2

使用云总线:

A1 接收 M1、M2、M3

A2 接收 M1、M2、M3

使用 Cloud Stream(定义自定义组):

A1 接收 M1、M3

A2 接收 M2

实施:https://www.baeldung.com/spring-cloud-stream

【讨论】:

以上是关于将事件发布到单个微服务实例的主要内容,如果未能解决你的问题,请参考以下文章

微服务实战:选择微服务部署策略

Chris Richardson微服务实战系列

微服务实战:选择微服务部署策略

微服务实践:从单体式架构迁移到微服务架构

微服务实战:落地微服务架构到直销系统(将生产者与消费者接入消息总线)

微服务实战