封装rabbitmq
Posted allen-spot
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了封装rabbitmq相关的知识,希望对你有一定的参考价值。
今天又抽时间用php封装了rabbitmq,使用的框架是yaf
Consumer如下:
<?php namespace RabbitMq; class Consumer public $exchange_name = "exchange_1"; public $route_name = "route_1"; public $queue_name = "queue_1"; public $conn = null; public $channel = null; public $exchange = null; public $queue = null; public function __construct(string $exchange_name = "", string $route_name = "", string $queue_name = "") if ($exchange_name) $this->exchange_name = $exchange_name; if ($route_name) $this->route_name = $route_name; if ($queue_name) $this->queue_name = $queue_name; $this->init(); // $this->createChannel(); // $this->createQueue(); public function init() //创建连接和channel $this->conn = new \AMQPConnection(MqConfig::$config); if (!$this->conn->connect()) die("Cannot connect to the broker!\n"); public function createChannel() $this->channel = new \AMQPChannel($this->conn); //创建交换机 $this->exchange = new \AMQPExchange($this->channel); $this->exchange->setName($this->exchange_name); $this->exchange->setType(AMQP_EX_TYPE_DIRECT); //direct类型 $this->exchange->setFlags(AMQP_DURABLE); //持久化 //echo "Exchange Status:" . $this->exchange->declare() . "\n"; public function createQueue() //创建队列 $this->queue = new \AMQPQueue($this->channel); $this->queue->setName($this->queue_name); $this->queue->setFlags(AMQP_DURABLE); //持久化 //echo "Message Total:" . $this->queue->declare() . "\n"; //绑定交换机与队列,并指定路由键 echo ‘Queue Bind: ‘ . $this->queue->bind($this->exchange_name, $this->route_name) . "\n"; //阻塞模式接收消息 echo "接收到的消息:\n"; while (True) $this->queue->consume(function ($envelope, $queue) $msg = $envelope->getBody(); echo $msg . "\n"; //处理消息 $queue->ack($envelope->getDeliveryTag()); //手动发送ACK应答 ); //$q->consume(‘processMessage‘, AMQP_AUTOACK); //自动ACK应答 $this->conn->disconnect();
Publisher如下
<?php namespace RabbitMq; class Publisher public $exchange_name = "exchange_1"; public $route_name = "route_1"; public $conn = null; public $channel = null; public $exchange = null; public function __construct(string $exchange_name = "", string $route_name = "") if ($exchange_name) $this->exchange_name = $exchange_name; if ($route_name) $this->route_name = $route_name; $this->init(); public function init() //创建连接和channel $this->conn = new \AMQPConnection(MqConfig::$config); if (!$this->conn->connect()) die("Cannot connect to the broker!\n"); public function createChannel() $this->channel = new \AMQPChannel($this->conn); //创建交换机对象 $this->exchange = new \AMQPExchange($this->channel); $this->exchange->setName($this->exchange_name); public function publishMsg() for ($i = 0; $i < 5; ++$i) sleep(1);//休眠1秒 //发送的消息内容 $message = "测试消息,你好啊!" . date("h:i:s"); echo "发送消息:哈哈哈:" . $this->exchange->publish($message, $this->route_name) . "\n"; $this->conn->disconnect();
简单调用:
调用consumer:
$consumer = new \RabbitMq\Consumer(); $consumer->createChannel(); $consumer->createQueue();
调用Publisher:
$publisher = new \RabbitMq\Publisher(); $publisher->createChannel(); $publisher->publishMsg();
以上是关于封装rabbitmq的主要内容,如果未能解决你的问题,请参考以下文章
RabbitMQRabbitMQ和Erlang下载与安装步骤—2023超详细最新版