使用 RMQ 在 Symfony Messenger 中处理来自不同命名空间的消息

Posted

技术标签:

【中文标题】使用 RMQ 在 Symfony Messenger 中处理来自不同命名空间的消息【英文标题】:Handling messages from different namespaces in Symfony Messenger with RMQ 【发布时间】:2019-12-07 13:43:30 【问题描述】:

我正在使用微服务方法构建应用程序。对于服务之间的通信,我使用带有 RMQ 传输的 Symfony Messenger。基本上一切正常,但我所有的服务都必须在同一个命名空间中。一旦我尝试将它们分成自己的命名空间,如App\MailApp\Auth 等,Messenger 就会抱怨缺少 Event 类,因为整个命名空间是在发送到 RMQ 的消息头中提供的。有什么方法可以映射来自两个不同命名空间的事件?

例如Auth 应用程序调度事件UserRegistered 所以消息的类型为App\Auth\Event\UserRegistered。我想在我的邮件应用程序中处理该事件,但信使无法使用它,因为我的事件和处理程序位于 App\Mail 命名空间下,因此它无法在“邮件”应用程序中找到 App\Auth\Event\UserRegistered 类。

我遇到的错误示例:

In Serializer.php line 85:

  Could not decode message: Could not denormalize object of type App\Event\UserRequestedPasswordReset, no supporting normalizer found.

在这个确切的例子中,我正在从App 命名空间下的应用程序发送事件 UserRequestedPasswordReset,并且我正在尝试使用 App\Mail 命名空间下的应用程序来使用它。

我无法在文档或互联网上找到任何有用的信息。我试图在容器中将App\Event\UserRequestedPasswordReset 别名为App\Mail\Event\UserRequestedPasswordReset,但没有运气。我猜想这对 Denormalizers 来说是可行的,但在互联网上也找不到任何有用的东西。

通信本身正在工作,消息被发送到 RMQ 并在其他服务中接收。我对 RMQ 的设置是: 我有多个队列,每个服务一个。我与那些绑定的队列进行了扇出交换。每当我生成事件时,我都会将其发布以交换以将其填充到所有队列中,以便感兴趣的服务可以处理它们。

我的一项服务中的示例信使配置。除了 event 我使用 messenger 来处理 CQRS 命令和查询,所以我使用了三种不同的总线。

messenger:
        default_bus: messenger.bus.commands
        buses:
            messenger.bus.commands:
                middleware:
#                    - validation
#                    - doctrine_transaction
            messenger.bus.queries:
                middleware:
#                    - validation
            messenger.bus.events:
                default_middleware: allow_no_handlers
                middleware:
#                    - validation
        transports:
            events:
                dsn: "%env(MESSENGER_AMQP_DSN)%"
                options:
                    exchange:
                        name: ecommerce_events
                        type: fanout
                    queue:
                        name: ecommerce_auth

        routing:
            'App\Event\UserCreated': events
            'App\Event\UserModified': events
            'App\Event\UserChangedPassword': events
            'App\Event\UserRequestedPasswordReset': events

我希望将我的应用程序保留在不同的命名空间中,并且仍然能够处理来自其他服务的事件

【问题讨论】:

您好,我认为您已经充分描述了您要实现/确保的目标。但是,您说您有错误/问题,但不提供错误消息或产生该错误消息的代码。配置在某些时候可能会有所帮助,但代码和错误消息对于帮助您解决手头的特定问题是必要的(因为您的帖子中缺少该问题)。 你是对的,对不起。我已经编辑了我的原始帖子 因此,消息表明,您的对象已规范化(对象-> 数组)但无法非规范化(数组-> 对象)。在 symfony 序列化程序的文档页面上,它说“要使用 ObjectNormalizer,还必须安装 PropertyAccess 组件。”。我不太确定,您的微服务方法能走多远以及这是否相关……symfony.com/doc/current/components/serializer.html 我认为我拥有它需要的一切,因为当我有相同的命名空间(消息类型是例如App\Events\SomeEvent 并且我的事件在同一个命名空间上)时,使用消息没有问题。当我将我的一项服务移动到其他命名空间并且消息类型不匹配时会出现问题(例如,发送的消息类型为App\Auth\SomeEvent,而接收服务在App\Mail\SomeEvent 下具有相同的事件)。我的文档我可以看到,当我“手动”非规范化时,我可以为它提供我想要规范化的对象。问题是我不知道如何在 Messenger 中执行此操作。 【参考方案1】:

所以在深入研究主题后,我能够找到解决方案。

我只需要创建自定义序列化程序,然后在编码期间剥离命名空间,然后在解码期间提供类型到实际事件类的映射。这是我的代码

class EventsSerializer extends Serializer

    public function encode(Envelope $envelope): array
    
        $data = parent::encode($envelope);
        $data['headers']['type'] = $this->parseType($data['headers']['type']);

        return $data;
    

    private function parseType(string $type)
    
        return end(explode('\\', $type));
    

    public function decode(array $encodedEnvelope): Envelope
    
        $translatedType = $this->translateType($encodedEnvelope['headers']['type']);
        $encodedEnvelope['headers']['type'] = $translatedType;

        return parent::decode($encodedEnvelope);
    

    private function translateType($type)
    
        $map = [
            'UserCreated' => UserCreated::class,
            'UserRequestedPasswordReset' => UserRequestedPasswordReset::class
        ];

        return $map[$type] ?? $type;
    

在信使配置中:

framework:
  messenger:
    serializer:
      default_serializer: AppBundle\Serializer\EventsSerializer

请记住,这更像是概念证明,它可能可以增强,但它正在工作。

【讨论】:

我让我的更通用一点,但这是一个很好的基础 对于下一个人:如果您在消息中添加getMessageType,则可以在上面的parseType 逻辑中执行return $envelope->getMessage()->getMessageType()。请记住在 translateType 中创建对应项

以上是关于使用 RMQ 在 Symfony Messenger 中处理来自不同命名空间的消息的主要内容,如果未能解决你的问题,请参考以下文章

RMQ

使用二叉索引树进行 RMQ 扩展

浅谈RMQ

RMQ问题(ST算法)

RMQ 解决区间查询问题

RMQ算法