RabbitMQ消息队列-通过fanout模式将消息推送到多个Queue中

Posted 乎合

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ消息队列-通过fanout模式将消息推送到多个Queue中相关的知识,希望对你有一定的参考价值。

介绍如何使用fanout模式将消息推送到多个队列。
有时我们会遇到这样的情况,多个功能模块都希望得到完整的消息数据。例如一个log的消息,一个我们希望输出在屏幕上实时监控,另外一个用户持久化日志。这时就可以使用fanout模式。fanout模式模式不像direct模式通过routingkey来进行匹配,而是会把消息发送到所以的已经绑定的队列中。

 可以看到ca和cb收到的消息完全一致。注意以上代码fanout.php中并没有新建队列,所以先运行fanout_ca.php和fanout_cb.php的脚本,如果先运行fanout.php因为找不到绑定的队列数据就会丢失。 

 

<?php

/*
* RabbitMQ fanout
* create by superid
* 新建fanout.php用来发布消息。fanout_ca.php 和 fanout_cb.php 用来订阅不同队列消费消息。
*/

$exchangeName = \'log\';
$message = \'log--\';
$conn_args = array(
  \'host\' => \'127.0.0.1\',
  \'port\' => \'5672\',
  \'login\' => \'guest\',
  \'password\' => \'guest\',
  \'vhost\'=>\'/\'
);
$connection = new AMQPConnection($conn_args);
$connection->connect() or die("Cannot connect to the broker!\\n");
try {
  $channel = new AMQPChannel($connection); // 建立一个 Channel信道

  $exchange = new AMQPExchange($channel); //创建交换机
  $exchange->setName($exchangeName); //设置交换机的名字
  $exchange->setType(AMQP_EX_TYPE_FANOUT); //fanout类型 Exchange 类型: direct fanout topic
  $exchange->setFlags(AMQP_DURABLE); //持久化
  $exchange->declareExchange();

  for($i=1 ; $i<=20;$i++){
    //Fanout类型最简单,这种模型忽略routingkey
    $exchange->publish($message.$i, "");
    var_dump("[x] Sent $message $i");
  }
} catch (AMQPConnectionException $e) {
  var_dump($e);
  exit();
}
$connection->disconnect();

?>

订阅者:

fanout_ca.php

<?php
/*
* RabbitMQ fanout 模式
* create by superrd
*/

$exchangeName = \'log\';
$queueName = \'queuea\';
$routeKey = \'\';
$conn_args = array(
  \'host\' => \'127.0.0.1\',
  \'port\' => \'5672\',
  \'login\' => \'guest\',
  \'password\' => \'guest\',
  \'vhost\'=>\'/\'
);
$connection = new AMQPConnection($conn_args);
$connection->connect() or die("Cannot connect to the broker!\\n");

$channel = new AMQPChannel($connection);

$exchange = new AMQPExchange($channel);
$exchange->setName($exchangeName);
$exchange->setType(AMQP_EX_TYPE_FANOUT);
$exchange->setFlags(AMQP_DURABLE);
$exchange->declareExchange();

$queue = new AMQPQueue($channel);
$queue->setName($queueName);
$queue->setFlags(AMQP_DURABLE);
echo "Message Total:".$queue->declareQueue()."\\n"; //队列中消息的总量
$queue->bind($exchangeName, $routeKey);

//阻塞模式接收消息
echo "Message:\\n";
while(True){
  $queue->consume(\'processMessage\');
  //$queue->consume(\'processMessage\', AMQP_AUTOACK); //自动ACK应答
}

$conn->disconnect();
/**
* 消费回调函数
* 处理消息
*/
function processMessage($envelope, $queue) {
  $msg = $envelope->getBody();
  echo $msg."\\n"; //处理消息
  $queue->ack($envelope->getDeliveryTag()); //手动发送ACK应答
}

 ?>

 

订阅者:

fanout_cb.php

<?php
/*
* RabbitMQ fanout 模式
* create by superrd
*/

$exchangeName = \'log\';
$queueName = \'queueb\';
$routeKey = \'\'; //Fanout类型最简单,这种模型忽略routingkey
$conn_args = array(
  \'host\' => \'127.0.0.1\',
  \'port\' => \'5672\',
  \'login\' => \'guest\',
  \'password\' => \'guest\',
  \'vhost\'=>\'/\'
);

$connection = new AMQPConnection($conn_args);
$connection->connect() or die("Cannot connect to the broker!\\n");

$channel = new AMQPChannel($connection);

$exchange = new AMQPExchange($channel);
$exchange->setName($exchangeName);
$exchange->setType(AMQP_EX_TYPE_FANOUT);
$exchange->setFlags(AMQP_DURABLE);
$exchange->declareExchange();

$queue = new AMQPQueue($channel);
$queue->setName($queueName);
$queue->setFlags(AMQP_DURABLE);
echo "Message Total:".$queue->declareQueue()."\\n"; //队列中消息的总量
$queue->bind($exchangeName, $routeKey);

//阻塞模式接收消息
echo "Message:\\n";
while(True){
  $queue->consume(\'processMessage\');
  //$queue->consume(\'processMessage\', AMQP_AUTOACK); //自动ACK应答
}

$conn->disconnect();
/**
* 消费回调函数
* 处理消息
*/
function processMessage($envelope, $queue) {
  $msg = $envelope->getBody();
  // sleep(1);
  echo $msg."\\n"; //处理消息
  $queue->ack($envelope->getDeliveryTag()); //手动发送ACK应答
}

?>

 

以上是关于RabbitMQ消息队列-通过fanout模式将消息推送到多个Queue中的主要内容,如果未能解决你的问题,请参考以下文章

spring boot整合RabbitMQ(Fanout模式)

rabbitMQ的第三种模型(fanout)

rabbitMQ的第三种模型(fanout)

2020-03-17 20:18:50springboot整合rabbitmq

springboot整合消息队列——RabbitMQ

C#利用RabbitMQ实现消息订阅与发布