RabbitMQ RPC:排他队列锁定 @ PHP

Posted

技术标签:

【中文标题】RabbitMQ RPC:排他队列锁定 @ PHP【英文标题】:RabbitMQ RPC: Exclusive queues locking @ PHP 【发布时间】:2012-06-19 15:12:19 【问题描述】:

我正在尝试使用类似于此示例的 RabbitMQ 在 php 上构建 RPC 服务:http://www.rabbitmq.com/tutorials/tutorial-six-java.html 我正在使用这个 PECL 扩展:http://pecl.php.net/package/amqp(版本 1.0.3)

问题是当我向服务器添加标志 AMQP_EXCLUSIVE 时,我的回调队列(在客户端脚本中声明)锁定

这是我的服务器

// connect to server
$cnn = new AMQPConnection('...');
$cnn->connect();
$channel = new AMQPChannel($cnn);
// create exchange
$exchangeName = 'k-exchange';
$exchange = new AMQPExchange($channel);
$exchange->setName($exchangeName);
$exchange->setType(AMQP_EX_TYPE_DIRECT);
$exchange->declare();

// declare queue to consume messages from
$queue = new \AMQPQueue($channel);
$queue->setName('tempQueue');
$queue->declare();

// start consuming messages
$queue->consume(function($envelope, $queue)
    use ($channel, $exchange) 

    // create callback queue
    $callbackQueue = new \AMQPQueue($channel);
    $callbackQueue->setName($envelope->getReplyTo());
    $callbackQueue->setFlags(AMQP_EXCLUSIVE); // set EXCLUSIVE flag

    /* WARNING: Following code line causes error. See rabbit logs below:
     *  connection <0.1224.10>, channel 1 - error:
     *  amqp_error,resource_locked,
     *  "cannot obtain exclusive access to locked queue 'amq.gen-Q6J...' in vhost '/'",
     *  'queue.bind'
     */
    $callbackQueue->bind($exchange->getName(), 'rpc_reply');

    // trying to publish response back to client's callback queue
    $exchange->publish(
        json_encode(array('processed by remote service!')),
        'rpc_reply',
        AMQP_MANDATORY & AMQP_IMMEDIATE
    );

    $queue->ack($envelope->getDeliveryTag());
);

这是我的 Client.php

// connect to server
$cnn = new AMQPConnection('...');
$cnn->connect();
$channel = new AMQPChannel($cnn);
// create exchange
$exchangeName = 'k-exchange';
$exchange = new AMQPExchange($channel);
$exchange->setName($exchangeName);
$exchange->setType(AMQP_EX_TYPE_DIRECT);
$exchange->declare();

// create a queue which we send messages to server via
$queue = new \AMQPQueue($channel);
$queue->setName('tempQueue');
$queue->declare();

// binding exchange to queue
$queue->bind($exchangeName, 'temp_action');

// create correlation_id
$correlationId = sha1(time() . rand(0, 1000000));

// create anonymous callback queue to get server response response via
$callbackQueue = new \AMQPQueue($channel);
$callbackQueue->setFlags(AMQP_EXCLUSIVE); // set EXCLUSIVE flag
$callbackQueue->declare();

// publishing message to exchange (passing it to server)
$exchange->publish(
    json_encode(array('process me!')),
    'temp_action',
    AMQP_MANDATORY,
    array(
        'reply_to' => $callbackQueue->getName(), // pass callback queue name
        'correlation_id' => $correlationId
    )
);

// going to wait for remote service complete tasks. tick once a second
$attempts = 0;
while ($attempts < 5)

    echo 'Attempt ' . $attempts . PHP_EOL;
    $envelope = $callbackQueue->get();
    if ($envelope) 
        echo 'Got response! ';
        print_r($envelope->getBody());
        echo PHP_EOL;
        exit;
    

    sleep(1);
    $attempts++;

所以最后我只是在 RabbitMQ 的日志中看到错误:

connection <0.1224.10>, channel 1 - error:
amqp_error,resource_locked,
    "cannot obtain exclusive access to locked queue 'amq.gen-Q6J...' in vhost '/'", 
    'queue.bind'

问题: 在 Server.php 中创建 callbackQueue 对象的正确方法是什么? 看来我的 Server.php 与 Client.php 到 RabbitMQ 服务器的连接不同。我应该在这里做什么? 我应该如何在 Server.php 端“共享”相同的(到 Client.php 的)连接。

更新 这里还有一些 RabbitMQ 日志

我的 Server.php 连接(ID 为:)

=INFO REPORT==== 20-Jun-2012::13:30:22 ===
    accepting AMQP connection <0.22322.27> (127.0.0.1:58457 -> 127.0.0.1:5672)

我的 Client.php 连接(ID 为:)

=INFO REPORT==== 20-Jun-2012::13:30:38 ===
    accepting AMQP connection <0.22465.27> (127.0.0.1:58458 -> 127.0.0.1:5672)

现在我看到 Server.php 导致错误:

=ERROR REPORT==== 20-Jun-2012::13:30:38 ===
    connection <0.22322.27>, channel 1 - error:
amqp_error,resource_locked,
"cannot obtain exclusive access to locked queue 'amq.gen-g6Q...' in vhost '/'",
'queue.bind'

我的假设 我怀疑由于 Client.php 和 Server.php 不与相同的 Id 共享连接,因此它们不可能都使用在 Client.php 中声明的独占队列

【问题讨论】:

【参考方案1】:

您的实施存在一些问题:

    换货声明 手动设置回复队列反对 使用临时队列 双向使用 AMQP_EXCLUSIVE

换货声明

您无需声明交换器 (AMQPExchange) 即可发布消息。在这个 RPC 示例中,您需要将其用作广播消息的一种方式(例如临时队列或临时交换)。所有通信都将直接在 QUEUE 上进行,理论上会绕过交换。

$exchange = new AMQPExchange($channel);
$exchange->publish(...);

队列和回复:

当您使用 AMQPQueue::setName() 和 AMQPQueue::declare() 时,您将绑定到具有用户定义名称的队列。如果您声明没有名称的队列,这称为临时队列。当您需要从特定路由键接收广播消息时,这很有用。为此,RabbitMQ / AMQP 会生成一个随机的临时名称。由于队列名称是为给定实例创建的,以独占方式使用信息,因此在连接关闭时将其丢弃。

当 RPC 客户端想要发布消息 (AMQPExchange::publish()) 时,它必须指定一个回复作为发布参数之一。这样,RPC 服务器在收到请求时就可以获取随机生成的名称。它使用回复名称作为 QUEUE 的名称,服务器将在该队列上回复给定的客户端。除了临时队列名称,实例还必须发送一个correlationId,以确保它收到的回复消息对于请求实例是唯一的。

客户

$exchange = new AMQPExchange($channel);

$rpcServerQueueName = 'rpc_queue';

$client_queue = new AMQPQueue($this->channel);
$client_queue->setFlags(AMQP_EXCLUSIVE);
$client_queue->declareQueue();   
$callbackQueueName = $client_queue->getName(); //e.g. amq.gen-JzTY20BRgKO-HjmUJj0wLg

//Set Publish Attributes
$corrId = uniqid();
$attributes = array(
    'correlation_id' => $corrId,
    'reply_to'       => $this->callbackQueueName
);  

$exchange->publish(
    json_encode(['request message']),
    $rpcServerQueueName,
    AMQP_NOPARAM,
    $attributes
);  

//listen for response
$callback = function(AMQPEnvelope $message, AMQPQueue $q) 
    if($message->getCorrelationId() == $this->corrId) 
        $this->response = $message->getBody();
        $q->nack($message->getDeliveryTag());
        return false; //return false to signal to consume that you're done. other wise it continues to block
       
;  

$client_queue->consume($callback);

服务器

$exchange = new AMQPExchange($channel);

$rpcServerQueueName = 'rpc_queue';


$srvr_queue = new AMQPQueue($channel);
$srvr_queue->setName($rpcServerQueueName); //intentionally declares the rpc_server queue name
$srvr_queue->declareQueue();
...
$srvr_queue->consume(function(AMQPEnvelope $message, AMQPQueue $q) use (&$exchange) 

    //publish with the exchange instance to the reply to queue
    $exchange->publish(
        json_encode(['response message']),  //reponse message
        $message->getReplyTo(),             //get the reply to queue from the message
        AMQP_NOPARAM,                       //disable all other params
        $message->getCorrelationId()        //obtain and respond with correlation id
    );

    //acknowledge receipt of the message
    $q->ack($message->getDeliveryTag());
);

AMQP_EXCLUSIVE

在这种情况下,EXCLUSIVE 仅用于每个实例的 Rpc 客户端的临时队列,以便它可以发布消息。换句话说,客户端为自己创建了一个一次性的临时队列,以专门从 RPC 服务器接收答案。这确保没有其他通道线程可以在该队列上发布。它仅对客户端及其响应者锁定。重要的是要注意 AQMP_EXCLUSIVE 不会阻止 RPC 服务器响应客户端的回复队列。 AMQP_EXCLUSIVE 与尝试发布到同一队列资源的两个单独线程(通道实例)有关。发生这种情况时,队列基本上被锁定以供后续连接。交换声明也会发生相同的行为。

@Denis:在这种情况下,您的实现在一定程度上是正确的

不好 - 不要在服务器中重新声明队列。这是客户的工作

$callbackQueue = new \AMQPQueue($channel);
$callbackQueue->setName($envelope->getReplyTo());
$callbackQueue->setFlags(AMQP_EXCLUSIVE); // set EXCLUSIVE flag 
...
$callbackQueue->bind($exchange->getName(), 'rpc_reply');

您正在尝试绑定到名为 tempQueue 的 QUEUE。但是您已经在 client.php 中创建了一个名为 tempQueue 的队列。根据先启动哪个服务,另一个会抛出错误。所以你可以删掉所有这些,只保留最后一部分:

// trying to publish response back to client's callback queue
$exchange->publish(
    json_encode(array('processed by remote service!')),
    'rpc_reply', //<--BAD Should be: $envelope->getReplyTo()
    AMQP_MANDATORY & AMQP_IMMEDIATE
);

然后通过替换来修改上面的:

'rpc_reply'

 with

 $envelope->getReplyTo()

不要在客户端声明队列名称

// create a queue which we send messages to server via
$queue = new \AMQPQueue($channel);
//$queue->setName('tempQueue'); //remove this line
//add exclusivity
$queue->setFlags(AMQP_EXCLUSIVE);
$queue->declare();

//no need for binding... we're communicating on the queue directly
//there is no one listening to 'temp_action' so this implementation will send your message into limbo
//$queue->bind($exchangeName, 'temp_action'); //remove this line

【讨论】:

【参考方案2】:

在您的服务器上,您还应该将您的队列声明为独占队列。请记住,RabbitMQ 队列应该具有相同的标志。例如,如果您声明设置为“持久”的队列,另一端也应该将队列声明为“持久”所以在您的服务器上放置一个标志$callbackQueue-&gt;setFlags(AMQP_EXCLUSIVE);,有点像您的客户端的那个。

【讨论】:

嗨,我已经在我的 Server.php 中添加了 EXCLUSIVE 标志,但我不起作用看起来服务器和客户端与 RabbitMQ 服务器的连接不同,这就是问题所在。或者我错了【参考方案3】:

我对这个问题的回答在 RabbitMQ 官方邮件列表中回复

虽然这里没有使用相同的库,但您已将官方教程移植到 PHP

https://github.com/rabbitmq/rabbitmq-tutorials/tree/master/php

您的代码中的问题是您声明了具有不同选项的队列。

正如一个回复所说,如果您将队列 A 声明为持久的,那么该队列的所有其他声明都必须是持久的。独占标志也是如此。

此外,您无需重新声明队列即可向其发布消息。作为 RPC 服务器,您假设在“reply_to”属性中发送的地址已经存在。我认为 RpcClient 有责任确保它正在等待回复的队列已经存在。

附录:

队列中的排他性意味着只有声明队列的通道才能访问它。

【讨论】:

以上是关于RabbitMQ RPC:排他队列锁定 @ PHP的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMq初探——用队列实现RPC

RabbitMQ——RabbitMQ的高级特性(TTL死信队列延迟队列优先级队列RPC)

Python-RabbitMQ消息队列实现rpc

RabbitMQ RPC 关闭最终消息的响应队列

rabbitmq学习:利用rabbitmq实现远程rpc调用

RabbitMQ消息队列RPC应用2