Symfony 信使队列与绑定键 - 重试策略

Posted

技术标签:

【中文标题】Symfony 信使队列与绑定键 - 重试策略【英文标题】:Symfony messenger queues with binding key - retry strategy 【发布时间】:2020-08-01 09:09:53 【问题描述】:

我正在我工作的公司中实施 messenger。我发现路由键有问题。

我想将一条消息发送到两个队列。另外两个应用程序将处理此队列。一切正常,但是当处理程序抛出异常时我发现了问题。它将消息发送一到两个重试队列,因为重试队列通过绑定键匹配,这对于这个队列是相同的。

最后重试 3 次后,我的 dlqs 上有 16 条消息。你能帮我解决这个问题吗?是否可以基于队列而不是路由键创建重试策略?

我的配置如下:

messenger:
    failure_transport: failed
    default_bus: command.bus
    transports:
        async:
            dsn: amqp://rabbitmq:rabbitmq@rabbitmq:5672
            options:
                retry_strategy:
                    max_retries: 3
                    delay: 1000
                    multiplier: 2
                    max_delay: 0
                exchange:
                    name: olimp
                    type: topic
                queues:
                    create_miniature_v1:
                        binding_keys:
                            - first
                    create_miniature_v2:
                        binding_keys:
                            - first
        failed:
            dsn: amqp://rabbitmq:rabbitmq@rabbitmq:5672
            options:
                exchange:
                    name: olimp_dead
                    type: topic
                queues:
                    create_miniature_v1_dlq:
                        binding_keys:
                            - first
                    create_miniature_v2_dlq:
                        binding_keys:
                            - first

    routing:
        'Olimp\Messenger\TestEvent': async

    buses:
        command.bus:
            middleware:
                - Olimp\Shared\Application\Message\Middleware\EventDispatcher
                - doctrine_close_connection
                - doctrine_transaction

        event.bus:
            default_middleware: allow_no_handlers

        query.bus: ~

我用这样的标记发送事件:

class MessengerTestCommand extends Command

    protected static $defaultName = 'app:messenger-test';
    private MessageBusInterface $bus;

    public function __construct(MessageBusInterface $bus)
    
        $this->bus = $bus;

        parent::__construct();
    

    protected function execute(InputInterface $input, OutputInterface $output): int
    
        $io = new SymfonyStyle($input, $output);

        $this->bus->dispatch(
            new TestEvent(), [
                new AmqpStamp('first')
            ]
        );

        $io->success('Done');

        return 0;
    

处理程序:

class TestEventHandler implements MessageHandlerInterface

    public function __invoke(TestEvent $event)
    
        dump($event->id);

        throw new \Exception('Boom');
    

我在兔子身上发现的:

现在我正在尝试这样的配置:

framework:
    messenger:
        failure_transport: failed
        default_bus: command.bus
        transports:
            async:
                dsn: amqp://rabbitmq:rabbitmq@rabbitmq:5672
                options:
                    retry_strategy:
                        max_retries: 3
                        delay: 1000
                        multiplier: 2
                        max_delay: 0
                    exchange:
                        name: olimp
                        type: topic
                    queues:
                        create_miniature_v1:
                            binding_keys:
                                - first
            async1:
                dsn: amqp://rabbitmq:rabbitmq@rabbitmq:5672
                options:
                    retry_strategy:
                        max_retries: 3
                        delay: 1000
                        multiplier: 2
                        max_delay: 0
                    exchange:
                        name: olimp
                        type: topic
                    queues:
                        create_miniature_v2:
                            binding_keys:
                                - first
            failed:
                dsn: amqp://rabbitmq:rabbitmq@rabbitmq:5672
                options:
                    exchange:
                        name: olimp_dead
                        type: topic
                    queues:
                        create_miniature_v1_dlq:
                            binding_keys:
                                - first
                        create_miniature_v2_dlq:
                            binding_keys:
                                - first

        routing:
            'Olimp\Messenger\TestEvent': [async, async1]

并使用两个正在运行的控制台命令:

bin/console messenger:consume async
bin/console messenger:consume async1

但它的工作原理是一样的。

【问题讨论】:

【参考方案1】:

好的,我自己找到答案了。

我创建了新的重试策略。我将queue_name_pattern 更改为%routing_key%_%delay% 并创建了我自己的SendFailedMessageForRetryListener。为了重试信封,我添加了邮票new AmqpStamp($envelope->last(AmqpReceivedStamp::class)->getQueueName()),它用于为延迟队列创建正确的路由密钥。因此,我不是根据交换名称创建队列,而是根据队列名称创建它。

还有两件事:

队列中的绑定键如下所示:

queues:
    create_miniature_v1:
        binding_keys:
            - create_miniature_v1
            - first
    create_miniature_v2:
        binding_keys:
            - create_miniature_v2
            - first

和失败的队列:

queues:
    create_miniature_v1_dlq:
        binding_keys:
            - create_miniature_v1
    create_miniature_v2_dlq:
        binding_keys:
            - create_miniature_v2

【讨论】:

以上是关于Symfony 信使队列与绑定键 - 重试策略的主要内容,如果未能解决你的问题,请参考以下文章

PubSub:如何设置没有指数退避的重试策略?

RabbitMQ—入门与回顾

Spring Cloud Stream消费失败后的处理策略:使用DLQ队列(RabbitMQ)

RabbitMQ学习总结

Symfony Messenger:重试延迟不适用于 Redis 传输

关于HttpClient重试策略的研究