在前面的教程中,我们创建了一个工作队列。工作队列背后的假设是,每个任务只被传递给一个工作人员。在这一部分,我们将做一些完全不同的事情 - 我们会向多个消费者传递信息。这种模式被称为“发布/订阅”。
本质上,发布的日志消息将被广播给所有的接收者
生产者 是发送消息的用户的应用程序。
队列 是存储消息的缓冲器。
消费者 是接收消息的用户的应用程序。
RabbitMQ中消息传递模型的核心思想是生产者永远不会将任何消息直接发送到队列中。实际上,生产者通常甚至不知道一个消息是否会被传送到任何队列中。
相反,制作人只能发送消息给交易所。交换是一件非常简单的事情。一方面它接收来自生产者的消息,另一方则推动他们排队。交易所必须知道如何处理收到的消息。是否应该附加到特定的队列?是否应该附加到许多队列?还是应该丢弃。这些规则是由交换类型定义的 。
有几种可用的交换类型: direct, topic, headers , fanout
channel.ExchangeDeclare("logs", "fanout");
fanout 交换非常简单。正如你可能从名字中猜到的那样,它只是将所有收到的消息广播到它所知道的所有队列中。这正是我们记录器所需要的。
列出交易所
要列出服务器上的交换,命令
rabbitmqctl list_exchanges
在这个列表中将会有一些amq。* 交换和默认(未命名)交换。这些是默认创建的。
默认交换
在本教程的前面部分,我们对交换一无所知,但仍能够将消息发送到队列。这是可能的,因为我们使用了一个默认的交换,我们用空字符串(“”)来标识。
channel.BasicPublish(exchange:"",//默认交换 routingKey:"hello", basicProperties:null, body:"发送内容");
第一个参数是交易所的名称。空字符串表示默认或无名交换:消息被路由到具有由 routingKey 指定的名称的队列(如果存在)
我们可以发布到我们的命名交换
channel.BasicPublish(exchange:"logs", routingKey:"", basicProperties:null, body:"发送内容");
临时队列
正如你以前可能记得我们使用的是具有指定名称的队列(请记住hello和task_queue?)。能够列出队列对我们至关重要 - 我们需要指出工人队列。当你想分享生产者和消费者之间的队列时,给队列一个名字是很重要的。
但是我们的记录器并不是这样。我们希望了解所有日志消息,而不仅仅是其中的一部分。我们也只对目前流动的消息感兴趣,而不是旧消息。要解决这个问题,我们需要两件事。
首先,每当我们连接到 Rabbit ,我们需要一个新的,空的队列。要做到这一点,我们可以创建一个随机名称的队列,或者,甚至更好 - 让服务器为我们选择一个随机队列名称。
其次,一旦我们断开消费者,队列应该被自动删除。
在.NET客户端中,当我们不向queueDeclare() 提供参数时,我们 使用生成的名称创建一个非持久的独占的自动删除队列:
var queueName = channel.QueueDeclare().QueueName;
此时queueName包含一个随机队列名称。例如,它可能看起来像amq.gen-JzTY20BRgKO-HjmUJj0wLg。
绑定
我们已经创建了一个 fanout 交换和一个队列。现在我们需要告诉交换机将消息发送到我们的队列。交换和队列之间的关系被称为绑定。
channel.QueueBind(queue: queueName, exchange: "logs", routingKey: "");
从现在起,日志交换将把消息附加到我们的队列中。
列出绑定
您可以使用列出现有的绑定命令
rabbitmqctl list_bindings
把它放在一起
发出日志消息的生产者程序与前面的教程没有什么不同。最重要的变化是,我们现在要发布消息到我们的日志交换,而不是无名的。发送时我们需要提供一个路由密钥,但是对于扇出交换,它的值将被忽略。
1.建立 发布者(生产者)
public static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost" }; using(var connection = factory.CreateConnection()) using(var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: "logs", type: "fanout"); var message = GetMessage(args); var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "logs", routingKey: "", basicProperties: null, body: body); Console.WriteLine(" [x] Sent {0}", message); } Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } private static string GetMessage(string[] args) { return ((args.Length > 0) ? string.Join(" ", args) : "info: Hello World!"); }
如你所见,建立连接后,我们宣布交换。这一步是必要的,因为发布到一个不存在的交易所是被禁止的。
如果没有队列绑定到交换机上,消息将会丢失,但对我们来说没关系; 如果没有消费者正在听,我们可以放心地丢弃消息。
1.建立 订阅者(消费者)
public static void Main() { var factory = new ConnectionFactory() { HostName = "localhost" }; using(var connection = factory.CreateConnection()) using(var channel = connection.CreateModel()) {
// 设置交易所的名称 以及类型 channel.ExchangeDeclare(exchange: "logs", type: "fanout");
//让系统生成队列名称 var queueName = channel.QueueDeclare().QueueName;
channel.QueueBind(queue: queueName, exchange: "logs", routingKey: ""); Console.WriteLine(" [*] Waiting for logs."); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine(" [x] {0}", message); }; channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } }
如果您想将日志保存到文件中,只需打开一个控制台并输入:
cd ReceiveLogs
dotnet run > logs_from_rabbit.log
如果你想看到屏幕上的日志,产生一个新的终端,并运行:
cd ReceiveLogs
dotnet run
要发射日志类型:
cd EmitLog
dotnet run
使用rabbitmqctl list_bindings,你可以验证代码实际上是否创建了绑定和队列。有两个接受程序运行,你应该看到类似于:
sudo rabbitmqctl list_bindings
# => Listing bindings ...
# => logs exchange amq.gen-JzTY20BRgKO-HjmUJj0wLg queue []
# => logs exchange amq.gen-vso0PVvyiRIL2WoV3i48Yg queue []
# => ...done.
结果的解释很简单:交换日志中的数据转到两个带有服务器分配名称的队列中。这正是我们的意图。