Symfony Messenger / RabbitMQ 检测请求消息

Posted

技术标签:

【中文标题】Symfony Messenger / RabbitMQ 检测请求消息【英文标题】:Symfony Messenger / RabbitMQ detecting requed messages 【发布时间】:2020-03-14 00:22:57 【问题描述】:

如果消费消息失败,默认情况下消息会延迟重新排队。 有没有办法为消息添加计数器,以便我知道消息是否在最后一次尝试中?

这是期望的行为:

第一次尝试:

App\Message\Message 
  body: array:2 [
    "id" => 2
    "alias" => "some_alias",
    "attempt" => 0,
  ]

第一次重试:

App\Message\Message 
  body: array:2 [
    "id" => 2
    "alias" => "some_alias",
    "attempt" => 1,
  ]

第二次重试:

App\Message\Message 
  body: array:2 [
    "id" => 2
    "alias" => "some_alias",
    "attempt" => 2,
  ]

第三次重试:

App\Message\Message 
  body: array:2 [
    "id" => 2
    "alias" => "some_alias",
    "attempt" => 3,
  ]

【问题讨论】:

【参考方案1】:

解决方案

messenger.yaml:

...
buses:
            messenger.bus.default:
                middleware:
    # service ids that implement Symfony\Component\Messenger\Middleware
                    - 'App\Middleware\RetryMiddleware'
...

应用\中间件\重试中间件:

namespace App\MessageMiddleware;

use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Middleware\StackInterface;
use Psr\Log\LoggerInterface;

class RetryMiddleware implements MiddlewareInterface

    public function handle(Envelope $envelope, StackInterface $stack): Envelope
    
        try 
            return $stack->next()->handle($envelope, $stack);
         catch (\Throwable $error) 
            $msg = $envelope->getMessage();
            $body = $msg->getBody();
            $body['attempt']++;
            $msg->setBody($body);

            //rethrow same error
            throw $error;
        
    

应用\消息:

namespace App\Message;


class  Message

    public $body;

    public function getBody()
    
        return $this->body;
    

    public function setBody(array $body): void
    
        $this->body = $body;
    

【讨论】:

以上是关于Symfony Messenger / RabbitMQ 检测请求消息的主要内容,如果未能解决你的问题,请参考以下文章

Symfony Messenger 使用 Apache Kafka 作为队列传输

Symfony messenger 和 mailer:如何添加 binding_key?

Symfony Messenger / RabbitMQ 中的消费者错误处理

Symfony Messenger 不会总是重启

如何在 Symfony Messenger 的中间件上禁用日志“信息”?

Symfony Messenger 如何确定应由哪个处理程序处理每种类型的消息?