MQ 实现信息延迟投递
Posted Json2011315
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了MQ 实现信息延迟投递相关的知识,希望对你有一定的参考价值。
1.使用延迟插件 rabbitmq_delayed_message_exchange 实现队列信息延迟投递
安装步骤
1.1 下载插件
MQ插件地址:https://www.rabbitmq.com/community-plugins.html
下载延迟交换机插件地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/tag/v3.8.0
说明选择适应的版本:
1.2 下载文件后执行
下载完成后是一个zip / ez 包,解压缩 将ez文件拷贝到rabbitmq安装目录下的plugins目录中
1.3 执行安装
执行安装 :
root@8a5497b5da92:/plugins# rabbitmq-plugins enable rabbitmq_delayed_message_exchange
查看是否安装成功:
root@8a5497b5da92:/plugins# rabbitmq-plugins list
1.4 重启mq 查看交换机类型
2.延迟交换机优点
1.第一条消息过期 TTL 是 30min,第二条消息 TTL 是 10min,延迟交换机不会堵塞,触发过期时间的数据会被交换机优先投递到队列中
2.当梯度非常多的情况下,比如 1 分钟,2 分钟,5 分钟,10 分钟,20 分钟,30 分钟,不需要设置过多的交换机或队列
3.代码实现
3.1 消费者代码实现 php consume.php
<?php
require_once __DIR__ . '/../vendor/autoload.php';
use PhpAmqpLib\\Connection\\AMQPStreamConnection;
use PhpAmqpLib\\Wire\\AMQPTable;
try
$connection = new AMQPStreamConnection('192.168.88.130', 5672, 'guest', 'guest');
$channel = $connection->channel();
$exchange = 'order_delayed_exchange'; //交换机名
$queue_name = 'order_delayed_queue'; //消息队列载体
$exchange_type = 'x-delayed-message'; //交换机类型-延迟交换机
$routing_key = 'order_delayed_key'; //路由关键字
$durable = true; //是否持久化 true false
//第四个参数标识是否持久化【durable】
$channel->exchange_declare(
$exchange,
//exchange类型为x-delayed-message
$exchange_type,
false,
$durable,
false,
false,
false,
//此处是重点,$argument必须使用new AMQPTable()生成
new AMQPTable([
"x-delayed-type" => 'direct'
])
);
//第三个参数标识是否持久化【durable】
$channel->queue_declare($queue_name, false, $durable, false, false);
$channel->queue_bind($queue_name, $exchange, $routing_key);
echo ' [*] Waiting for logs. To exit press CTRL+C', "\\n";
$res = array();
$callback = function ($msg)
#获取推送的信息
echo ' [x] ', $msg->body, "\\n";
$result = $msg->body;
//file_put_contents('aa.log',$res,FILE_APPEND);
//信息标签
echo $msg->delivery_info['delivery_tag'] . "\\n";
echo '接收时间:' . date("Y-m-d H:i:s", time()) . PHP_EOL;
echo '接收内容:' . $result . PHP_EOL;
//消息应答机制
$result = mt_rand(0, 1); #模拟信息处理逻辑, 逻辑成功-true 逻辑处理失败-false
if (!empty($result))
#表示逻辑处理完成,并处理成功,使用ack机制应答信息(ack应答成功消息将从队列消失)
echo 'success' . PHP_EOL;
$msg->ack();
else
#表示逻辑处理失败,需要把信息重回队列,重新消费,使用nack机制
echo 'fail' . PHP_EOL;
echo "将消息打回,重回队列:";
$msg->nack(true);
// $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); //手动发送ACK应答
//exit();
;
//在默认情况下,消息确认机制是关闭的。现在是时候开启消息确认机制,将basic_consumer的第四个参数设置为false(true表示不开启消息确认),并且工作进程处理完消息后发送确认消息。
$channel->basic_consume($queue_name, '', false, false, false, false, $callback);
while (count($channel->callbacks))
$channel->wait();
$channel->close();
$connection->close();
return $res;
catch (Exception $exception)
print_r($exception->getMessage());
3.2 生产者代码 pushlist.php
<?php
require_once __DIR__ . '/../vendor/autoload.php';
//引入 php-amqplib 类库并且 使用必要的类:
use PhpAmqpLib\\Connection\\AMQPStreamConnection;
use PhpAmqpLib\\Message\\AMQPMessage;
use PhpAmqpLib\\Wire\\AMQPTable;
//创建一个 RabbitMQ 的连接:
$connection = new AMQPStreamConnection('192.168.88.130', 5672, 'guest', 'guest');
$channel = $connection->channel();
$exchange='order_delayed_exchange'; //交换机名
$exchange_type='x-delayed-message'; //交换机类型-延迟交换机
$durable = true; //标识是否持久化
$routing_key = 'order_delayed_key';//路由关键字
//声明一个队列给我们发送消息使用;然后我们就可以将消息发送到队列中;第四个参数标识是否持久化【durable】
$channel->exchange_declare(
$exchange,
//exchange类型为x-delayed-message
$exchange_type,
false,
$durable,
false,
false,
false,
//此处是重点,$argument必须使用new AMQPTable()生成
new AMQPTable([
"x-delayed-type" => 'direct'
])
);
//设为confirm模式
$channel->confirm_select();
$tls_arr=[
10000,
30000,
10000,
20000
];
$tls=$tls_arr[rand(0,3)];
#$tls=20000;【可以自行测试,先推送过期时间是6s的数据,再推送2s过期的数据】
//推送的信息内容
$data = array(
'eval_type' => 1,
'ttl'=>'过期时间'.$tls,
'content' => '提交的内容',
'add_time' => date('Y-m-d H:i:s')
);
echo $data = json_encode($data);
$msg = new AMQPMessage(
$data,
[
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
//此处是重点,设置延时时间,单位是毫秒 1s=1000ms,实例延迟20s
'application_headers' => new AMQPTable([
'x-delay' => $tls,
])
]
);
$channel->basic_publish($msg, $exchange, $routing_key);
//echo " send message :".$data." \\n";
//消息发送状态回调(成功回调)
$channel->set_ack_handler(function (AMQPMessage $message)
echo "success:" . $message->body;
);
//失败回调
$channel->set_nack_handler(function (AMQPMessage $message)
echo "fail:" . $message->body;
);
$channel->wait_for_pending_acks();
$channel->close();
$connection->close();
4.执行效果
执行生成者:先推送一条6s的信息,再推送一条2s的信息:
执行消费者:
结果:2s过期的信息会优先被消费
以上是关于MQ 实现信息延迟投递的主要内容,如果未能解决你的问题,请参考以下文章