thinkphp使用AMQP库(支持RabbitMq)
Posted 八戒vs
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了thinkphp使用AMQP库(支持RabbitMq)相关的知识,希望对你有一定的参考价值。
1,安装依赖库
composer require php-amqplib/php-amqplib
地址:https://github.com/php-amqplib/php-amqplib
2,mq生产者.php
include(__DIR__ . ‘../../public/config.php‘);
use PhpAmqpLibConnectionAMQPStreamConnection;
use PhpAmqpLibMessageAMQPMessage;
/**
* Created by PhpStorm.
* User: pandeng
* Date: 2017-07-26
* Time: 21:51
*/
class MessageQueue
{
const exchange = ‘router‘;
const queue = ‘msgs‘;
public static function pushMessage($data)
{
$connection = new AMQPStreamConnection(HOST, PORT, USER, PASS, VHOST);
$channel = $connection->channel();
$channel->queue_declare(self::queue, false, true, false, false);
$channel->exchange_declare(self::exchange, ‘direct‘, false, true, false);
$channel->queue_bind(self::queue, self::exchange);
$messageBody = $data;
$message = new AMQPMessage($messageBody, array(‘content_type‘ => ‘text/plain‘, ‘delivery_mode‘ => AMQPMessage::DELIVERY_MODE_PERSISTENT));
$channel->basic_publish($message, self::exchange);
$channel->close();
$connection->close();
return "ok";
}
}
3.消费者.php
namespace appindexcontroller;
include(__DIR__ . ‘../../../../public/config.php‘);
use PhpAmqpLibConnectionAMQPStreamConnection;
use thinkController;
use thinkLog;
use thinkRequest;
use thinkDb;
class MessageConsume extends Controller
{
const exchange = ‘router‘;
const queue = ‘msgs‘;
const consumerTag = ‘consumer‘;
function shutdown($channel, $connection)
{
$channel->close();
$connection->close();
write_log("closed",3);
}
function process_message($message)
{
if ($message->body !== ‘quit‘) {
$obj = json_decode($message->body);
if (!isset($obj->id)) {
echo ‘error data
‘;
write_log("error data:" . $message->body, 2);
} else {
try {
write_log("data:" . json_encode($message));
} catch (ThinkException $e) {
write_log($e->getMessage(), 2);
write_log(json_encode($message), 2);
} catch (PDOException $pe) {
write_log($pe->getMessage(), 2);
write_log(json_encode($message), 2);
}
}
}
$message->delivery_info[‘channel‘]->basic_ack($message->delivery_info[‘delivery_tag‘]);
// Send a message with the string "quit" to cancel the consumer.
if ($message->body === ‘quit‘) {
$message->delivery_info[‘channel‘]->basic_cancel($message->delivery_info[‘consumer_tag‘]);
}
}
/**
* 启动
*
* @return hinkResponse
*/
public function start()
{
$connection = new AMQPStreamConnection(HOST, PORT, USER, PASS, VHOST);
$channel = $connection->channel();
$channel->queue_declare(self::queue, false, true, false, false);
$channel->exchange_declare(self::exchange, ‘direct‘, false, true, false);
$channel->queue_bind(self::queue, self::exchange);
$channel->basic_consume(self::queue, self::consumerTag, false, false, false, false, array($this, ‘process_message‘));
register_shutdown_function(array($this, ‘shutdown‘), $channel, $connection);
while (count($channel->callbacks)) {
$channel->wait();
}
write_log("starting",3);
}
}
- 启动消费者(守护进程)
nohup php index.php index/Message_Consume/start &
原文链接:https://www.jianshu.com/p/89dc541c6362
以上是关于thinkphp使用AMQP库(支持RabbitMq)的主要内容,如果未能解决你的问题,请参考以下文章