轻松搞定RabbitMQ——路由选择

Posted Damon.Luo

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了轻松搞定RabbitMQ——路由选择相关的知识,希望对你有一定的参考价值。

转自 http://blog.csdn.net/xiaoxian8023/article/details/48733249

翻译地址:http://www.rabbitmq.com/tutorials/tutorial-four-java.html

       在前篇博文中,我们建立了一个简单的日志系统。可以广播消息给多个消费者。本篇博文,我们将添加新的特性——我们可以只订阅部分消息。比如:我们可以接收Error级别的消息写入文件。同时仍然可以在控制台打印所有日志。


Bindings(绑定)

       在上一篇博客中我们已经使用过绑定。类似下面的代码:

 

[java] view plain copy
 
  1. channel.queueBind(queueName, EXCHANGE_NAME, "");  

       绑定表示转换器与队列之间的关系。可以简单的人为:队列对该转发器上的消息感兴趣。

 

       绑定可以设定额外的routingKey参数。为了与避免basicPublish方法(发布消息的方法)的参数混淆,我们准备把它称作绑定键(binding key)。下面展示如何使用绑定键(binding key)来创建一个绑定:

 

[java] view plain copy
 
  1. channel.queueBind(queueName, EXCHANGE_NAME, "black");  

       绑定键关键取决于转换器的类型。对于fanout类型,忽略此参数。

 


Direct exchange(直接转发)

       前面讲到我们的日志系统广播消息给所有的消费者。我们想对其扩展,根据消息的严重性来过滤消息。例如:我们希望将致命错误的日志消息记录到文件,而不是把磁盘空间浪费在warn和info类型的日志上。我们使用的fanout转发器,不能给我们太多的灵活性。它仅仅只是盲目的广播而已。我们使用direct转发器进行代替,其背后的算法很简单——消息会被推送至绑定键(binding key)和消息发布附带的选择键(routing key)完全匹配的队列。

技术分享

       在上图中,我们可以看到direct类型的转发器与2个队列进行了绑定。第一个队列使用的绑定键是orange,第二个队列绑定键为black和green。这样当消息发布到转发器是,附带orange绑定键的消息将被路由到队列Q1中去。附带black和green绑定键的消息被路由到Q2中去。其他消息全部丢弃。


Multiple bindings(多重绑定)

技术分享

       使用一个绑定键绑定多个队列是完全合法的。如上图,绑定键black绑定了2个队列——Q1和Q2。


Emitting logs(发送日志)

       我们将这种模式用于日志系统,发送消息给direct类型的转发器。我们将 提供日志严重性做为绑定键。那样,接收程序可以选择性的接收严重性的消息。首先关注发送日志的代码:

像往常一样首先创建一个转换器:

 

[java] view plain copy
 
  1. channel.exchangeDeclare(EXCHANGE_NAME, "direct");  

       然后为发送消息做准备:

 

 

[java] view plain copy
 
  1. channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());  

       为了简化代码,我们假定日志的严重性是‘info’,‘warning’,‘error’中之一。

 


Subscribing(订阅)

       接收消息跟前面博文中的一样。我们仅需要修改一个地方:为每一个我们感兴趣的严重性的消息,创建一个新的绑定。

 

[java] view plain copy
 
  1. String queueName = channel.queueDeclare().getQueue();  
  2.   
  3. for(String severity : argv){      
  4.   channel.queueBind(queueName, EXCHANGE_NAME, severity);  
  5. }  

 

 

完整的例子

 

技术分享

发送端代码(EmitLogDirect.java)

 

[java] view plain copy
 
  1. public class EmitLogDirect {  
  2.     private final static String EXCHANGE_NAME = "direct_logs";  
  3.   
  4.     public static void main(String[] args) throws IOException {  
  5.         /** 
  6.          * 创建连接连接到MabbitMQ 
  7.          */  
  8.         ConnectionFactory factory = new ConnectionFactory();  
  9.         // 设置MabbitMQ所在主机ip或者主机名  
  10.         factory.setHost("127.0.0.1");  
  11.         // 创建一个连接  
  12.         Connection connection = factory.newConnection();  
  13.         // 创建一个频道  
  14.         Channel channel = connection.createChannel();  
  15.         // 指定转发——广播  
  16.         channel.exchangeDeclare(EXCHANGE_NAME, "direct");  
  17.   
  18.         //所有日志严重性级别  
  19.         String[] severities={"error","info","warning"};  
  20.         for(int i=0;i<3;i++){  
  21.             String severity = severities[i%3];//每一次发送一条不同严重性的日志  
  22.               
  23.             // 发送的消息  
  24.             String message = "Hello World"+Strings.repeat(".", i+1);  
  25.             //参数1:exchange name  
  26.             //参数2:routing key  
  27.             channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());  
  28.             System.out.println(" [x] Sent ‘" + severity +"‘:‘"+ message + "‘");  
  29.         }  
  30.         // 关闭频道和连接  
  31.         channel.close();  
  32.         connection.close();  
  33.     }  
  34. }  

消费者1(ReceiveLogs2Console.java)

 

[java] view plain copy
 
  1. public class ReceiveLogs2Console {  
  2.     private static final String EXCHANGE_NAME = "direct_logs";  
  3.   
  4.     public static void main(String[] argv) throws IOException, InterruptedException {  
  5.         ConnectionFactory factory = new ConnectionFactory();  
  6.         factory.setHost("127.0.0.1");  
  7.         // 打开连接和创建频道,与发送端一样  
  8.         Connection connection = factory.newConnection();  
  9.         final Channel channel = connection.createChannel();  
  10.   
  11.         channel.exchangeDeclare(EXCHANGE_NAME, "direct");  
  12.         // 声明一个随机队列  
  13.         String queueName = channel.queueDeclare().getQueue();  
  14.   
  15.         //所有日志严重性级别  
  16.         String[] severities={"error","info","warning"};  
  17.         for (String severity : severities) {  
  18.             //关注所有级别的日志(多重绑定)  
  19.             channel.queueBind(queueName, EXCHANGE_NAME, severity);  
  20.         }  
  21.         System.out.println(" [*] Waiting for messages. To exit press CTRL+C");  
  22.           
  23.         // 创建队列消费者  
  24.         final Consumer consumer = new DefaultConsumer(channel) {  
  25.               @Override  
  26.               public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  
  27.                 String message = new String(body, "UTF-8");  
  28.                 System.out.println(" [x] Received ‘"  + envelope.getRoutingKey() + "‘:‘" + message + "‘");  
  29.               }  
  30.             };  
  31.             channel.basicConsume(queueName, true, consumer);  
  32.     }  
  33. }  

 

消费者2(ReceiveLogs2File.java)

[java] view plain copy
 
  1. public class ReceiveLogs2File {  
  2.     private static final String EXCHANGE_NAME = "direct_logs";  
  3.   
  4.     public static void main(String[] argv) throws IOException, InterruptedException {  
  5.         ConnectionFactory factory = new ConnectionFactory();  
  6.         factory.setHost("127.0.0.1");  
  7.         // 打开连接和创建频道,与发送端一样  
  8.         Connection connection = factory.newConnection();  
  9.         final Channel channel = connection.createChannel();  
  10.   
  11.         channel.exchangeDeclare(EXCHANGE_NAME, "direct");  
  12.         // 声明一个随机队列  
  13.         String queueName = channel.queueDeclare().getQueue();  
  14.           
  15.         String severity="error";//只关注error级别的日志,然后记录到文件中去。  
  16.         channel.queueBind(queueName, EXCHANGE_NAME, severity);  
  17.           
  18.         System.out.println(" [*] Waiting for messages. To exit press CTRL+C");  
  19.           
  20.         // 创建队列消费者  
  21.         final Consumer consumer = new DefaultConsumer(channel) {  
  22.               @Override  
  23.               public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  
  24.                 String message = new String(body, "UTF-8");  
  25.                 //记录日志到文件:  
  26.                 print2File( "["+ envelope.getRoutingKey() + "] "+message);  
  27.               }  
  28.             };  
  29.             channel.basicConsume(queueName, true, consumer);  
  30.     }  
  31.       
  32.     private static void print2File(String msg) {  
  33.         try {  
  34.             String dir = ReceiveLogs2File.class.getClassLoader().getResource("").getPath();  
  35.             String logFileName = new SimpleDateFormat("yyyy-MM-dd").format(new Date());  
  36.             File file = new File(dir, logFileName + ".log");  
  37.             FileOutputStream fos = new FileOutputStream(file, true);  
  38.             fos.write((new SimpleDateFormat("HH:mm:ss").format(new Date())+" - "+msg + "\r\n").getBytes());  
  39.             fos.flush();  
  40.             fos.close();  
  41.         } catch (FileNotFoundException e) {  
  42.             e.printStackTrace();  
  43.         } catch (IOException e) {  
  44.             e.printStackTrace();  
  45.         }  
  46.     }    
  47. }  

       最终结果:

 

 

技术分享

       罗哩罗嗦的说这么多,其实就是说了这么一件事:我们可以使用Direct exchange+routingKey来过滤自己感兴趣的消息。一个队列可以绑定多个routingKey。这就是我们今天的主题——路由选择。










以上是关于轻松搞定RabbitMQ——路由选择的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMq学习笔记五:路由选择(Routing)

RabbitMq学习笔记五:路由选择(Routing)

一个拓扑,搞定BGP13条路径选择算法 - 案例分析

RabbitMQ 路由选择 (Routing)

RabbitMQ之Routing(路由有选择的接收)

RabbitMQ教程总结