Symfony Messenger / RabbitMQ 检测请求消息
Posted
技术标签:
【中文标题】Symfony Messenger / RabbitMQ 检测请求消息【英文标题】:Symfony Messenger / RabbitMQ detecting requed messages 【发布时间】:2020-03-14 00:22:57 【问题描述】:如果消费消息失败,默认情况下消息会延迟重新排队。 有没有办法为消息添加计数器,以便我知道消息是否在最后一次尝试中?
这是期望的行为:
第一次尝试:
App\Message\Message
body: array:2 [
"id" => 2
"alias" => "some_alias",
"attempt" => 0,
]
第一次重试:
App\Message\Message
body: array:2 [
"id" => 2
"alias" => "some_alias",
"attempt" => 1,
]
第二次重试:
App\Message\Message
body: array:2 [
"id" => 2
"alias" => "some_alias",
"attempt" => 2,
]
第三次重试:
App\Message\Message
body: array:2 [
"id" => 2
"alias" => "some_alias",
"attempt" => 3,
]
【问题讨论】:
【参考方案1】:解决方案
messenger.yaml:
...
buses:
messenger.bus.default:
middleware:
# service ids that implement Symfony\Component\Messenger\Middleware
- 'App\Middleware\RetryMiddleware'
...
应用\中间件\重试中间件:
namespace App\MessageMiddleware;
use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Middleware\StackInterface;
use Psr\Log\LoggerInterface;
class RetryMiddleware implements MiddlewareInterface
public function handle(Envelope $envelope, StackInterface $stack): Envelope
try
return $stack->next()->handle($envelope, $stack);
catch (\Throwable $error)
$msg = $envelope->getMessage();
$body = $msg->getBody();
$body['attempt']++;
$msg->setBody($body);
//rethrow same error
throw $error;
应用\消息:
namespace App\Message;
class Message
public $body;
public function getBody()
return $this->body;
public function setBody(array $body): void
$this->body = $body;
【讨论】:
以上是关于Symfony Messenger / RabbitMQ 检测请求消息的主要内容,如果未能解决你的问题,请参考以下文章
Symfony Messenger 使用 Apache Kafka 作为队列传输
Symfony messenger 和 mailer:如何添加 binding_key?
Symfony Messenger / RabbitMQ 中的消费者错误处理