RabbitMQphp语言对RabbitMQ四种工作模式的基础实现

Posted 十万猫毛

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQphp语言对RabbitMQ四种工作模式的基础实现相关的知识,希望对你有一定的参考价值。

学习资料:
1.黑马程序员RabbitMQ全套教程
2.RabbitMQ的PHP教程之入门

目录:

1.RabbitMQ结构:

2. WORK QUEUES 工作队列 模式:

生产者代码:

<?php
/**
 * "Work Queues"工作模式--生产者
 */
$conConfig = [
    'host' => '',//安装了rabbitMQ的主机ip
    'port' => '5672',//端口号
    'login' => 'guest',//登录账号
    'password' => 'guest',//登录密码
];

$e_name = "exchange_one";//交换器名称

try 
    //创建一个MQ连接实例并执行连接操作
    $connection = new AMQPConnection($conConfig);
    $connection->connect();
    if (!$connection->isConnected()) 
        echo 'rabbitmq connect fail';
        die();
    

    //根据连接,新建通道
    $channel = new AMQPChannel($connection);

    //定义交换器属性并声明交换器
    $exchange = new AMQPExchange($channel);
    $exchange->setName($e_name);//设置交换器名称
    $exchange->setType( AMQP_EX_TYPE_DIRECT); //设置交换器类型
    $exchange->setFlags(AMQP_DURABLE);//持久交换器,在broker重启后交换器不会消失,一般常使用该选项
    $exchange->declareExchange();//声明交换器

    for ($i = 0; $i < 100; $i++) 
        $message = '交换器消息--' . $i;

        //发送消息到MQ
        $state = $exchange->publish($message);

        if ($state) 
            echo 'success' . $i . PHP_EOL;
         else 
            echo 'fail' . PHP_EOL;
        
    
    //关闭通道和连接
    $channel->close();
    $connection->disconnect();

 catch (AMQPConnectionException $e) 
    var_dump($e);
    exit();


消费者代码:

  1. 同一时间在一个终端运行此脚本就是类似helloworld模式,
  2. 同一时间在n个终端运行此脚本则等于有n个消费者在一起消费队列里的消息,且消费者之间为竞争关系。
<?php
/**
 * "Work Queues"工作模式--消费者
 */

$conConfig = [
    'host' => '',
    'port' => '5672',
    'login' => 'guest',
    'password' => 'guest',
];

$e_name = "exchange_one";
$queueName = "queues_1";

try 
    //创建连接
    $connection = new AMQPConnection($conConfig);
    $connection->connect();
    if (!$connection->isConnected()) 
        echo 'rabbitmq connect fail';
        die();
    
    //新建通道
    $channel = new AMQPChannel($connection);

    //创建队列
    $queue = new AMQPQueue($channel);
    $queue->setName($queueName);
    $queue->setFlags(AMQP_DURABLE);
    $queue->declareQueue();
    $queue->bind($e_name);//将队列和交换器绑定关系


    var_dump("Waiting for message...");

    // 消费队列中的消息
    //consume函数的第二个参数如果为AMQP_AUTOACK,则当消费者接收到消息后向rabbitmq自动确认消息接收成功,
    //如果第二个参数为空,则需要我们在代码中调用ack方法来确认消息接收状态(看匿名回调函数的最后一句就是)
    while (TRUE) 
        $queue->consume(function ($envelope, $queue) 
            $msg = $envelope->getBody();
            var_dump($msg);
            $queue->ack($envelope->getDeliveryTag()); // 手动发送ACK应答
        );
    

    // 断开连接
    $connection->disconnect();

 catch (AMQPConnectionException $e) 
    var_dump($e);
    exit();

运行结果:

生产者脚本运行结果:

success0
success1
success2
success3
success4
success5
success6
success7
success8
success9

消费者1监听结果:

string(22) "Waiting for message..."
string(18) "交换器消息--0"
string(18) "交换器消息--2"
string(18) "交换器消息--4"
string(18) "交换器消息--6"
string(18) "交换器消息--7"
string(18) "交换器消息--9"

消费者2监听结果:

string(22) "Waiting for message..."
string(18) "交换器消息--1"
string(18) "交换器消息--3"
string(18) "交换器消息--5"
string(18) "交换器消息--8"

3.PUBLISH/SUBSCRIBE 广播订阅模式

生产者代码:

  1. 改变交换器的类型为AMQP_EX_TYPE_FANOUT
  2. 删除创建队列的代码
<?php
/**
 * "Publish/Subscribe"工作模式--生产者
 */
$conConfig = [
    'host' => '',//安装了rabbitMQ的主机ip
    'port' => '5672',//端口号
    'login' => 'guest',//登录账号
    'password' => 'guest',//登录密码
];

$e_name = "exchange_two";//交换器名称

try 
    //创建一个MQ连接实例并执行连接操作
    $connection = new AMQPConnection($conConfig);
    $connection->connect();
    if (!$connection->isConnected()) 
        echo 'rabbitmq connect fail';
        die();
    

    //根据连接,新建通道
    $channel = new AMQPChannel($connection);

    //定义交换器属性并声明交换器
    $exchange = new AMQPExchange($channel);
    $exchange->setName($e_name);//设置交换器名称
    $exchange->setType( AMQP_EX_TYPE_FANOUT); //设置交换器类型
    $exchange->setFlags(AMQP_DURABLE);//持久交换器,在broker重启后交换器不会消失,一般常使用该选项
    $exchange->declareExchange();//声明交换器

    for ($i = 0; $i < 5; $i++) 
        $message = '交换器消息--' . $i;

        //发送消息到MQ交换器,成功返回true,失败返回false
        $state = $exchange->publish($message);

        if ($state) 
            echo 'success' . $i . PHP_EOL;
         else 
            echo 'fail' . PHP_EOL;
        
    
    //关闭通道和连接
    $channel->close();
    $connection->disconnect();

 catch (AMQPConnectionException $e) 
    var_dump($e);
    exit();

消费者代码:

  1. 可以有一个或多个消费者,消费者们之间主要区别就是$queueName队列名称的值不一样。

  2. 如果同一时间,两个终端运行两个不同消费者脚本,那么两个消费者消费的消息一致。

    如果同一时间,两个终端运行同一个消费者脚本,那依旧是work queues那种竞争关系。

cus1.php:

<?php
/**
 * "Publish/Subscribe"工作模式--消费者
 */

$conConfig = [
    'host' => '',
    'port' => '5672',
    'login' => 'guest',
    'password' => 'guest',
];

$e_name = "exchange_two";
$queueName = "queues_2";

try 
    //创建连接
    $connection = new AMQPConnection($conConfig);
    $connection->connect();
    if (!$connection->isConnected()) 
        echo 'rabbitmq connect fail';
        die();
    
    //新建通道
    $channel = new AMQPChannel($connection);

    //创建队列
    $queue = new AMQPQueue($channel);
    $queue->setName($queueName);
    $queue->setFlags(AMQP_DURABLE);
    $queue->declareQueue();
    $queue->bind($e_name);//将队列和交换器绑定关系


    var_dump("Waiting for message...");

    while (TRUE) 
        $queue->consume(function ($envelope, $queue) 
            $msg = $envelope->getBody();
            var_dump($msg);
            $queue->ack($envelope->getDeliveryTag()); // 手动发送ACK应答
        );
    

    // 断开连接
    $connection->disconnect();

 catch (AMQPConnectionException $e) 
    var_dump($e);
    exit();

cus2.php:

<?php
/**
 * "Publish/Subscribe"工作模式--消费者
 */

$conConfig = [
    'host' => '',
    'port' => '5672',
    'login' => 'guest',
    'password' => 'guest',
];

$e_name = "exchange_two";
$queueName = "queues_3";
//后面内容和cus1.php一致。。。

运行结果:

生产者脚本:

success0
success1
success2
success3
success4

消费者1/消费者2(所有消费者的监听结果都将一致):

string(22) "Waiting for message..."
string(18) "交换器消息--0"
string(18) "交换器消息--1"
string(18) "交换器消息--2"
string(18) "交换器消息--3"
string(18) "交换器消息--4"

4.ROUITING 路由模式

生产者代码:

改变交换器的类型为AMQP_EX_TYPE_DIRECT

<?php
/**
 * "Routing"工作模式--生产者
 */
$conConfig = [
    'host' => '',//安装了rabbitMQ的主机ip
    'port' => '5672',//端口号
    'login' => 'guest',//登录账号
    'password' => 'guest',//登录密码
];

$e_name = "exchange_three";//交换器名称
$arr =[
    'notice' =>'[notice]类消息内容1',//本消息的路由=>本消息的内容
    'warning' =>'[warning]类消息内容1',
    'error' =>'[error]类消息内容1'
];


try 
    //创建一个MQ连接实例并执行连接操作
    $connection = new AMQPConnection($conConfig);
    $connection->connect();
    if (!$connection->isConnected()) 
        echo 'rabbitmq connect fail';
        die();
    

    //根据连接,新建通道
    $channel = new AMQPChannel($connection);

    //定义交换器属性并声明交换器
    $exchange = new AMQPExchange($channel);
    $exchange->setName($e_name);//设置交换器名称
    $exchange->setType( AMQP_EX_TYPE_DIRECT); //设置交换器类型
    $exchange->setFlags(AMQP_DURABLE);//持久交换器,在broker重启后交换器不会消失,一般常使用该选项
    $exchange->declareExchange();//声明交换器

    foreach ($arr as $typeName=>$typeMsg) 

        //发送消息到MQ,为消息指定路由,成功返回true,失败返回false
        $state = $exchange->publish($typeMsg,$typeName);

        if ($state) 
            echo 'success:' . $typeName . PHP_EOL;
         else 
            echo 'fail:' . PHP_EOL;
        
    
    //关闭通道和连接
    $channel->close();
    $connection->disconnect();

 catch (AMQPConnectionException $e) 
    var_dump($e);
    exit();

消费者代码:

cus1.php:

如果消息的指定路由key为noticewarning,则交换器会分发到此队列queues_4

<?php
/**
 * "Routing"工作模式--消费者
 */
$conConfig = [
    'host' => '',
    'port' => '5672',
    'login' => 'guest',
    'password' => 'guest',
];

$e_name = "exchange_three";
$queueName = "queues_4";
$routingKey1 = "notice";
$routingKey2 = "warning";

try 
    //创建连接
    $connection = new AMQPConnection($conConfig);
    $connection->connect();
    if (!$connection->isConnected()) 
        echo 'rabbitmq connect fail';
        die();
    
    //新建通道
    $channel = new AMQPChannel($connection);

    //创建队列
    $queue = new AMQPQueue($channel);
    $queue->setName($queueName);
    $queue->setFlags(AMQP_DURABLE);
    $queue->declareQueue();
    $queue->bind($e_name,$routingKey1);
    $queue->bind($e_name,$routingKey2);
    var_dump("Waiting for message...");

    while (TRUE) 
        $queue->consume(function ($envelope, $queue) 
            $msg = $envelope->getBody();
            var_dump($msg);
            $queue->ack($envelope->getDeliveryTag()); // 手动发送ACK应答
        );
    

    // 断开连接
    $connection->disconnect();

 catch (AMQPConnectionException $e) 
    var_dump($e);
    exit();

cus2.php:

如果消息的指定路由key为noticewarning,则交换器会

以上是关于RabbitMQphp语言对RabbitMQ四种工作模式的基础实现的主要内容,如果未能解决你的问题,请参考以下文章

四种途径提高RabbitMQ传输数据的可靠性

RabbitMQ四种交换机类型介绍

RabbitMQ快速上手以及RabbitMQ交换机的四种模式

RabbitMQ四种集群架构

rabbitmq交换器的四种模式

rabbitmq交换器的四种模式