封装rabbitmq

Posted allen-spot

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了封装rabbitmq相关的知识,希望对你有一定的参考价值。

今天又抽时间用php封装了rabbitmq,使用的框架是yaf

Consumer如下:

<?php

namespace RabbitMq;
class Consumer

    public $exchange_name = "exchange_1";
    public $route_name = "route_1";
    public $queue_name = "queue_1";
    public $conn = null;
    public $channel = null;
    public $exchange = null;
    public $queue = null;

    public function __construct(string $exchange_name = "", string $route_name = "", string $queue_name = "")
    
        if ($exchange_name) $this->exchange_name = $exchange_name;
        if ($route_name) $this->route_name = $route_name;
        if ($queue_name) $this->queue_name = $queue_name;
        $this->init();
//        $this->createChannel();
//        $this->createQueue();
    

    public function init()
    
        //创建连接和channel
        $this->conn = new \AMQPConnection(MqConfig::$config);
        if (!$this->conn->connect()) 
            die("Cannot connect to the broker!\n");
        

    

    public function createChannel()
    
        $this->channel = new \AMQPChannel($this->conn);

        //创建交换机
        $this->exchange = new \AMQPExchange($this->channel);
        $this->exchange->setName($this->exchange_name);
        $this->exchange->setType(AMQP_EX_TYPE_DIRECT); //direct类型
        $this->exchange->setFlags(AMQP_DURABLE); //持久化
        //echo "Exchange Status:" . $this->exchange->declare() . "\n";


    

    public function createQueue()
    
        //创建队列
        $this->queue = new \AMQPQueue($this->channel);
        $this->queue->setName($this->queue_name);
        $this->queue->setFlags(AMQP_DURABLE); //持久化
        //echo "Message Total:" . $this->queue->declare() . "\n";
        //绑定交换机与队列,并指定路由键
        echo ‘Queue Bind: ‘ . $this->queue->bind($this->exchange_name, $this->route_name) . "\n";

        //阻塞模式接收消息
        echo "接收到的消息:\n";
        while (True) 
            $this->queue->consume(function ($envelope, $queue) 
                $msg = $envelope->getBody();
                echo $msg . "\n"; //处理消息
                $queue->ack($envelope->getDeliveryTag()); //手动发送ACK应答
            );
            //$q->consume(‘processMessage‘, AMQP_AUTOACK); //自动ACK应答
        
        $this->conn->disconnect();
    
Publisher如下
<?php

namespace RabbitMq;
class Publisher

    public $exchange_name = "exchange_1";
    public $route_name = "route_1";
    public $conn = null;
    public $channel = null;
    public $exchange = null;

    public function __construct(string $exchange_name = "", string $route_name = "")
    
        if ($exchange_name) $this->exchange_name = $exchange_name;
        if ($route_name) $this->route_name = $route_name;
        $this->init();
    

    public function init()
    
        //创建连接和channel
        $this->conn = new \AMQPConnection(MqConfig::$config);
        if (!$this->conn->connect()) 
            die("Cannot connect to the broker!\n");
        

    

    public function createChannel()
    
        $this->channel = new \AMQPChannel($this->conn);
        //创建交换机对象
        $this->exchange = new \AMQPExchange($this->channel);
        $this->exchange->setName($this->exchange_name);
    

    public function publishMsg()
    
        for ($i = 0; $i < 5; ++$i) 
            sleep(1);//休眠1秒
            //发送的消息内容
            $message = "测试消息,你好啊!" . date("h:i:s");
            echo "发送消息:哈哈哈:" . $this->exchange->publish($message, $this->route_name) . "\n";
        
        $this->conn->disconnect();
    

简单调用:

调用consumer:

$consumer = new \RabbitMq\Consumer();
$consumer->createChannel();
$consumer->createQueue();

调用Publisher:

$publisher = new \RabbitMq\Publisher();
$publisher->createChannel();
$publisher->publishMsg();

 

以上是关于封装rabbitmq的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQRabbitMQ和Erlang下载与安装步骤—2023超详细最新版

Springboot 整合 RabbitMQrabbitmq介绍:安装,下载,创建队列交换机,5种工作模式

rabbitmq简介

RabbitMQ

RabbitMQ安装

day11