转 RabbitMQ 入门教程(PHP版) 使用rabbitmq-delayed-message-exchange插件实现延迟功能

Posted brady-wang

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了转 RabbitMQ 入门教程(PHP版) 使用rabbitmq-delayed-message-exchange插件实现延迟功能相关的知识,希望对你有一定的参考价值。

 

延迟任务应用场景

场景一:物联网系统经常会遇到向终端下发命令,如果命令一段时间没有应答,就需要设置成超时。

场景二:订单下单之后30分钟后,如果用户没有付钱,则系统自动取消订单。

场景三:过1分钟给新注册会员的用户,发送注册邮件等。

php 使用rabbitmq-delayed-message-exchange插件实现延迟功能

1.安装

下载后解压,并将其拷贝至(使用Linux Debian/RPM部署)rabbitmq服务器目录:/usr/local/rabbitmq/plugins中( windows安装目录\\rabbitmq_server-version\\plugins ).

2.启用插件

使用命令rabbitmq-plugins enable rabbitmq_delayed_message_exchang启用插件

rabbitmq-plugins enable rabbitmq_delayed_message_exchang

输出如下:

The following plugins have been enabled:
  rabbitmq_delayed_message_exchange

通过rabbitmq-plugins list查看已安装列表,如下:

...
[ ] rabbitmq_delayed_message_exchange 20171215-3.6.x
...

3.机制解释

安装插件后会生成新的Exchange类型x-delayed-message,该类型消息支持延迟投递机制,接收到消息后并未立即将消息投递至目标队列中,而是存储在mnesia(一个分布式数据系统)表中,检测消息延迟时间,如达到可投递时间时并将其通过x-delayed-type类型标记的交换机类型投递至目标队列。

4.php实现过程

消费者 delay_consumer2.php:

<?php

//header(\'Content-Type:text/html;charset=utf8;\');

$params = array(
    \'exchangeName\' => \'delayed_exchange_test\',
    \'queueName\' => \'delayed_queue_test\',
    \'routeKey\' => \'delayed_route_test\',
);
$connectConfig = array(
    \'host\' => \'localhost\',
    \'port\' => 5672,
    \'login\' => \'guest\',
    \'password\' => \'guest\',
    \'vhost\' => \'/\'
);

//var_dump(extension_loaded(\'amqp\'));

//exit();

try {
    $conn = new AMQPConnection($connectConfig);
    $conn->connect();
    if (!$conn->isConnected()) {
        //die(\'Conexiune esuata\');
        //TODO 记录日志
        echo \'rabbit-mq 连接错误:\', json_encode($connectConfig);
        exit();
    }
    $channel = new AMQPChannel($conn);
    if (!$channel->isConnected()) {
        // die(\'Connection through channel failed\');
        //TODO 记录日志
        echo \'rabbit-mq Connection through channel failed:\', json_encode($connectConfig);
        exit();
    }
    $exchange = new AMQPExchange($channel);
    //$exchange->setFlags(AMQP_DURABLE);//声明一个已存在的交换器的,如果不存在将抛出异常,这个一般用在consume端
    $exchange->setName($params[\'exchangeName\']);
    $exchange->setType(\'x-delayed-message\'); //x-delayed-message类型
    /*RabbitMQ常用的Exchange Type有三种:fanout、direct、topic。

      fanout:把所有发送到该Exchange的消息投递到所有与它绑定的队列中。

      direct:把消息投递到那些binding key与routing key完全匹配的队列中。

      topic:将消息路由到binding key与routing key模式匹配的队列中。*/
    $exchange->setArgument(\'x-delayed-type\',\'direct\');
    $exchange->declareExchange();

    //$channel->startTransaction();

    $queue = new AMQPQueue($channel);
    $queue->setName($params[\'queueName\']);
    $queue->setFlags(AMQP_DURABLE);
    $queue->declareQueue();

    //绑定
    $queue->bind($params[\'exchangeName\'], $params[\'routeKey\']);
} catch(Exception $e) {
    echo $e->getMessage();
    exit();
}

function callback(AMQPEnvelope $message) {
    global $queue;
    if ($message) {
        $body = $message->getBody();
        echo \'接收时间:\'.date("Y-m-d H:i:s", time()). PHP_EOL;
        echo \'接收内容:\'.$body . PHP_EOL;
        //为了防止接收端在处理消息时down掉,只有在消息处理完成后才发送ack消息
        $queue->ack($message->getDeliveryTag());
    } else {
        echo \'no message\' . PHP_EOL;
    }
}

//$queue->consume(\'callback\');  第一种消费方式,但是会阻塞,程序一直会卡在此处

//第二种消费方式,非阻塞
/*$start = time();
while(true)
{
    $message = $queue->get();
    if(!empty($message))
    {
        echo $message->getBody();
        $queue->ack($message->getDeliveryTag());    //应答,代表该消息已经消费
        $end = time();
        echo \'<br>\' . ($end - $start);
        exit();
    }
    else
    {
        //echo \'message not found\' . PHP_EOL;
    }
}*/

//注意:这里需要注意的是这个方法:$queue->consume,queue对象有两个方法可用于取消息:consume和get。前者是阻塞的,无消息时会被挂起,适合循环中使用;后者则是非阻塞的,取消息时有则取,无则返回false。
//就是说用了consume之后,会同步阻塞,该程序常驻内存,不能用nginx,apache调用。 
$action = \'2\';

if($action == \'1\'){
    $queue->consume(\'callback\');  //第一种消费方式,但是会阻塞,程序一直会卡在此处
}else{
    //第二种消费方式,非阻塞
    $start = time();
    while(true)
    {
        $message = $queue->get();
        if(!empty($message))
        {
            echo \'接收时间:\'.date("Y-m-d H:i:s", time()). PHP_EOL;
            echo \'接收内容:\'.$message->getBody().PHP_EOL;
            $queue->ack($message->getDeliveryTag());    //应答,代表该消息已经消费
            $end = time();
            echo \'运行时间:\'.($end - $start).\'秒\'.PHP_EOL;
            //exit();
        }
        else
        {
            //echo \'message not found\' . PHP_EOL;
        }
    }
}

生产者delay_publisher2.php:

<?php

//header(\'Content-Type:text/html;charset=utf-8;\');

$params = array(
    \'exchangeName\' => \'delayed_exchange_test\',
    \'queueName\' => \'delayed_queue_test\',
    \'routeKey\' => \'delayed_route_test\',
);

$connectConfig = array(
    \'host\' => \'localhost\',
    \'port\' => 5672,
    \'login\' => \'guest\',
    \'password\' => \'guest\',
    \'vhost\' => \'/\'
);

//var_dump(extension_loaded(\'amqp\')); 判断是否加载amqp扩展
//exit();
try {
    $conn = new AMQPConnection($connectConfig);
    $conn->connect();
    if (!$conn->isConnected()) {
        //die(\'Conexiune esuata\');
        //TODO 记录日志
        echo \'rabbit-mq 连接错误:\', json_encode($connectConfig);
        exit();
    }
    $channel = new AMQPChannel($conn);
    if (!$channel->isConnected()) {
        // die(\'Connection through channel failed\');
        //TODO 记录日志
        echo \'rabbit-mq Connection through channel failed:\', json_encode($connectConfig);
        exit();
    }
    $exchange = new AMQPExchange($channel);
    $exchange->setName($params[\'exchangeName\']);
    $exchange->setType(\'x-delayed-message\'); //x-delayed-message类型
    /*RabbitMQ常用的Exchange Type有三种:fanout、direct、topic。

      fanout:把所有发送到该Exchange的消息投递到所有与它绑定的队列中。

      direct:把消息投递到那些binding key与routing key完全匹配的队列中。

      topic:将消息路由到binding key与routing key模式匹配的队列中。*/
    $exchange->setArgument(\'x-delayed-type\',\'direct\');
    $exchange->declareExchange();

    //$channel->startTransaction();
    //RabbitMQ不容许声明2个相同名称、配置不同的Queue,否则报错
    $queue = new AMQPQueue($channel);
    $queue->setName($params[\'queueName\']);
    $queue->setFlags(AMQP_DURABLE);
    $queue->declareQueue();

    //绑定队列和交换机
    $queue->bind($params[\'exchangeName\'], $params[\'routeKey\']);
    //$channel->commitTransaction();
} catch(Exception $e) {

}

for($i=5;$i>0;$i--){
    //生成消息
    echo \'发送时间:\'.date("Y-m-d H:i:s", time()).PHP_EOL;
    echo \'i=\'.$i.\',延迟\'.$i.\'秒\'.PHP_EOL;
    $message = json_encode([\'order_id\'=>time(),\'i\'=>$i]);
    $exchange->publish($message, $params[\'routeKey\'], AMQP_NOPARAM, [\'headers\'=>[\'x-delay\'=> 1000*$i]]);
    sleep(2);
}
$conn->disconnect();

对于代码来讲,首先对于消费者核心代码

$exchange->setType(\'x-delayed-message\'); //x-delayed-message类型
$exchange->setArgument(\'x-delayed-type\',\'direct\');

生产者核心代码

$exchange = new AMQPExchange($channel);
$exchange->setName($params[\'exchangeName\']);
$exchange->setType(\'x-delayed-message\'); //x-delayed-message类型
$exchange->setArgument(\'x-delayed-type\',\'direct\');
$exchange->declareExchange();

使用方法:先运行delay_consumer1.php,再运行delay_publisher1.php

运行效果:

 

原文 https://www.cnblogs.com/-mrl/p/11114116.html

 
 

以上是关于转 RabbitMQ 入门教程(PHP版) 使用rabbitmq-delayed-message-exchange插件实现延迟功能的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ 入门教程(PHP版) 第一部分:Hello World

RabbitMQ 入门精+转

RabbitMQ教程C#版 - 发布订阅

[译]RabbitMQ教程C#版 - 发布订阅

(转)rabbitmq的web管理界面无法使用guest用户登录

php的一些书籍(转)