RabbitMQ学习笔记六:话题(Topics)

Posted RunningFan

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ学习笔记六:话题(Topics)相关的知识,希望对你有一定的参考价值。

前一篇我们已经改善了我们的日志系统,不用fanout类型的转发器愚蠢的广播消息,而是使用direct类型的转发器,能够选择性的接收我们想要接收的消息。参见Rabbit学习笔记五:路由选择(Routing)http://blog.csdn.net/u010416588/article/details/54670060

一 话题(topics)

   虽然使用direct类型改良了我们的系统,但是仍然存在一些局限性:它不能够基于多重条件进行路由选择。
   在我们的日志系统中,我们可能希望不仅根据日志的级别订阅,同时想根据日志的来源进行订阅。这个概念类似unix工具:syslog,它转发日志基于严重性(info/warning/crit…)和设备(auth/cron/kern…)
   这样可能给我们更多的灵活性:我们可能只想订阅来自’cron’的致命错误日志,而且也来自’kern’的。
   为了在我们的系统中实现上述的需求,我们需要学习稍微复杂的主题类型的转发器(topic exchange)。

二 话题转发器

    发往主题类型的转发器的消息不能随意的设置选择键(routing_key),必须是由点隔开的一系列的标识符组成。标识符可以是任何东西,但是一般都与消息的某些特性相关。一些合法的选择键的例子:"stock.usd.nyse", "nyse.vmw","quick.orange.rabbit".你可以定义任何数量的标识符,上限为255个字节(32个字符)。
    绑定键和选择键的形式一样。主题类型的转发器背后的逻辑和直接类型的转发器很类似:一个附带特殊的选择键将会被转发到绑定键与之匹配的队列中。需要注意的是:关于绑定键有两种特殊的情况。
*可以匹配一个标识符。
#可以匹配0个或多个标识符。

如下图:

   在上面的例子中,我们要发送描述动物信息的消息。消息发送的路由键(routing key)包含三个标识符(两个点隔开)。第一个标识符描述动物的速度,第二个描述颜色,第三个描述物种:"<speed>.<colour>.<species>".
   我们创建三个绑定关系:
   1 队列Q1与topic类型转发器绑定,绑定键为“*.organge.*”
   2 队列Q2与topic类型转发器绑定,绑定键为“*.*.rabbit”
   3 队列Q2与topic类型转发器绑定,绑定键为“lazy.#”

这些绑定关系可以概括为:
队列Q1对所有的橙色动物感兴趣
队列Q2想知道所有关于兔子以及懒洋洋的动物的一切.

例如:
    路由键为 "quick.orange.rabbit"的消息将会被发送到队列Q1和Q2。
    路由键为 "lazy.orange.elephant"的消息将会被发送到队列Q1和Q2。
    路由键为 "quick.orange.fox"的消息将会被发送到队列Q1。
    路由键为 "lazy.brown.fox"的消息将会被发送到队列Q2。
    路由键为 "lazy.pink.rabbit"的消息将会被发送到队列Q2。并且仅发一次,尽管有两个路邮件匹配。
    路由键为 "quick.brown.fox"的消息没有匹配,将会被抛弃。

如果我们违反约定,发送一个或者四个标识符的选择键,类似:orange,quick.orange.male.rabbit,这些选择键不能与任何绑定键匹配,所以消息将会被丢弃。
另一方面,lazy.orange.male.rabbit,虽然是四个标识符,也可以与lazy.#匹配,从而转发至Q2。

注:主题类型的转发器非常强大,可以实现其他类型的转发器。
当一个队列与绑定键#绑定,将会收到所有的消息,类似fanout类型转发器。
当绑定键中不包含任何#与*时,类似direct类型转发器。

三 完整的例子

PS:RabbitMQ server在本地,如果不在本地,请修改连接配置
现在我们在日志系统中使用topic类型的转发器。我们以一个假设为前提,即日志的路由键有两个关键字 “.”
下面代码和之前类似:

3.1 消息发送类

发送五条消息,路由键分别为
“kernel.info”, “cron.warning”, “auth.info”,”auth.critical”, “kernel.critical”
EmitLogTopic.Java

package com.gta.goldnock.mq.topics;

import java.io.IOException;
import java.util.Random;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * 
* @ClassName: EmitLogTopic
* @Description: TODO(发送消息类,转发器为topic类型)
* @author yuhuan.gao
* @date 2017年1月23日 下午1:45:35
*
 */
public class EmitLogTopic 

    private static final String EXCHANGE_NAME = "topic_logs";  

    public static void main(String[] argv) throws IOException, TimeoutException  
        // 创建连接和频道  
        ConnectionFactory factory = new ConnectionFactory();  
        factory.setHost("localhost");  
        Connection connection = factory.newConnection();  
        Channel channel = connection.createChannel();  

        channel.exchangeDeclare(EXCHANGE_NAME, "topic");  

        String[] routing_keys = new String[]  "kernel.info", "cron.warning",  
                "auth.info","auth.critical", "kernel.critical" ;  
        int i = 1;
        for (String routing_key : routing_keys)
            String msg = "test msg,message num is " + i;  
            i++;
            channel.basicPublish(EXCHANGE_NAME, routing_key, null, msg  
                    .getBytes());  
            System.out.println(" [x] Sent routingKey = "+routing_key+" ,msg = " + msg + ".");  
          
        channel.close();  
        connection.close();  
      

3.2 消息接收类

RecCritical只接收与critical相关的消息
RecKernel只接收与kernel相关的消息
RecCriticalAndKernel接收与critical及kernel相关的所有消息
RecCritical.java

package com.gta.goldnock.mq.topics;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
/**
 * 
* @ClassName: RecCritical
* @Description: TODO(接收消息, 接收critical级别消息)
* @author yuhuan.gao
* @date 2017年1月23日 下午1:56:34
*
 */
public class RecCritical 

    private static final String EXCHANGE_NAME = "topic_logs";  

    public static void main(String[] args) throws IOException, TimeoutException 
        // 创建连接和频道  
        ConnectionFactory factory = new ConnectionFactory();  
        factory.setHost("localhost");  
        Connection connection = factory.newConnection();  
        Channel channel = connection.createChannel();  
        // 声明转发器  
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");  
        // 随机生成一个队列  
        String queueName = channel.queueDeclare().getQueue();  

        // 接收所有与critical相关的消息  
        channel.queueBind(queueName, EXCHANGE_NAME, "*.critical");  

        System.out.println(" [*] Waiting for critical messages. To exit press CTRL+C");  

        Consumer consumer = new DefaultConsumer(channel)
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                    AMQP.BasicProperties properties, byte[] body) throws IOException 
                    String message = new String(body, "UTF-8");
                    System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");
                   
        ;
        //指定接收者,第二个参数为自动应答,(消息接收失败不会转发到其他接收者)
        channel.basicConsume(queueName, true, consumer);
    

RecKernel.java

package com.gta.goldnock.mq.topics;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
/**
 * 
* @ClassName: RecKernel
* @Description: TODO(接收消息类,接收与kernel相关的消息)
* @author yuhuan.gao
* @date 2017年1月23日 下午1:59:20
*
 */
public class RecKernel 

    private static final String EXCHANGE_NAME = "topic_logs";  

    public static void main(String[] args) throws IOException, TimeoutException 
        // 创建连接和频道  
        ConnectionFactory factory = new ConnectionFactory();  
        factory.setHost("localhost");  
        Connection connection = factory.newConnection();  
        Channel channel = connection.createChannel();  
        // 声明转发器  
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");  
        // 随机生成一个队列  
        String queueName = channel.queueDeclare().getQueue();  

        // 接收所有与kernel相关的消息  
        channel.queueBind(queueName, EXCHANGE_NAME, "kernel.*");  

        System.out.println(" [*] Waiting for kernel messages. To exit press CTRL+C");  

        Consumer consumer = new DefaultConsumer(channel)
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                    AMQP.BasicProperties properties, byte[] body) throws IOException 
                    String message = new String(body, "UTF-8");
                    System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");
                   
        ;
        //指定接收者,第二个参数为自动应答,(消息接收失败不会转发到其他接收者)
        channel.basicConsume(queueName, true, consumer);
    

RecCriticalAndKernel.java

package com.gta.goldnock.mq.topics;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
/**
 * 
* @ClassName: RecCriticalAndKernel
* @Description: TODO(接收消息类,接收与critical和kernel相关的消息)
* @author yuhuan.gao
* @date 2017年1月23日 下午2:01:05
*
 */
public class RecCriticalAndKernel 

    private static final String EXCHANGE_NAME = "topic_logs";  

    public static void main(String[] args) throws IOException, TimeoutException 
        // 创建连接和频道  
        ConnectionFactory factory = new ConnectionFactory();  
        factory.setHost("localhost");  
        Connection connection = factory.newConnection();  
        Channel channel = connection.createChannel();  
        // 声明转发器  
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");  
        // 随机生成一个队列  
        String queueName = channel.queueDeclare().getQueue();  

        // 接收与critical和kernel相关的消息
        channel.queueBind(queueName, EXCHANGE_NAME, "kernel.*");  
        channel.queueBind(queueName, EXCHANGE_NAME, "*.critical");
        System.out.println(" [*] Waiting for kernel or critical messages. To exit press CTRL+C");  

        Consumer consumer = new DefaultConsumer(channel)
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                    AMQP.BasicProperties properties, byte[] body) throws IOException 
                    String message = new String(body, "UTF-8");
                    System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");
                   
        ;
        //指定接收者,第二个参数为自动应答,(消息接收失败不会转发到其他接收者)
        channel.basicConsume(queueName, true, consumer);
    

3.3 运行

运行三个消息接收者,再运行消息发送者.
EmitLogTopic运行结果:

 [x] Sent routingKey = kernel.info ,msg = test msg,message num is 1.
 [x] Sent routingKey = cron.warning ,msg = test msg,message num is 2.
 [x] Sent routingKey = auth.info ,msg = test msg,message num is 3.
 [x] Sent routingKey = auth.critical ,msg = test msg,message num is 4.
 [x] Sent routingKey = kernel.critical ,msg = test msg,message num is 5.

RecCritical运行结果:

 [*] Waiting for critical messages. To exit press CTRL+C
 [x] Received 'auth.critical':'test msg,message num is 4'
 [x] Received 'kernel.critical':'test msg,message num is 5'

RecKernel运行结果:

 [*] Waiting for kernel messages. To exit press CTRL+C
 [x] Received 'kernel.info':'test msg,message num is 1'
 [x] Received 'kernel.critical':'test msg,message num is 5'

RecCriticalAndKernel运行结果

 [*] Waiting for kernel or critical messages. To exit press CTRL+C
 [x] Received 'kernel.info':'test msg,message num is 1'
 [x] Received 'auth.critical':'test msg,message num is 4'
 [x] Received 'kernel.critical':'test msg,message num is 5'

可以看到,我们通过使用topic类型的转发器,成功实现了多重条件选择的订阅。

以上是关于RabbitMQ学习笔记六:话题(Topics)的主要内容,如果未能解决你的问题,请参考以下文章

ROS2学习笔记4--认识ros2话题topic

ROS学习笔记三(理解ROS节点)

rabbitmq详解

python学习笔记——拾壹

多线程编程学习笔记——任务并行库

SpringAMQP整合RabbitMQ-五种工作模式Demo