RabbitMQ 在 PHP 下的简单使用 -- 安装 AMQP 扩展和 Direct Exchange 模式
Posted Learn By Doing
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ 在 PHP 下的简单使用 -- 安装 AMQP 扩展和 Direct Exchange 模式相关的知识,希望对你有一定的参考价值。
Windows 安装 amqp 扩展
RabbitMQ 是基于 amqp(高级消息队列协议) 协议的。使用 RabbitMQ 前必须为 php 安装相应的 amqp 扩展。
-
下载相应版本的 amqp 扩展:http://pecl.php.net/package/amqp,解压缩文件。
-
将 php_amqp.dll 复制到 php 的扩展目录 ext 下,修改配置文件 php.ini:
[amqp] extension=php_amqp.dll
-
将 rabbitmq.*.dll 文件复制到 php 的安装目录下,然后修改 Apache 配置文件 httpd.conf:
#[rabbitmq] LoadFile "F:wamp64inphpphp7.0.10 abbitmq.*.dll"
-
重启服务器,查看 phpinfo,确认扩展信息。
Direct Exchange 模式
Direct Exchange 模式的交换机适合用于消息的单播发送. 交换机根据推送消息时的 routing key 和 队列的 routing key 判断消息应该推送到哪个队列. 可以实现同一交换机上的消息, 根据 routing key 推送到不同的队列中.
默认 Direct Exchange
此种模式下,使用 RabbitMQ 的默认 Exchange 即可,默认的 Exchange 是 Direct 模式。使用默认 Exchange 时,不需要对 Exchange 进行属性设置和声明,也不需要对 Queue 进行显示绑定和设置 routing key。Queue 默认会绑定到默认 Exchange,以及默认 routing key 与 Queue 的名称相同。
producer.php:
-
创建连接并发起连接
-
在连接上创建通道
-
在通道上获取默认交换机
-
向交换机发送消息
1 header(‘Content-Type: text/html; charset=utf-8‘); 2 // 连接设置 3 $conConfig = [ 4 ‘host‘ => ‘127.0.0.1‘, 5 ‘port‘ => 5672, 6 ‘login‘ => ‘root‘, 7 ‘password‘ => ‘root‘, 8 ‘vhost‘ => ‘/‘ 9 ]; 10 try 11 { 12 // RabbitMQ 连接实例 13 $con = new AMQPConnection($conConfig); 14 // 发起连接 15 $con->connect(); 16 // 判断连接是否仍然有效 17 if(!$con->isConnected()) 18 { 19 echo ‘连接失败‘;die; 20 } 21 // 新建通道 22 $channel = new AMQPChannel($con); 23 // 使用RabbitMQ的默认Exchange 24 $exchange = new AMQPExchange($channel); 25 for($i = 1; $i < 6; $i++) 26 { 27 $message = [ 28 ‘name‘ => ‘默认交换机,消息-‘ . $i, 29 ‘info‘ => ‘Hello World!‘ 30 ]; 31 // 发送消息,为消息指定routing key,成功返回true,失败false 32 $state = $exchange->publish(json_encode($message, JSON_UNESCAPED_UNICODE), ‘test.queue1‘); 33 if($state) 34 { 35 echo ‘Success‘ . PHP_EOL; 36 }else 37 { 38 echo ‘Fail‘ . PHP_EOL; 39 } 40 } 41 // 关闭连接 42 $con->disconnect(); 43 }catch(Exception $e) 44 { 45 echo $e->getMessage(); 46 }
consumer.php:
-
创建连接并发起连接
-
在连接上创建通道
-
在通道上创建队列并声明队列
-
从队列获取消息
1 header(‘Content-Type: text/html; charset=utf-8‘); 2 $conConfig = [ 3 ‘host‘ => ‘127.0.0.1‘, 4 ‘port‘ => 5672, 5 ‘login‘ => ‘root‘, 6 ‘password‘ => ‘root‘, 7 ‘vhost‘ => ‘/‘ 8 ]; 9 10 11 try 12 { 13 $con = new AMQPConnection($conConfig); 14 $con->connect(); 15 if(!$con->isConnected()) 16 { 17 echo ‘连接失败‘;die; 18 } 19 20 21 $channel = new AMQPChannel($con); 22 23 24 $queue = new AMQPQueue($channel); 25 $queue->setName(‘test.queue1‘); 26 // 声明队列,不需要对Queue进行显示绑定到交换机和指定Queue的routing key 27 $queue->declareQueue(); 28 $queue->consume(function($envelope, $queue) 29 { 30 echo $envelope->getBody() . PHP_EOL; 31 }, AMQP_AUTOACK); 32 33 34 $con->disconnect(); 35 }catch(Exception $e) 36 { 37 echo $e->getMessage(); 38 }
自定义 Direct Exchange
producer:
header(‘Content-Type: text/html; charset=utf-8‘); // 连接设置 $conConfig = [ ‘host‘ => ‘127.0.0.1‘, ‘port‘ => 5672, ‘login‘ => ‘root‘, ‘password‘ => ‘root‘, ‘vhost‘ => ‘/‘ ]; try { // RabbitMQ 连接实例 $con = new AMQPConnection($conConfig); // 发起连接 $con->connect(); // 判断连接结果,true成功,false失败 if(!$con->isConnected()) { echo ‘连接失败‘;die; } // 新建通道 $channel = new AMQPChannel($con); // 新建交换机 $exchange = new AMQPExchange($channel); // 交换机名称 $exchange->setName(‘test.direct‘); // 交换机类型 $exchange->setType(‘direct‘); // 声明交换机 $exchange->declareExchange(); for($i = 1; $i < 6; $i++) { $message = [ ‘name‘ => ‘direct交换机,消息-‘ . $i, ‘info‘ => ‘Hello World!‘ ]; // 发送消息,同时为消息指定routing key,成功返回true,失败false $state = $exchange->publish(json_encode($message, JSON_UNESCAPED_UNICODE), ‘test.queue1‘); if($state) { echo ‘Success‘ . PHP_EOL; }else { echo ‘Fail‘ . PHP_EOL; } } // 关闭连接 $con->disconnect(); }catch(Exception $e) { echo $e->getMessage(); }
consumer:
1 header(‘Content-Type: text/html; charset=utf-8‘); 2 $conConfig = [ 3 ‘host‘ => ‘127.0.0.1‘, 4 ‘port‘ => 5672, 5 ‘login‘ => ‘root‘, 6 ‘password‘ => ‘root‘, 7 ‘vhost‘ => ‘/‘ 8 ]; 9 10 11 try 12 { 13 $con = new AMQPConnection($conConfig); 14 $con->connect(); 15 if(!$con->isConnected()) 16 { 17 echo ‘连接失败‘;die; 18 } 19 20 $channel = new AMQPChannel($con); 21 22 $exchange =new AMQPExchange($channel); 23 $exchange->setName("test.direct"); 24 $exchange->setType(‘direct‘); 25 $exchange->setFlags(AMQP_DURABLE); 26 $exchange->declareExchange(); 27 28 29 $queue = new AMQPQueue($channel); 30 $queue->setName(‘test.queue1‘); 31 // 声明队列,不需要对Queue进行显示绑定到交换机和指定Queue的routing key 32 $queue->declareQueue(); 33 // 绑定队列到指定交换机,并指定routing key,即分发规则,消息的routing key与队列的绑定routing key匹配时才。routing key可以使用正则表达式 34 $queue->bind(‘test.direct‘, ‘/^q.*/‘); 35 $queue->consume(function($envelope, $queue) 36 { 37 echo $envelope->getBody() . PHP_EOL; 38 }, AMQP_AUTOACK); 39 40 41 $con->disconnect(); 42 }catch(Exception $e) 43 { 44 echo $e->getMessage(); 45 }
以上是关于RabbitMQ 在 PHP 下的简单使用 -- 安装 AMQP 扩展和 Direct Exchange 模式的主要内容,如果未能解决你的问题,请参考以下文章