通信过程
假设P1和C1注册了相同的Broker,Exchange和Queue。P1发送的消息最终会被C1消费。基本的通信流程大概如下所示:
- P1生产消息,发送给服务器端的Exchange
- Exchange收到消息,根据ROUTINKEY,将消息转发给匹配的Queue1
- Queue1收到消息,将消息发送给订阅者C1
- C1收到消息,发送ACK给队列确认收到消息
- Queue1收到ACK,删除队列中缓存的此条消息
Consumer收到消息时需要显式的向rabbit broker发送basic.ack消息或者consumer订阅消息时设置auto_ack参数为true。在通信过程中,队列对ACK的处理有以下几种情况:
- 如果consumer接收了消息,发送ack,rabbitmq会删除队列中这个消息,发送另一条消息给consumer。
- 如果cosumer接受了消息, 但在发送ack之前断开连接,rabbitmq会认为这条消息没有被deliver,在consumer在次连接的时候,这条消息会被redeliver。
- 如果consumer接受了消息,但是程序中有bug,忘记了ack,rabbitmq不会重复发送消息。
- rabbitmq2.0.0和之后的版本支持consumer reject某条(类)消息,可以通过设置requeue参数中的reject为true达到目地,那么rabbitmq将会把消息发送给下一个注册的consumer。
php 生产者示例
先用 composer 加载 mq 拓展文件
{ "require": { "php-amqplib/php-amqplib": "2.7.*" //增加这行 } }
<?php require ‘vendor/autoload.php‘; use PhpAmqpLibConnectionAMQPStreamConnection; use PhpAmqpLibMessageAMQPMessage; $conf = [ ‘host‘ => ‘127.0.0.1‘, ‘port‘ => 5672, ‘user‘ => ‘kd_dev‘, ‘pwd‘ => ‘kd_dev‘, ‘vhost‘ => ‘/‘, ]; $exchangeName = ‘kd_sms_send_ex‘; //交换机名 $queueName = ‘kd_sms_send_q‘; //队列名称 $routingKey = ‘sms_send‘; //路由关键字(也可以省略) $conn = new AMQPStreamConnection( //建立生产者与mq之间的连接 $conf[‘host‘], $conf[‘port‘], $conf[‘user‘], $conf[‘pwd‘], $conf[‘vhost‘] ); $channel = $conn->channel(); //在已连接基础上建立生产者与mq之间的通道 $channel->exchange_declare($exchangeName, ‘direct‘, false, true, false); //声明初始化交换机 $channel->queue_declare($queueName, false, true, false, false); //声明初始化一条队列 $channel->queue_bind($queueName, $exchangeName, $routingKey); //将队列与某个交换机进行绑定,并使用路由关键字 $msgBody = json_encode(["name" => "iGoo", "age" => 22]); $msg = new AMQPMessage($msgBody, [‘content_type‘ => ‘text/plain‘, ‘delivery_mode‘ => 2]); //生成消息 $r = $channel->basic_publish($msg, $exchangeName, $routingKey); //推送消息到某个交换机 $channel->close(); $conn->close();
php 消费者代码示例
<?php
$bindingkey=‘sms_send‘;
//连接RabbitMQ $conn_args = array( ‘host‘=>‘127.0.0.1‘ , ‘port‘=> ‘5672‘, ‘login‘=>‘kd_dev‘ , ‘password‘=> ‘kd_dev‘,‘vhost‘ =>‘/‘); $conn = new AMQPConnection($conn_args); $conn->connect();
//设置queue名称,使用exchange,绑定routingkey $channel = new AMQPChannel($conn); // 声明一个通道 $q = new AMQPQueue($channel); // 声明一个队列 $q->setName(‘kd_sms_send_q‘); // 路由名
$q->setFlags(AMQP_DURABLE);
$q->declare();
$q->bind(‘kd_sms_send_ex‘,$bindingkey); // 队列绑定交换机
//消息获取 $messages = $q->get(AMQP_AUTOACK) ; if ($messages){ var_dump(json_decode($messages->getBody(), true )); }
$conn->disconnect();