Hyperf使用RabbitMQ消息队列
Posted 骷大人
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Hyperf使用RabbitMQ消息队列相关的知识,希望对你有一定的参考价值。
Hyperf连接使用RabbitMQ消息中间件
传送门
部署环境
安装amqp扩展
composer require hyperf/amqp
安装command命令行扩展
composer require hyperf/command
配置参数
假设已经在rabbitmq设置了交换机exchange_test和队列queue_test
新建 /config/autoload/amp.php配置文件,修改地址和用户名密码
<?php
return [
'default' => [
'host' => '127.0.0.1',//rabbitmq服务的地址
'port' => 5672,
'user' => 'user',
'password' => '123456',
'vhost' => '/',
'concurrent' => [
'limit' => 1,
],
'pool' => [
'connections' => 1,
],
'params' => [
'insist' => false,
'login_method' => 'AMQPLAIN',
'login_response' => null,
'locale' => 'en_US',
'connection_timeout' => 3.0,
'read_write_timeout' => 6.0,
'context' => null,
'keepalive' => false,
'heartbeat' => 3,
'close_on_destruct' => false,
],
],
'pool2' => [
...
]
];
生产数据
创建生产者中间件
php bin/hyperf.php gen:amqp-producer DemoProducer
exchange是交换机,routingKey是队列名
<?php
declare(strict_types=1);
namespace App\\Amqp\\Producers;
use Hyperf\\Amqp\\Annotation\\Producer;
use Hyperf\\Amqp\\Message\\ProducerMessage;
#[Producer(exchange: "exchange_test", routingKey: "queue_test")]
class DemoProducer extends ProducerMessage
public function __construct($data)
//将收到的数据加入队列
$this->plyload = $data;
创建生产者脚本
php bin/hyperf.php gen:command FooCommand
代码
<?php
declare(strict_types=1);
namespace App\\Command;
use Hyperf\\Command\\Command as HyperfCommand;
use Hyperf\\Command\\Annotation\\Command;
use Hyperf\\Amqp\\Producer;
use App\\Amqp\\Producers\\DemoProducer;
use Hyperf\\Utils\\ApplicationContext;
/**
* @Command
*/
class FooCommand extends HyperfCommand
/**
* 执行的命令行
*
* @var string
*/
protected $name = 'foo:command';
public function handle()
//协程代码,创建1000个协程分别处理
$wg = new \\Hyperf\\Utils\\WaitGroup();
$wg->add(1000);// 计数器加1000
for($i=0;$i<1000;$i++)
// 创建协程$i
co(function () use ($wg)
//amqp代码,将数据加入生产者队列
$message = new DemoProducer(['id'=>$i]);
$producer = ApplicationContext::getContainer()->get(Producer::class);
$result = $producer->produce($message);
// 计数器减一
$wg->done();
);
// 等待所有协程运行完成
$wg->wait();
调用命令行,来生产数据
php bin/hyperf.php foo:command
至此,进入rabbitmq后台,对应的队列里就会有数据。
消费数据
创建消费者中间件
php bin/hyperf.php gen:amqp-consumer DemoConsumer
代码解释如上,多的queue也是队列名,num是进程数
<?php
declare(strict_types=1);
namespace App\\Amqp\\Consumers;
use Hyperf\\Amqp\\Annotation\\Consumer;
use Hyperf\\Amqp\\Message\\ConsumerMessage;
use Hyperf\\Amqp\\Result;
use PhpAmqpLib\\Message\\AMQPMessage;
#[Consumer(exchange: "hyperf", routingKey: "hyperf", queue: "hyperf", nums: 1)]
class DemoConsumer extends ConsumerMessage
public function consumeMessage($data, AMQPMessage $message): string
print_r($data);
return Result::ACK;
重启框架会自动调用消费者
php bin/hyperf.php start
原创码字不易,喜欢请收藏关注
部分参考自:https://www.bilibili.com/video/BV1de4y1E7Ya/?vd_source=36102b089bcd7ff8177499ba833633e0
以上是关于Hyperf使用RabbitMQ消息队列的主要内容,如果未能解决你的问题,请参考以下文章
Hyperf+RabbitMQ+WebSocket实现大屏幕消息推送