Symfony Messenger / RabbitMQ 中的消费者错误处理

Posted

技术标签:

【中文标题】Symfony Messenger / RabbitMQ 中的消费者错误处理【英文标题】:Consumer Error Handling in Symfony Messenger / RabbitMQ 【发布时间】:2019-04-03 17:34:10 【问题描述】:

我正在使用新的 Symfony Messenger Component 4.1 和 RabbitMQ 3.6.10-1 从我的 Symfony 4.1 Web 应用程序中排队和异步发送电子邮件和 SMS 通知。我的 Messenger 配置 (messenger.yaml) 如下所示:

framework:
    messenger:
        transports:
            amqp: '%env(MESSENGER_TRANSPORT_DSN_NOTIFICATIONS)%'

        routing:
            'App\NotificationBundle\Entity\NotificationQueueEntry': amqp

当要发送新通知时,我会这样排队:

use Symfony\Component\Messenger\MessageBusInterface;
// ...
$notificationQueueEntry = new NotificationQueueEntry();
// [Set notification details such as recipients, subject, and message]
$this->messageBus->dispatch($notificationQueueEntry);

然后我在命令行这样启动消费者:

$ bin/console messenger:consume-messages

我已经实现了SendNotificationHandler 服务,实际交付发生在该服务中。服务配置:

App\NotificationBundle\MessageHandler\SendNotificationHandler:
    arguments:
        - '@App\NotificationBundle\Service\NotificationQueueService'
    tags: [ messenger.message_handler ]

还有班级:

class SendNotificationHandler

    public function __invoke(NotificationQueueEntry $entry): void
    
        $this->notificationQueueService->sendNotification($entry);
    

到目前为止,一切正常,通知已送达。

现在我的问题:可能会由于(临时)网络故障而无法发送电子邮件或短信。在这种情况下,我希望我的系统在指定的时间后重试交付,最多达到指定的最大重试次数。 实现这一目标的方法是什么?

我已经阅读了有关 Dead Letter Exchanges 的信息,但是,我找不到任何关于如何将其与 Symfony Messenger 组件集成的文档或示例。

【问题讨论】:

【参考方案1】:

你需要做的是告诉 RabbitMQ,消息被拒绝而不是被确认。默认情况下,Messenger 将在 AmqpReceiver 中处理此问题。如您所见,如果您在处理程序中抛出一个实现 RejectMessageExceptionInterface 的异常,该消息将自动为您拒绝。

您还可以使用自定义中间件“模拟”这种行为。我在一个小型演示应用程序中创建了类似的东西。该机制由一个中间件组成,该中间件将(序列化的)原始消息包装在新的RetryMessage 中,并通过自定义消息总线将其发送到不同的队列,用作死信交换。然后,该消息的处理程序将解包 RetryMessage(获取原始消息并对其进行反序列化)并通过默认总线传输它:

见:

RetryMessage RetryMiddleware messenger.yaml RetryMessageHandler

这是一个拒绝消息并允许您立即再次使用它的基本设置(!)。您可能希望在延迟消耗时添加其他信息,例如时间戳的标头以对此进行改进。为此,您应该考虑编写自己的接收器、中间件和/或处理程序。

【讨论】:

对于处理死信消息时有用的标头,您可以检查:rabbitmq.com/dlx.html 两个问题:1.) 当我拒绝消息时,RabbitMQ 究竟会发生什么?我如何配置它将被推送到哪个队列、在哪个时间间隔以及重新传递多少次? 2.) 当我抛出一个实现 RejectMessageExceptionInterface 的异常时,bin/console messenger:consume-messages 命令退出,因为异常被捕获然后立即再次抛出(如您在 AmqpReceiver 中看到的那样。我将如何让它保持活动状态? 1) 为了在 RabbitMQ 中将交换声明为 DLX,您可以使用 cli-tool。据我所知,路由键,即队列名称将被保留,因此您在不同的交换机下有相同的队列,但是您可以通过设置路由键来修改它,例如 x-dead-letter-routing-key (? )。 2)命令退出应该不是问题,因为无论如何您都应该经常重新启动它。您可以为此使用像 supervisord 这样的流程管理器。如果您不希望出现这种行为,则必须编写自己的中间件或调整 SendMessageMiddleware 并禁用默认值。

以上是关于Symfony Messenger / RabbitMQ 中的消费者错误处理的主要内容,如果未能解决你的问题,请参考以下文章

Symfony Messenger 使用 Apache Kafka 作为队列传输

Symfony messenger 和 mailer:如何添加 binding_key?

Symfony Messenger / RabbitMQ 中的消费者错误处理

Symfony Messenger 不会总是重启

如何在 Symfony Messenger 的中间件上禁用日志“信息”?

Symfony Messenger 如何确定应由哪个处理程序处理每种类型的消息?