PHP使用RabbitMQ消息队列
Posted BUG工厂
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了PHP使用RabbitMQ消息队列相关的知识,希望对你有一定的参考价值。
1、安装amqp拓展 安装流程
2、下载工具包 php-amqplib
composer require php-amqplib/php-amqplib
3、代码操作如下
【消费消息】
1 <?php 2 //配置信息 3 $conn_args = array( 4 \'host\' => \'127.0.0.1\', 5 \'port\' => \'5672\', 6 \'login\' => \'zcw\', 7 \'password\' => \'123456\', 8 \'vhost\'=>\'/\' 9 ); 10 $e_name = \'exchange1\'; //交换机名 11 $q_name = \'queue1\'; //队列名 12 $k_route = \'route1\'; //路由key 13 14 //创建连接和channel 15 $conn = new AMQPConnection($conn_args); 16 if (!$conn->connect()) { 17 die("Cannot connect to the broker!\\n"); 18 } 19 $channel = new AMQPChannel($conn); 20 21 //创建交换机 22 $ex = new AMQPExchange($channel); 23 $ex->setName($e_name); 24 $ex->setType(AMQP_EX_TYPE_DIRECT); //direct类型(常用的有fanout、direct、topic、headers) 25 $ex->setFlags(AMQP_DURABLE); //持久化 26 27 28 //创建队列 29 $q = new AMQPQueue($channel); 30 $q->setName($q_name); 31 $q->setFlags(AMQP_DURABLE); //持久化 32 33 $total = $q->declareQueue();//获取所有的消息数量 34 35 //绑定交换机与队列,并指定路由键 36 $q->bind($e_name, $k_route); 37 38 //1、阻塞模式接收消息 39 while(True){ 40 $q->consume(\'processMessage\'); 41 //$q->consume(\'processMessage\', AMQP_AUTOACK); //自动ACK应答 42 } 43 44 //2 非阻塞模式接收消息 可定时调用 45 //if($total){ 46 // for($i=0;$i<$total;$i++){ 47 // $envelope = $q->get(); 48 // if($envelope){ 49 // $msg = $envelope->getBody(); 50 // echo $msg."\\n"; //处理消息 51 // $q->ack($envelope->getDeliveryTag()); //手动发送ACK应答 52 // } 53 // } 54 //} 55 56 $conn->disconnect(); 57 58 /** 59 * 消费回调函数 60 * 处理消息 61 */ 62 function processMessage($envelope, $queue) { 63 $msg = $envelope->getBody(); 64 echo $msg."\\n"; //处理消息 65 $queue->ack($envelope->getDeliveryTag()); //手动发送ACK应答 66 } 67 ?>
【生产消息】
1 <?php 2 //配置信息 3 $conn_args = array( 4 \'host\' => \'127.0.0.1\', 5 \'port\' => \'5672\', 6 \'login\' => \'zcw\', 7 \'password\' => \'123456\', 8 \'vhost\'=>\'/\' 9 ); 10 $e_name = \'exchange1\'; //交换机名 11 //$q_name = \'queue1\'; //无需队列名 12 $k_route = \'route1\'; //路由key 13 14 //创建连接和channel 15 $conn = new AMQPConnection($conn_args); 16 if (!$conn->connect()) { 17 die("Cannot connect to the broker!\\n"); 18 } 19 $channel = new AMQPChannel($conn); 20 //创建交换机对象 21 $ex = new AMQPExchange($channel); 22 $ex->setName($e_name); 23 //发送消息 24 //$channel->startTransaction(); //开始事务 25 for($i=0; $i<5; ++$i){ 26 $ex->publish($message, $k_route)."\\n"; 27 } 28 29 //$channel->commitTransaction(); //提交事务 30 31 $conn->disconnect(); 32 33 ?>
以上是关于PHP使用RabbitMQ消息队列的主要内容,如果未能解决你的问题,请参考以下文章