RabbitMq初探——发布与订阅

Posted 王大西

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMq初探——发布与订阅相关的知识,希望对你有一定的参考价值。

publish and subscribe

前言


前面的例子 我们都是用到的都是消息单一消费,即一条消息被单个消费者消费。像微博系统的消息推送,是一条消息推送给所有订阅到该频道的用户。

这里我们就需要用到rabbitmq的发布与订阅(publish and subscribe)

 

原理


前面我们弱化rabbitmq,只抽象出了 生产者、队列、消费者三个概念。

现在需要介绍rabbitmq的整体数据流转过程。

数据由生产者发送给交换机,交换机接收数据并把它发送给与自己绑定好的队列,队列接收消息并且把它发送给消费者。

事实上,生产者根本不会知道消息是发送给谁的,也不需要关心。who cares?!

 exchange的类型

移步 RabbitMQ各种交换机类型Exchange Types介绍

发布与订阅

一个中心生产者,多个消费者。

生产者生产消息给类型为fanout的exchange,多个queue与该exchange绑定,消费者从queue中获取消息。

 

代码


 

1. 生产者、消费者声明类型为fanout、名称为logs的交换机

2. 消费者进程声明名称随机的queue(用于每new 一个进程就会产生一个队列),将queue与logs exchange绑定

整体代码如下

fanout_sender.php

<?php
/**
 * Created by PhpStorm.
 * User: wangdaxi
 * Date: 2017/10/20
 * Time: 14:20
 */
require_once __DIR__ . \'/vendor/autoload.php\';
use PhpAmqpLib\\Connection\\AMQPStreamConnection;
use PhpAmqpLib\\Message\\AMQPMessage;

$connection = new AMQPStreamConnection(\'127.0.0.1\', 5672, \'guest\', \'guest\');
$channel = $connection->channel();

$channel->exchange_declare(\'logs\', \'fanout\', false, false, false);

$data = implode(\' \', array_slice($argv, 1));
empty($data) && $data = \'Hello World\';
$msg = new AMQPMessage($data);

$channel->basic_publish($msg, \'logs\');
echo " [x] Sent $data \\n";

$channel->close();
$connection->close();

 

fanout_receive.php

<?php
/**
 * Created by PhpStorm.
 * User: wangdaxi
 * Date: 2017/10/20
 * Time: 14:32
 */
require_once __DIR__ . \'/vendor/autoload.php\';
use PhpAmqpLib\\Connection\\AMQPStreamConnection;

$connection = new AMQPStreamConnection(\'127.0.0.1\', 5672, \'guest\', \'guest\');
$channel = $connection->channel();

$channel->exchange_declare(\'logs\', \'fanout\', false, false, false);

list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

$channel->queue_bind($queue_name, \'logs\');

echo \' [*] Waiting for logs. To exit press CTRL+C\', "\\n";

$callback = function($msg){
    echo "\\n [x] " . $msg->body;
};

//消费,关闭ack
$channel->basic_consume($queue_name, \'\', false, true, false, false, $callback);

while(count($channel->callbacks)) {
    $channel->wait();
}

$channel->close();
$connection->close();

 

测试


 

开启一个终端作为生产者P,两个消费者作为消费者C1,C2。

生产者生产消息,会发现每个消费者都会收到同样的消息。很简单,不上图。

以上。

以上是关于RabbitMq初探——发布与订阅的主要内容,如果未能解决你的问题,请参考以下文章

.NET文件并发与RabbitMQ(初探RabbitMQ)

.NET文件并发与RabbitMQ(初探RabbitMQ)

分布式环境下rabbitmq发布与订阅端

RabbitMQ03_Springboot整合RabbitMQ实现发布与订阅模式路由模式通配符模式

RabbitMQ03_Springboot整合RabbitMQ实现发布与订阅模式路由模式通配符模式

Python-RabbitMQ消息队列的发布与订阅