Symfony Messenger:是不是可以在最后一次重试时不抛出异常?

Posted

技术标签:

【中文标题】Symfony Messenger:是不是可以在最后一次重试时不抛出异常?【英文标题】:Symfony Messenger: is it possible to not throw the exception on last retry?Symfony Messenger:是否可以在最后一次重试时不抛出异常? 【发布时间】:2021-12-02 14:37:18 【问题描述】:

我们正在使用 Symfony Messenger,并且有这些传输:

framework:
    messenger:
        failure_transport: failed

        transports:
            failed:
                dsn: 'doctrine://default?queue_name=failed'
                options:
                    table_name: 'MessengerMessages'
            async:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                retry_strategy:
                    max_retries: 3
                    delay: 5000
                    multiplier: 2
                    max_delay: 0
            asyncLowPriority:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%_low_priority'
                retry_strategy:
                    max_retries: 5
                    delay: 3600000
                    multiplier: 2
                    max_delay: 0
            sync: 'sync://'

当我们向async队列发送消息,并且最后一次重试失败并出现异常时,异常被记录到MessengerMessages表中,并且异常冒泡(在我们的例子中是Sentry) .这就是我们想要的。

然而,当我们向asyncLowPriority 队列发送消息时,我们希望失败的消息发送到:

没有到达failed传输 让异常冒泡

基本上应该丢弃异常。

这可能吗?如何实现?

原因是我们使用这个队列异步下载图像,并且我们已经在命令处理程序的专用数据库表中记录了每个失败。

【问题讨论】:

【参考方案1】:

我设法通过中间件做到了这一点:


use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\HandlerFailedException;
use Symfony\Component\Messenger\Exception\UnrecoverableMessageHandlingException;
use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
use Symfony\Component\Messenger\Middleware\StackInterface;
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp;
use Throwable;

final class BypassFailureTransportMiddleware implements MiddlewareInterface

    public function __construct(
        private string $transportName,
        private int $maxRetries,
    ) 
    

    public function handle(Envelope $envelope, StackInterface $stack): Envelope
    
        try 
            return $stack->next()->handle($envelope, $stack);
         catch (HandlerFailedException $exception) 
            $nestedException = $this->getNestedException($exception);

            if ($nestedException === null) 
                throw $exception;
            

            /** @var ReceivedStamp|null $receivedStamp */
            $receivedStamp = $envelope->last(ReceivedStamp::class);

            if ($receivedStamp === null || $receivedStamp->getTransportName() !== $this->transportName) 
                throw $exception;
            

            if (!$this->isLastRetry($envelope, $nestedException)) 
                throw $exception;
            

            return $envelope->with(new SentToFailureTransportStamp($receivedStamp->getTransportName()));
        
    

    private function getNestedException(HandlerFailedException $exception): ?Throwable
    
        $nestedExceptions = $exception->getNestedExceptions();

        if (count($nestedExceptions) === 1) 
            return $nestedExceptions[0];
        

        return null;
    

    private function isLastRetry(Envelope $envelope, Throwable $nestedException): bool
    
        if ($nestedException instanceof UnrecoverableMessageHandlingException) 
            return true;
        

        /** @var RedeliveryStamp|null $redeliveryStamp */
        $redeliveryStamp = $envelope->last(RedeliveryStamp::class);

        if ($redeliveryStamp === null) 
            return false;
        

        return $redeliveryStamp->getRetryCount() === $this->maxRetries;
    

必须配置传输的名称和该传输的配置max_retries

parameters:
    async_allow_failure_transport_name: 'asyncAllowFailure'
    async_allow_failure_max_retries: 5

services:
  command.bus.bypass_failure_transport_middleware:
    class: App\Infrastructure\CommandBus\Middleware\BypassFailureTransportMiddleware
    arguments:
      $transportName: '%async_allow_failure_transport_name%'
      $maxRetries: '%async_allow_failure_max_retries%'

framework:
    messenger:
        transports:
            - name: '%async_allow_failure_transport_name%'
              dsn: '...'
              retry_strategy:
                  max_retries: '%async_allow_failure_max_retries%'
                  delay: 1000
                  multiplier: 2
                  max_delay: 0

        buses:
            command.bus:
                middleware:
                  - 'command.bus.bypass_failure_transport_middleware'

【讨论】:

以上是关于Symfony Messenger:是不是可以在最后一次重试时不抛出异常?的主要内容,如果未能解决你的问题,请参考以下文章

Symfony:我如何在 Messenger 中使用远程过程调用(RPC)?

Symfony Messenger 使用 Apache Kafka 作为队列传输

Symfony messenger 和 mailer:如何添加 binding_key?

Symfony Messenger / RabbitMQ 中的消费者错误处理

Symfony Messenger 不会总是重启

如何在 Symfony Messenger 的中间件上禁用日志“信息”?