#yyds干货盘点# Implementing RabbitMQ with php-amqplib

Posted OwenZhang24

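> Environment for this article: Ubuntu 20.04, nginx1.8, PHP 7.3, RabbitMQ 3.9

> If anything is unclear, leave a comment or email me: owen@owenzhang.com

> Copyright belongs to OwenZhang. For commercial reprints, contact OwenZhang for authorization; for non-commercial reprints, credit the source.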

Project code

https://gitee.com/owenzhang24/tp5

Queue notes

1: Listing queues

To see which queues RabbitMQ has and how many messages each one holds, run the rabbitmqctl tool as a privileged user:

sudo rabbitmqctl list_queues

On Windows, omit sudo:

rabbitmqctl.bat list_queues

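2: Work queues

With message acknowledgments in use, no message is lost even if you kill a worker process with CTRL+C. Once the worker dies, all of its unacknowledged messages are redelivered.

Forgetting basic_ack is an easy mistake with serious consequences. Messages are redelivered when your program exits, and RabbitMQ uses more and more memory because it cannot release the unacknowledged messages.

To debug this kind of mistake, use rabbitmqctl to print the messages_unacknowledged field:

$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged

On Windows, drop sudo:

$ rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged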
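
As a rough sketch of how this check could be automated, the PHP function below parses the text printed by that command and returns the queues that still hold unacknowledged messages. It assumes tab-separated columns in the order name, messages_ready, messages_unacknowledged. The function name findUnackedQueues and the sample output are made up for illustration.

```php
<?php
/**
 * Hypothetical helper: find queues with unacknowledged messages in the output of
 * `rabbitmqctl list_queues name messages_ready messages_unacknowledged`.
 * Assumes tab-separated columns; banner and header lines are skipped
 * because they do not consist of a name followed by two numeric counters.
 */
function findUnackedQueues(string $output): array
{
    $result = [];
    foreach (preg_split('/\R/', trim($output)) as $line) {
        $cols = explode("\t", $line);
        // Keep only data rows: exactly three columns with numeric counters.
        if (count($cols) !== 3 || !ctype_digit($cols[1]) || !ctype_digit($cols[2])) {
            continue;
        }
        if ((int)$cols[2] > 0) {
            $result[$cols[0]] = (int)$cols[2];
        }
    }
    return $result;
}

// Made-up sample output for illustration.
$sample = "Listing queues for vhost / ...\n"
    . "name\tmessages_ready\tmessages_unacknowledged\n"
    . "hello\t0\t0\n"
    . "task_queue\t2\t5\n";

print_r(findUnackedQueues($sample));
```

A queue whose unacknowledged count keeps growing while its workers are idle is a likely sign of a missing basic_ack.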

3: rabbitmqctl can list all exchanges on the server:

$ sudo rabbitmqctl list_exchanges

The list contains some exchanges named amq.*. They are created by default, but you do not need them at this point.

4: Listing all existing bindings

rabbitmqctl list_bindings

5: To save the logs to a file, open a console and run (see the receive_logs.php source code):

$ php receive_logs.php > logs_from_rabbit.log

To print all log messages to the screen instead, open a new terminal and run:

$ php receive_logs_direct.php info warning error
# => [*] Waiting for logs. To exit press CTRL+C

To emit an error-level log message, run:

$ php emit_log_direct.php error "Run. Run. Or it will explode."
# => [x] Sent error:Run. Run. Or it will explode.
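
The emit_log_direct.php script invoked above is not reproduced in this post. The following is a minimal sketch of what such a script might look like with php-amqplib. The exchange name direct_logs, the helper names parseLogArgs and publishDirectLog, and the argument handling are assumptions modeled on the command line shown above. Composer's autoloader is assumed to be loaded before publishDirectLog() is called.

```php
<?php
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

/**
 * Split CLI arguments into [severity, message].
 * Assumption: argv[1] is the severity, everything after it is the message text.
 */
function parseLogArgs(array $argv): array
{
    $severity = !empty($argv[1]) ? $argv[1] : 'info';
    $message = implode(' ', array_slice($argv, 2));
    if ($message === '') {
        $message = 'Hello World!';
    }
    return [$severity, $message];
}

/**
 * Publish one log line to a direct exchange, using the severity as routing key.
 * Needs a running broker; host and credentials are the ones used in this post.
 */
function publishDirectLog(string $severity, string $message): void
{
    $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    $channel = $connection->channel();
    // 'direct_logs' is an assumed exchange name.
    $channel->exchange_declare('direct_logs', 'direct', false, false, false);
    $channel->basic_publish(new AMQPMessage($message), 'direct_logs', $severity);
    echo " [x] Sent $severity:$message\n";
    $channel->close();
    $connection->close();
}

// Example: php emit_log_direct.php error "Run. Run. Or it will explode."
// [$severity, $message] = parseLogArgs($argv);
// publishDirectLog($severity, $message);
```

Only consumers whose queue is bound with the same severity as the routing key receive the message, which is why the receiver above lists info, warning, and error explicitly.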

Queue installation

Step 1: Install RabbitMQ

Installing and starting RabbitMQ on Windows:

https://my.oschina.net/owenzhang24/blog/5051652

Step 2: Install php-amqplib with Composer

composer require php-amqplib/php-amqplib

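Step 3: Classes

  1. Base class implementing RabbitMQ: application/common/lib/classes/rabbitmq/RabbitMq.php
  2. RabbitMQ class for external callers: application/common/lib/classes/RabbitMqWork.php
  3. Methods for testing sending messages to RabbitMQ: application/index/controller/Index.php
  4. php think commands that receive messages from RabbitMQ: application/common/command/*.php

Step 4: Usage

  1. To send a message, call one of the send methods of the RabbitMqWork class from your own method.
  2. The classes under application/common/command/ add php think commands. Set the command name with setName() inside configure(); execute() runs the code that receives messages from RabbitMQ. Also register each command name and its class in the array returned by application/command.php.
  3. Reference documentation:
  4. RabbitMQ Chinese documentation (PHP edition): https://xiaoxiami.gitbook.io/rabbitmq_into_chinese_php/
  5. Official RabbitMQ documentation: https://www.rabbitmq.com/getstarted.html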

Step 5: Source code

application/common/lib/classes/rabbitmq/RabbitMq.php

<?php
// Base class implementing RabbitMQ

namespace app\common\lib\classes\rabbitmq;

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class RabbitMq
{
    static private $instance;
    static private $connection;
    static private $channel;
    const DIRECT = 'direct';
    const TOPIC = 'topic';
    const HEADERS = 'headers';
    const FANOUT = 'fanout';
    static private $exchangeNames = [
        self::DIRECT => 'direct_exchange',
        self::TOPIC => 'topic_exchange',
        self::HEADERS => 'headers_exchange',
        self::FANOUT => 'fanout_exchange',
    ];
    const SEVERITYS = [
        'info',
        'warning',
        'error'
    ];
    static private $exchangeName = '';

    /**
     * RabbitMq constructor.
     * @param $exchangeType
     */
    private function __construct($exchangeType)
    {
        self::$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
        self::$channel = self::$connection->channel();
        if (!empty($exchangeType)) {
            self::$exchangeName = self::$exchangeNames[$exchangeType];
            self::$channel->exchange_declare(
                self::$exchangeName, // exchange name
                $exchangeType, // exchange type
                false, // passive: false creates the exchange if it does not exist
                true, // durable: the exchange survives a broker restart
                false // auto_delete: the exchange is kept when it is no longer in use
            );
        }
    }

    /**
     * Get the shared instance
     * @param string $exchangeType
     * @return RabbitMq
     */
    public static function instance($exchangeType = '')
    {
        if (!self::$instance instanceof self) {
            self::$instance = new self($exchangeType);
        }
        return self::$instance;
    }

    /**
     * Prevent cloning from outside
     */
    private function __clone()
    {
    }

    /**
     * Simple send
     */
    public function send()
    {
        self::$channel->queue_declare('hello', false, false, false);
        $msg = new AMQPMessage('Hello World!');
        self::$channel->basic_publish($msg, '', 'hello');
        echo "[X] Sent Hello World!\n";
    }

    /**
     * Simple receive
     * @param $callback
     */
    public function receive($callback)
    {
        self::$channel->queue_declare('hello', false, false, false, true);
        echo "[*] Waiting for messages. To exit press CTRL+C\n";

        // no_ack (4th argument) is false because the SimpleWork callback acknowledges
        // each message itself; acking a no_ack delivery makes the broker close the channel.
        self::$channel->basic_consume('hello', '', false, false, false, false, $callback);

        while (count(self::$channel->callbacks)) {
            self::$channel->wait();
        }
    }

    /**
     * Add a task to the work queue
     * @param string $data
     */
    public function addTask($data = '')
    {
        // durable = true, auto_delete = true: an auto-delete queue is removed
        // once its last consumer disconnects.
        self::$channel->queue_declare('task_queue', false, true, false, true);
        if (empty($data)) $data = 'Hello World!';
        $msg = new AMQPMessage(
            $data,
            array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
        );
        self::$channel->basic_publish($msg, '', 'task_queue');

        echo "[x] Sent $data \n";
    }

    /**
     * Process the work queue
     * @param $callback
     */
    public function workTask($callback)
    {
        self::$channel->queue_declare('task_queue', false, true, false, true);
        echo '[*] Waiting for messages. To exit press CTRL+C', "\n";

        // Deliver at most one unacknowledged message to this worker at a time.
        self::$channel->basic_qos(null, 1, null);
        self::$channel->basic_consume('task_queue', '', false, false, false, false, $callback);

        while (count(self::$channel->callbacks)) {
            self::$channel->wait();
        }
    }

    /**
     * Publish
     * @param string $data
     */
    public function sendQueue($data = '')
    {
        if (empty($data)) $data = 'info:Hello World!';
        $msg = new AMQPMessage($data);
        self::$channel->basic_publish($msg, self::$exchangeName);
        echo "[x] Sent $data \n";
    }

    /**
     * Subscribe
     * @param $callback
     */
    public function subscribeQueue($callback)
    {
        list($queue_name, ,) = self::$channel->queue_declare(
            "", // queue name: empty, so the server generates one
            false, // passive: false creates the queue if it does not exist
            true, // durable
            true, // exclusive: only this connection may use the queue
            false // auto_delete
        );
        self::$channel->queue_bind($queue_name, self::$exchangeName);
        echo "[*] Waiting for logs. To exit press CTRL+C \n";
        self::$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

        while (count(self::$channel->callbacks)) {
            self::$channel->wait();
        }
    }

    /**
     * Send (direct exchange)
     * @param $routingKey
     * @param string $data
     */
    public function sendDirect($routingKey, $data = '')
    {
        if (empty($data)) $data = "Hello World!";
        $msg = new AMQPMessage($data);
        self::$channel->basic_publish($msg, self::$exchangeName, $routingKey);
        echo "[x] Sent $routingKey:$data \n";
    }

    /**
     * Receive (direct exchange)
     * @param \Closure $callback
     * @param array $bindingKeys
     */
    public function receiveDirect(\Closure $callback, array $bindingKeys)
    {
        list($queueName, ,) = self::$channel->queue_declare('', false, true, true, false);
        foreach ($bindingKeys as $bindingKey) {
            self::$channel->queue_bind($queueName, self::$exchangeName, $bindingKey);
        }
        echo "[x] Waiting for logs. To exit press CTRL+C \n";
        self::$channel->basic_consume($queueName, '', false, true, false, false, $callback);
        while (count(self::$channel->callbacks)) {
            self::$channel->wait();
        }
    }

    /**
     * Send (topic exchange)
     * @param $routingKey
     * @param string $data
     */
    public function sendTopic($routingKey, $data = '')
    {
        if (empty($data)) $data = "Hello World!";
        $msg = new AMQPMessage($data);
        self::$channel->basic_publish($msg, self::$exchangeName, $routingKey);
        echo " [x] Sent ", $routingKey, ':', $data, " \n";
    }

    /**
     * Receive (topic exchange)
     * @param \Closure $callback
     * @param array $bindingKeys
     */
    public function receiveTopic(\Closure $callback, array $bindingKeys)
    {
        list($queueName, ,) = self::$channel->queue_declare("", false, true, true, false);
        foreach ($bindingKeys as $bindingKey) {
            self::$channel->queue_bind($queueName, self::$exchangeName, $bindingKey);
        }

        echo '[*] Waiting for logs. To exit press CTRL+C', "\n";
        self::$channel->basic_consume($queueName, '', false, true, false, false, $callback);

        while (count(self::$channel->callbacks)) {
            self::$channel->wait();
        }
    }

    /**
     * Close the channel and the connection
     */
    public function __destruct()
    {
        self::$channel->close();
        self::$connection->close();
    }
}

application/common/lib/classes/RabbitMqWork.php

<?php
// RabbitMQ class for external callers

namespace app\common\lib\classes;

use app\common\lib\classes\rabbitmq\RabbitMq;

class RabbitMqWork
{
    private $RabbitMq;

    public function __construct($exchangeType = '')
    {
        $this->RabbitMq = RabbitMq::instance($exchangeType);
    }

    /**
     * Send (simple)
     */
    public function send()
    {
        $this->RabbitMq->send();
    }

    /**
     * Receive (simple)
     * @param $callback
     */
    public function receive($callback)
    {
        $this->RabbitMq->receive($callback);
    }

    /**
     * Send (work queue)
     * @param $data
     */
    public function addTask($data)
    {
        $this->RabbitMq->addTask($data);
    }

    /**
     * Receive (work queue)
     * @param $callback
     */
    public function workTask($callback)
    {
        $this->RabbitMq->workTask($callback);
    }

    /**
     * Publish (fanout exchange)
     * @param $data
     */
    public function sendQueue($data)
    {
        $this->RabbitMq->sendQueue($data);
    }

    /**
     * Subscribe (fanout exchange)
     * @param $callback
     */
    public function subscribeQueue($callback)
    {
        $this->RabbitMq->subscribeQueue($callback);
    }

    /**
     * Send (direct exchange)
     * @param $routingKey
     * @param $data
     */
    public function sendDirect($routingKey, $data)
    {
        $this->RabbitMq->sendDirect($routingKey, $data);
    }

    /**
     * Receive (direct exchange)
     * @param \Closure $callback
     * @param array $bindingKeys
     */
    public function receiveDirect(\Closure $callback, array $bindingKeys)
    {
        $this->RabbitMq->receiveDirect($callback, $bindingKeys);
    }

    /**
     * Send (topic exchange)
     * @param $routingKey
     * @param $data
     */
    public function sendTopic($routingKey, $data)
    {
        $this->RabbitMq->sendTopic($routingKey, $data);
    }

    /**
     * Receive (topic exchange)
     * @param \Closure $callback
     * @param array $bindingKeys
     */
    public function receiveTopic(\Closure $callback, array $bindingKeys)
    {
        $this->RabbitMq->receiveTopic($callback, $bindingKeys);
    }
}

application/index/controller/Index.php

<?php

namespace app\index\controller;

use app\common\lib\classes\rabbitmq\RabbitMq;
use app\common\lib\classes\RabbitMqWork;
use app\polymerize\tool\module\es\SearchBlog;
use app\polymerize\tool\module\es\SyncBlog;
use think\Collection;

class Index extends Collection
{
    public function index()
    {
        // $this->send();
        // $this->addTask();
        // $this->sendQueue();
        // $this->sendDirect();
        $this->sendTopic();
        var_dump(11);
        die();
    }

    public function searchBlog()
    {
        // $id=1;
        // $res = SyncBlog::getInstance()->syncBlog($id);
        $search=11;
        $res = SearchBlog::getInstance()->searchBlog($search, 1, 100);
        var_dump($res);
        die();
        var_dump(1111);
        die();
    }

    /**
     * Send (simple)
     */
    public function send()
    {
        $RabbitMqWork = new RabbitMqWork();
        $RabbitMqWork->send();
    }

    /**
     * Send (work queue)
     */
    public function addTask()
    {
        $data = input('data', 'This is work task!');
        $RabbitMqWork = new RabbitMqWork();
        $RabbitMqWork->addTask($data);
    }

    /**
     * Send (fanout exchange)
     */
    public function sendQueue()
    {
        $data = input('data', 'This is send queue1');
        $RabbitMqWork = new RabbitMqWork(RabbitMq::FANOUT);
        $RabbitMqWork->sendQueue($data);
    }

    /**
     * Send (direct exchange)
     */
    public function sendDirect()
    {
        $data = input('data', 'Hello World!');
        $routingKey = input('routingKey', 'info');
        $RabbitMqWork = new RabbitMqWork(RabbitMq::DIRECT);
        $RabbitMqWork->sendDirect($routingKey, $data);
    }

    /**
     * Send (topic exchange)
     */
    public function sendTopic()
    {
        $data = input('data', 'Hello World!');
        $routingKey = input('routingKey', 'lazy.boy');
        $RabbitMqWork = new RabbitMqWork(RabbitMq::TOPIC);
        $RabbitMqWork->sendTopic($routingKey, $data);
    }
}

application/command.php

<?php
// +----------------------------------------------------------------------
// | ThinkPHP [ WE CAN DO IT JUST THINK ]
// +----------------------------------------------------------------------
// | Copyright (c) 2006-2016 http://thinkphp.cn All rights reserved.
// +----------------------------------------------------------------------
// | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
// +----------------------------------------------------------------------
// | Author: yunwuxin <448901948@qq.com>
// +----------------------------------------------------------------------

// Command name => command class. The class names must match the namespace
// declared in the command files (app\command).
return [
    'simpleMq' => 'app\command\SimpleWork',
    'workQueue' => 'app\command\WorkQueue',
    'sendQueue' => 'app\command\SendQueue',
    'directQueue' => 'app\command\DirectQueue',
    'topicQueue' => 'app\command\TopicQueue',
];

application/common/command/*.php

application/command/DirectQueue.php

<?php
/**
 * Receive (direct exchange)
 */

namespace app\command;

use app\common\lib\classes\rabbitmq\RabbitMq;
use app\common\lib\classes\RabbitMqWork;
use think\console\Command;
use think\console\Input;
use think\console\Output;

class DirectQueue extends Command
{
    protected function configure()
    {
        parent::configure();
        $this->setName('directQueue');
    }

    protected function execute(Input $input, Output $output)
    {
        $RabbitMqWork = new RabbitMqWork(RabbitMq::DIRECT);
        $callback = function ($msg) {
            echo "[x] " . $msg->delivery_info['routing_key'] . ":$msg->body \n";
        };
        // Bind one queue to every severity: info, warning, error.
        $RabbitMqWork->receiveDirect($callback, RabbitMq::SEVERITYS);
    }
}

application/command/SendQueue.php

<?php
/**
 * Subscribe (fanout exchange)
 */

namespace app\command;

use app\common\lib\classes\rabbitmq\RabbitMq;
use app\common\lib\classes\RabbitMqWork;
use think\console\Command;
use think\console\Input;
use think\console\Output;
use think\Log;

class SendQueue extends Command
{
    protected function configure()
    {
        parent::configure();
        $this->setName('sendQueue');
    }

    protected function execute(Input $input, Output $output)
    {
        $RabbitMqWork = new RabbitMqWork(RabbitMq::FANOUT);
        $callback = function ($msg) {
            echo 'Receive:';
            echo "Msg:$msg->body \n";
            \Log::error("Msg:$msg->body");
        };
        $RabbitMqWork->subscribeQueue($callback);
    }
}

application/command/SimpleWork.php

<?php
/**
 * Receive (simple)
 */

namespace app\command;

use app\common\lib\classes\RabbitMqWork;
use think\console\Command;
use think\console\Input;
use think\console\Output;
use think\Log;

class SimpleWork extends Command
{
    protected function configure()
    {
        parent::configure();
        $this->setName('simpleMq');
    }

    protected function execute(Input $input, Output $output)
    {
        $RabbitMqWork = new RabbitMqWork();
        $callback = function ($msg) {
            echo 'Receive:';
            $queueName = $msg->delivery_info['routing_key'];
            $msgData = $msg->body;
            $isAck = true;
            echo 'Msg:' . $msgData . "\n";
            echo 'QueueName:' . $queueName . "\n";
            // Manual ack: only valid when the consumer was registered with no_ack = false.
            if ($isAck) {
                $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
            }
        };
        $RabbitMqWork->receive($callback);
    }
}

application/command/TopicQueue.php

<?php
/**
 * Receive (topic exchange)
 */

namespace app\command;

use app\common\lib\classes\rabbitmq\RabbitMq;
use app\common\lib\classes\RabbitMqWork;
use think\console\Command;
use think\console\Input;
use think\console\Output;

class TopicQueue extends Command
{
    protected function configure()
    {
        parent::configure();
        $this->setName('topicQueue');
    }

    protected function execute(Input $input, Output $output)
    {
        $RabbitMqWork = new RabbitMqWork(RabbitMq::TOPIC);
        $callback = function ($msg) {
            echo '[x] ', $msg->delivery_info['routing_key'], ':', $msg->body, "\n";
        };
        $bindingKeys = [
            '*.orange.*',
            '*.*.rabbit',
            'lazy.#'
        ];
        $RabbitMqWork->receiveTopic($callback, $bindingKeys);
    }
}
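
To make the three binding keys above concrete: in a topic exchange, * stands for exactly one dot-separated word and # stands for zero or more words. The function below is only an illustration of those rules in plain PHP. The broker does the real matching, and topicMatches and matchWords are made-up names.

```php
<?php
/**
 * Illustrative only: does a routing key match a topic binding key?
 * "*" matches exactly one word, "#" matches zero or more words.
 */
function topicMatches(string $bindingKey, string $routingKey): bool
{
    return matchWords(explode('.', $bindingKey), explode('.', $routingKey));
}

function matchWords(array $pattern, array $words): bool
{
    if ($pattern === []) {
        return $words === [];
    }
    $head = array_shift($pattern);
    if ($head === '#') {
        // "#" may consume 0..n words; try every split point.
        for ($i = 0; $i <= count($words); $i++) {
            if (matchWords($pattern, array_slice($words, $i))) {
                return true;
            }
        }
        return false;
    }
    if ($words === []) {
        return false;
    }
    $word = array_shift($words);
    return ($head === '*' || $head === $word) && matchWords($pattern, $words);
}

$bindingKeys = ['*.orange.*', '*.*.rabbit', 'lazy.#'];
foreach (['lazy.boy', 'quick.orange.rabbit', 'quick.brown.fox'] as $routingKey) {
    $matched = array_filter($bindingKeys, function ($key) use ($routingKey) {
        return topicMatches($key, $routingKey);
    });
    echo $routingKey, ' => ', implode(', ', $matched) ?: '(no match)', "\n";
}
```

The default routing key lazy.boy used by sendTopic() in Index.php matches only lazy.#, so the topicQueue consumer receives it through that binding.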

application/command/WorkQueue.php

<?php
/**
 * Receive (work queue)
 */

namespace app\command;

use app\common\lib\classes\RabbitMqWork;
use think\console\Command;
use think\console\Input;
use think\console\Output;

class WorkQueue extends Command
{
    protected function configure()
    {
        parent::configure();
        $this->setName('workQueue');
    }

    protected function execute(Input $input, Output $output)
    {
        $RabbitMqWork = new RabbitMqWork();
        $callback = function ($msg) {
            echo " [x] Received ", $msg->body, "\n";
            // Simulate work: sleep one second per dot in the message body.
            sleep(substr_count($msg->body, '.'));
            echo " [x] Done", "\n";
            // Acknowledge only after the work is finished.
            $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
        };
        $RabbitMqWork->workTask($callback);
    }
}
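
The worker callback acknowledges a message only after the work has finished. A possible refinement, sketched here, wraps any handler so that the message is acknowledged on success and rejected with requeue when the handler throws. The wrapper name withAck is hypothetical. It relies on the delivery_info array used throughout this post and on the channel's basic_ack and basic_nack methods, and it only makes sense for consumers registered with no_ack set to false, as workTask() does.

```php
<?php
/**
 * Hypothetical wrapper: ack after the handler succeeds, nack + requeue if it throws.
 */
function withAck(callable $handler): callable
{
    return function ($msg) use ($handler) {
        $channel = $msg->delivery_info['channel'];
        $tag = $msg->delivery_info['delivery_tag'];
        try {
            $handler($msg);
        } catch (\Throwable $e) {
            // basic_nack($delivery_tag, $multiple, $requeue): put the message back on the queue.
            $channel->basic_nack($tag, false, true);
            echo " [!] Requeued: ", $e->getMessage(), "\n";
            return;
        }
        $channel->basic_ack($tag);
    };
}

// Usage with the classes from this post (needs a running broker):
// $RabbitMqWork->workTask(withAck(function ($msg) {
//     echo " [x] Received ", $msg->body, "\n";
// }));
```

A message whose handler always fails would be requeued indefinitely with this sketch, so a real worker would likely need a retry limit or a dead-letter setup.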

Buy me a cup of coffee :)

If you found this helpful, feel free to leave a tip. Thank you!

WeChat reward code (click to open):

https://www.owenzhang.com/wechat_reward.png
