PHP使用RabbitMQ消息队列

Posted BUG工厂

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了PHP使用RabbitMQ消息队列相关的知识,希望对你有一定的参考价值。

1、安装amqp拓展 安装流程

2、下载工具包 php-amqplib 

composer require php-amqplib/php-amqplib
 
3、代码操作如下
【消费消息】
 1 <?php 
 2 //配置信息 
 3 $conn_args = array( 
 4     \'host\' => \'127.0.0.1\',
 5     \'port\' => \'5672\',  
 6     \'login\' => \'zcw\',  
 7     \'password\' => \'123456\', 
 8     \'vhost\'=>\'/\' 
 9 );   
10 $e_name = \'exchange1\'; //交换机名 
11 $q_name = \'queue1\'; //队列名 
12 $k_route = \'route1\'; //路由key 
13  
14 //创建连接和channel 
15 $conn = new AMQPConnection($conn_args);   
16 if (!$conn->connect()) {   
17     die("Cannot connect to the broker!\\n");   
18 }   
19 $channel = new AMQPChannel($conn);   
20  
21 //创建交换机    
22 $ex = new AMQPExchange($channel);   
23 $ex->setName($e_name); 
24 $ex->setType(AMQP_EX_TYPE_DIRECT); //direct类型(常用的有fanout、direct、topic、headers)  
25 $ex->setFlags(AMQP_DURABLE); //持久化 
26 
27    
28 //创建队列    
29 $q = new AMQPQueue($channel); 
30 $q->setName($q_name);   
31 $q->setFlags(AMQP_DURABLE); //持久化  
32 
33 $total = $q->declareQueue();//获取所有的消息数量
34  
35 //绑定交换机与队列,并指定路由键 
36 $q->bind($e_name, $k_route); 
37  
38 //1、阻塞模式接收消息 
39 while(True){ 
40     $q->consume(\'processMessage\');   
41     //$q->consume(\'processMessage\', AMQP_AUTOACK); //自动ACK应答  
42 } 
43 
44 //2 非阻塞模式接收消息 可定时调用
45 //if($total){
46 //    for($i=0;$i<$total;$i++){
47 //        $envelope = $q->get();
48 //        if($envelope){
49 //             $msg = $envelope->getBody(); 
50 //             echo $msg."\\n"; //处理消息 
51 //             $q->ack($envelope->getDeliveryTag()); //手动发送ACK应答
52 //        }
53 //    }
54 //}
55 
56 $conn->disconnect();   
57  
58 /**
59  * 消费回调函数
60  * 处理消息
61  */ 
62 function processMessage($envelope, $queue) { 
63     $msg = $envelope->getBody(); 
64     echo $msg."\\n"; //处理消息 
65     $queue->ack($envelope->getDeliveryTag()); //手动发送ACK应答
66 }
67 ?>

 

【生产消息】

 1 <?php
 2 //配置信息 
 3 $conn_args = array( 
 4     \'host\' => \'127.0.0.1\',  
 5     \'port\' => \'5672\',  
 6     \'login\' => \'zcw\',
 7     \'password\' => \'123456\',
 8     \'vhost\'=>\'/\' 
 9 );   
10 $e_name = \'exchange1\'; //交换机名 
11 //$q_name = \'queue1\'; //无需队列名 
12 $k_route = \'route1\'; //路由key
13  
14 //创建连接和channel 
15 $conn = new AMQPConnection($conn_args);   
16 if (!$conn->connect()) {   
17     die("Cannot connect to the broker!\\n");   
18 }   
19 $channel = new AMQPChannel($conn);   
20 //创建交换机对象    
21 $ex = new AMQPExchange($channel);   
22 $ex->setName($e_name);   
23 //发送消息 
24 //$channel->startTransaction(); //开始事务  
25 for($i=0; $i<5; ++$i){
26    $ex->publish($message, $k_route)."\\n";
27 }
28  
29 //$channel->commitTransaction(); //提交事务 
30  
31 $conn->disconnect();
32 
33 ?>

 

 

以上是关于PHP使用RabbitMQ消息队列的主要内容,如果未能解决你的问题,请参考以下文章

PHP使用RabbitMQ消息队列

rabbitmq - 不会获取队列中的所有消息

rabbitMQ+php

RabbitMQ+PHP演示实例

求助,rabbitmq读取消息队列的问题

基于PHP使用rabbitmq实现消息队列