RabbitMQphp语言对RabbitMQ四种工作模式的基础实现
Posted 十万猫毛
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQphp语言对RabbitMQ四种工作模式的基础实现相关的知识,希望对你有一定的参考价值。
学习资料:
1.黑马程序员RabbitMQ全套教程
2.RabbitMQ的PHP教程之入门
目录:
1.RabbitMQ结构:
2. WORK QUEUES 工作队列 模式:
生产者代码:
<?php
/**
* "Work Queues"工作模式--生产者
*/
$conConfig = [
'host' => '',//安装了rabbitMQ的主机ip
'port' => '5672',//端口号
'login' => 'guest',//登录账号
'password' => 'guest',//登录密码
];
$e_name = "exchange_one";//交换器名称
try
//创建一个MQ连接实例并执行连接操作
$connection = new AMQPConnection($conConfig);
$connection->connect();
if (!$connection->isConnected())
echo 'rabbitmq connect fail';
die();
//根据连接,新建通道
$channel = new AMQPChannel($connection);
//定义交换器属性并声明交换器
$exchange = new AMQPExchange($channel);
$exchange->setName($e_name);//设置交换器名称
$exchange->setType( AMQP_EX_TYPE_DIRECT); //设置交换器类型
$exchange->setFlags(AMQP_DURABLE);//持久交换器,在broker重启后交换器不会消失,一般常使用该选项
$exchange->declareExchange();//声明交换器
for ($i = 0; $i < 100; $i++)
$message = '交换器消息--' . $i;
//发送消息到MQ
$state = $exchange->publish($message);
if ($state)
echo 'success' . $i . PHP_EOL;
else
echo 'fail' . PHP_EOL;
//关闭通道和连接
$channel->close();
$connection->disconnect();
catch (AMQPConnectionException $e)
var_dump($e);
exit();
消费者代码:
- 同一时间在一个终端运行此脚本就是类似helloworld模式,
- 同一时间在n个终端运行此脚本则等于有n个消费者在一起消费队列里的消息,且消费者之间为竞争关系。
<?php
/**
* "Work Queues"工作模式--消费者
*/
$conConfig = [
'host' => '',
'port' => '5672',
'login' => 'guest',
'password' => 'guest',
];
$e_name = "exchange_one";
$queueName = "queues_1";
try
//创建连接
$connection = new AMQPConnection($conConfig);
$connection->connect();
if (!$connection->isConnected())
echo 'rabbitmq connect fail';
die();
//新建通道
$channel = new AMQPChannel($connection);
//创建队列
$queue = new AMQPQueue($channel);
$queue->setName($queueName);
$queue->setFlags(AMQP_DURABLE);
$queue->declareQueue();
$queue->bind($e_name);//将队列和交换器绑定关系
var_dump("Waiting for message...");
// 消费队列中的消息
//consume函数的第二个参数如果为AMQP_AUTOACK,则当消费者接收到消息后向rabbitmq自动确认消息接收成功,
//如果第二个参数为空,则需要我们在代码中调用ack方法来确认消息接收状态(看匿名回调函数的最后一句就是)
while (TRUE)
$queue->consume(function ($envelope, $queue)
$msg = $envelope->getBody();
var_dump($msg);
$queue->ack($envelope->getDeliveryTag()); // 手动发送ACK应答
);
// 断开连接
$connection->disconnect();
catch (AMQPConnectionException $e)
var_dump($e);
exit();
运行结果:
生产者脚本运行结果:
success0
success1
success2
success3
success4
success5
success6
success7
success8
success9
消费者1监听结果:
string(22) "Waiting for message..."
string(18) "交换器消息--0"
string(18) "交换器消息--2"
string(18) "交换器消息--4"
string(18) "交换器消息--6"
string(18) "交换器消息--7"
string(18) "交换器消息--9"
消费者2监听结果:
string(22) "Waiting for message..."
string(18) "交换器消息--1"
string(18) "交换器消息--3"
string(18) "交换器消息--5"
string(18) "交换器消息--8"
3.PUBLISH/SUBSCRIBE 广播订阅模式
生产者代码:
- 改变交换器的类型为
AMQP_EX_TYPE_FANOUT
- 删除创建队列的代码
<?php
/**
* "Publish/Subscribe"工作模式--生产者
*/
$conConfig = [
'host' => '',//安装了rabbitMQ的主机ip
'port' => '5672',//端口号
'login' => 'guest',//登录账号
'password' => 'guest',//登录密码
];
$e_name = "exchange_two";//交换器名称
try
//创建一个MQ连接实例并执行连接操作
$connection = new AMQPConnection($conConfig);
$connection->connect();
if (!$connection->isConnected())
echo 'rabbitmq connect fail';
die();
//根据连接,新建通道
$channel = new AMQPChannel($connection);
//定义交换器属性并声明交换器
$exchange = new AMQPExchange($channel);
$exchange->setName($e_name);//设置交换器名称
$exchange->setType( AMQP_EX_TYPE_FANOUT); //设置交换器类型
$exchange->setFlags(AMQP_DURABLE);//持久交换器,在broker重启后交换器不会消失,一般常使用该选项
$exchange->declareExchange();//声明交换器
for ($i = 0; $i < 5; $i++)
$message = '交换器消息--' . $i;
//发送消息到MQ交换器,成功返回true,失败返回false
$state = $exchange->publish($message);
if ($state)
echo 'success' . $i . PHP_EOL;
else
echo 'fail' . PHP_EOL;
//关闭通道和连接
$channel->close();
$connection->disconnect();
catch (AMQPConnectionException $e)
var_dump($e);
exit();
消费者代码:
-
可以有一个或多个消费者,消费者们之间主要区别就是$queueName队列名称的值不一样。
-
如果同一时间,两个终端运行两个不同消费者脚本,那么两个消费者消费的消息一致。
如果同一时间,两个终端运行同一个消费者脚本,那依旧是work queues那种竞争关系。
cus1.php:
<?php
/**
* "Publish/Subscribe"工作模式--消费者
*/
$conConfig = [
'host' => '',
'port' => '5672',
'login' => 'guest',
'password' => 'guest',
];
$e_name = "exchange_two";
$queueName = "queues_2";
try
//创建连接
$connection = new AMQPConnection($conConfig);
$connection->connect();
if (!$connection->isConnected())
echo 'rabbitmq connect fail';
die();
//新建通道
$channel = new AMQPChannel($connection);
//创建队列
$queue = new AMQPQueue($channel);
$queue->setName($queueName);
$queue->setFlags(AMQP_DURABLE);
$queue->declareQueue();
$queue->bind($e_name);//将队列和交换器绑定关系
var_dump("Waiting for message...");
while (TRUE)
$queue->consume(function ($envelope, $queue)
$msg = $envelope->getBody();
var_dump($msg);
$queue->ack($envelope->getDeliveryTag()); // 手动发送ACK应答
);
// 断开连接
$connection->disconnect();
catch (AMQPConnectionException $e)
var_dump($e);
exit();
cus2.php:
<?php
/**
* "Publish/Subscribe"工作模式--消费者
*/
$conConfig = [
'host' => '',
'port' => '5672',
'login' => 'guest',
'password' => 'guest',
];
$e_name = "exchange_two";
$queueName = "queues_3";
//后面内容和cus1.php一致。。。
运行结果:
生产者脚本:
success0
success1
success2
success3
success4
消费者1/消费者2(所有消费者的监听结果都将一致):
string(22) "Waiting for message..."
string(18) "交换器消息--0"
string(18) "交换器消息--1"
string(18) "交换器消息--2"
string(18) "交换器消息--3"
string(18) "交换器消息--4"
4.ROUITING 路由模式
生产者代码:
改变交换器的类型为AMQP_EX_TYPE_DIRECT
<?php
/**
* "Routing"工作模式--生产者
*/
$conConfig = [
'host' => '',//安装了rabbitMQ的主机ip
'port' => '5672',//端口号
'login' => 'guest',//登录账号
'password' => 'guest',//登录密码
];
$e_name = "exchange_three";//交换器名称
$arr =[
'notice' =>'[notice]类消息内容1',//本消息的路由=>本消息的内容
'warning' =>'[warning]类消息内容1',
'error' =>'[error]类消息内容1'
];
try
//创建一个MQ连接实例并执行连接操作
$connection = new AMQPConnection($conConfig);
$connection->connect();
if (!$connection->isConnected())
echo 'rabbitmq connect fail';
die();
//根据连接,新建通道
$channel = new AMQPChannel($connection);
//定义交换器属性并声明交换器
$exchange = new AMQPExchange($channel);
$exchange->setName($e_name);//设置交换器名称
$exchange->setType( AMQP_EX_TYPE_DIRECT); //设置交换器类型
$exchange->setFlags(AMQP_DURABLE);//持久交换器,在broker重启后交换器不会消失,一般常使用该选项
$exchange->declareExchange();//声明交换器
foreach ($arr as $typeName=>$typeMsg)
//发送消息到MQ,为消息指定路由,成功返回true,失败返回false
$state = $exchange->publish($typeMsg,$typeName);
if ($state)
echo 'success:' . $typeName . PHP_EOL;
else
echo 'fail:' . PHP_EOL;
//关闭通道和连接
$channel->close();
$connection->disconnect();
catch (AMQPConnectionException $e)
var_dump($e);
exit();
消费者代码:
cus1.php:
如果消息的指定路由key为notice
和warning
,则交换器会分发到此队列queues_4
中
<?php
/**
* "Routing"工作模式--消费者
*/
$conConfig = [
'host' => '',
'port' => '5672',
'login' => 'guest',
'password' => 'guest',
];
$e_name = "exchange_three";
$queueName = "queues_4";
$routingKey1 = "notice";
$routingKey2 = "warning";
try
//创建连接
$connection = new AMQPConnection($conConfig);
$connection->connect();
if (!$connection->isConnected())
echo 'rabbitmq connect fail';
die();
//新建通道
$channel = new AMQPChannel($connection);
//创建队列
$queue = new AMQPQueue($channel);
$queue->setName($queueName);
$queue->setFlags(AMQP_DURABLE);
$queue->declareQueue();
$queue->bind($e_name,$routingKey1);
$queue->bind($e_name,$routingKey2);
var_dump("Waiting for message...");
while (TRUE)
$queue->consume(function ($envelope, $queue)
$msg = $envelope->getBody();
var_dump($msg);
$queue->ack($envelope->getDeliveryTag()); // 手动发送ACK应答
);
// 断开连接
$connection->disconnect();
catch (AMQPConnectionException $e)
var_dump($e);
exit();
cus2.php:
如果消息的指定路由key为notice
和warning
,则交换器会
以上是关于RabbitMQphp语言对RabbitMQ四种工作模式的基础实现的主要内容,如果未能解决你的问题,请参考以下文章