我们改进了我们的日志系统 我们没有使用只有虚拟广播的 fanout 交换机,而是使用 direct 交换机,并有选择性地接收日志的可能性。
尽管使用直接交换改进了我们的系统,但它仍然有局限性 - 它不能根据多个标准进行路由选择。
在我们的日志系统中,我们可能不仅要根据严重性来订阅日志,还要根据发出日志的来源进行订阅。你可能从syslog unix工具知道这个概念 ,它根据严重性(info / warn / crit ...)和facility(auth / cron / kern ...)来路由日志。
这会给我们很大的灵活性 - 我们可能想要听取来自‘cron‘的严重错误,而且还要听取来自‘kern‘的所有日志。
为了在我们的日志系统中实现这一点,我们需要了解更复杂的话题交换。
话题交换
发送到主题交换的消息不能有任意的 routing_key - 它必须是由点分隔的单词列表。这些单词可以是任何东西,但通常它们指定连接到消息的一些功能。一些有效的路由键例子:“ stock.usd.nyse ”,“ nyse.vmw ”,“ quick.orange.rabbit ”。在路由选择键中可以有任意数量的字,最多255个字节。
绑定键也必须是相同的形式。主题交换背后的逻辑 类似于直接的 - 使用特定的路由密钥发送的消息将被传送到与匹配的绑定密钥绑定的所有队列。但是绑定键有两个重要的特殊情况:
- *(星号)可以代替一个字。
- #(散列)可以代替零个或多个单词。
在一个例子中解释这个很简单:
在这个例子中,我们将发送所有描述动物的信息。消息将使用由三个字(两个点)组成的路由键发送。路由关键字中的第一个单词将描述速度,第二个颜色和第三个种类:“ <speed>。<color>。<species>”。
我们创建了三个绑定:Q1绑定了绑定键“ * .orange。* ”,Q2 绑定了“ *。*。rabbit ”和“ lazy。# ”。
这些绑定可以概括为:
- Q1对所有的橙色动物感兴趣。
- Q2希望听到有关兔子的一切,以及关于懒惰动物的一切。
将路由键设置为“ quick.orange.rabbit ”的消息将传递到两个队列。消息“ lazy.orange.elephant ”也将去他们两个。另一方面,“ quick.orange.fox ”只会到第一个队列,而“ lazy.brown.fox ”只会到第二个队列。即使匹配两个绑定,“ lazy.pink.rabbit ”也只会被传递到第二个队列一次。“ quick.brown.fox”不匹配任何绑定,因此将被丢弃。
如果我们违反我们的合同,并发送一个或四个单词,如“ 橙色 ”或“ quick.orange.male.rabbit ” 的消息会发生什么?那么,这些消息将不匹配任何绑定,将会丢失。
另一方面,“ lazy.orange.male.rabbit ”即使有四个单词,也会匹配最后一个绑定,并被传递到第二个队列。
话题交换
话题交换功能强大,可以像其他交流一样行事。
当一个队列绑定了“ # ”(哈希)绑定密钥时,它将接收所有的消息,而不管路由密钥如何在扇出交换。
当在绑定中不使用特殊字符“ * ”(星号)和“ # ”(散列)时,主题交换将像直接交换一样。
把它放在一起
我们将在我们的日志系统中使用话题交换。我们首先假定日志的路由键有两个字:“ <facility>。<severity> ”
EmitLogTopic.cs的代码:
public static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost" }; using(var connection = factory.CreateConnection()) using(var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: "topic_logs", type: "topic"); var routingKey = (args.Length > 0) ? args[0] : "anonymous.info"; var message = (args.Length > 1) ? string.Join(" ", args.Skip( 1 ).ToArray()) : "Hello World!"; var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "topic_logs", routingKey: routingKey, basicProperties: null, body: body); Console.WriteLine(" [x] Sent ‘{0}‘:‘{1}‘", routingKey, message); } }
ReceiveLogsTopic.cs的代码:
public static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost" }; using(var connection = factory.CreateConnection()) using(var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: "topic_logs", type: "topic"); var queueName = channel.QueueDeclare().QueueName; if(args.Length < 1) { Console.Error.WriteLine("Usage: {0} [binding_key...]", Environment.GetCommandLineArgs()[0]); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); Environment.ExitCode = 1; return; } foreach(var bindingKey in args) { channel.QueueBind(queue: queueName, exchange: "topic_logs", routingKey: bindingKey); } Console.WriteLine(" [*] Waiting for messages. To exit press CTRL+C"); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); var routingKey = ea.RoutingKey; Console.WriteLine(" [x] Received ‘{0}‘:‘{1}‘", routingKey, message); }; channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } }
要接收所有日志:
cd ReceiveLogsTopic dotnet run "#"
要接收来自设施“ kern ”的所有日志:
cd ReceiveLogsTopic dotnet run "kern.*"
或者如果你只想听到关于“ 关键 ”日志:
ReceiveLogsTopic.exe "*.critical"
您可以创建多个绑定:
cd ReceiveLogsTopic dotnet run "kern.*" "*.critical"
并发送一个路由键“ kern.critical ”类型的日志:
cd EmitLogTopic dotnet run "kern.critical" "A critical kernel error"
玩这些程序玩得开心。请注意,代码不会对路由或绑定键作任何假设,您可能需要使用两个以上的路由键参数。