RabbitMQ学习笔记六:话题(Topics)
Posted RunningFan
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ学习笔记六:话题(Topics)相关的知识,希望对你有一定的参考价值。
前一篇我们已经改善了我们的日志系统,不用fanout类型的转发器愚蠢的广播消息,而是使用direct类型的转发器,能够选择性的接收我们想要接收的消息。参见Rabbit学习笔记五:路由选择(Routing)http://blog.csdn.net/u010416588/article/details/54670060
一 话题(topics)
虽然使用direct类型改良了我们的系统,但是仍然存在一些局限性:它不能够基于多重条件进行路由选择。
在我们的日志系统中,我们可能希望不仅根据日志的级别订阅,同时想根据日志的来源进行订阅。这个概念类似unix工具:syslog,它转发日志基于严重性(info/warning/crit…)和设备(auth/cron/kern…)
这样可能给我们更多的灵活性:我们可能只想订阅来自’cron’的致命错误日志,而且也来自’kern’的。
为了在我们的系统中实现上述的需求,我们需要学习稍微复杂的主题类型的转发器(topic exchange)。
二 话题转发器
发往主题类型的转发器的消息不能随意的设置选择键(routing_key),必须是由点隔开的一系列的标识符组成。标识符可以是任何东西,但是一般都与消息的某些特性相关。一些合法的选择键的例子:"stock.usd.nyse", "nyse.vmw","quick.orange.rabbit".你可以定义任何数量的标识符,上限为255个字节(32个字符)。
绑定键和选择键的形式一样。主题类型的转发器背后的逻辑和直接类型的转发器很类似:一个附带特殊的选择键将会被转发到绑定键与之匹配的队列中。需要注意的是:关于绑定键有两种特殊的情况。
*可以匹配一个标识符。
#可以匹配0个或多个标识符。
如下图:
在上面的例子中,我们要发送描述动物信息的消息。消息发送的路由键(routing key)包含三个标识符(两个点隔开)。第一个标识符描述动物的速度,第二个描述颜色,第三个描述物种:"<speed>.<colour>.<species>".
我们创建三个绑定关系:
1 队列Q1与topic类型转发器绑定,绑定键为“*.organge.*”
2 队列Q2与topic类型转发器绑定,绑定键为“*.*.rabbit”
3 队列Q2与topic类型转发器绑定,绑定键为“lazy.#”
这些绑定关系可以概括为:
队列Q1对所有的橙色动物感兴趣
队列Q2想知道所有关于兔子以及懒洋洋的动物的一切.
例如:
路由键为 "quick.orange.rabbit"的消息将会被发送到队列Q1和Q2。
路由键为 "lazy.orange.elephant"的消息将会被发送到队列Q1和Q2。
路由键为 "quick.orange.fox"的消息将会被发送到队列Q1。
路由键为 "lazy.brown.fox"的消息将会被发送到队列Q2。
路由键为 "lazy.pink.rabbit"的消息将会被发送到队列Q2。并且仅发一次,尽管有两个路邮件匹配。
路由键为 "quick.brown.fox"的消息没有匹配,将会被抛弃。
如果我们违反约定,发送一个或者四个标识符的选择键,类似:orange,quick.orange.male.rabbit,这些选择键不能与任何绑定键匹配,所以消息将会被丢弃。
另一方面,lazy.orange.male.rabbit,虽然是四个标识符,也可以与lazy.#匹配,从而转发至Q2。
注:主题类型的转发器非常强大,可以实现其他类型的转发器。
当一个队列与绑定键#绑定,将会收到所有的消息,类似fanout类型转发器。
当绑定键中不包含任何#与*时,类似direct类型转发器。
三 完整的例子
PS:RabbitMQ server在本地,如果不在本地,请修改连接配置
现在我们在日志系统中使用topic类型的转发器。我们以一个假设为前提,即日志的路由键有两个关键字 “.”
下面代码和之前类似:
3.1 消息发送类
发送五条消息,路由键分别为
“kernel.info”, “cron.warning”, “auth.info”,”auth.critical”, “kernel.critical”
EmitLogTopic.Java
package com.gta.goldnock.mq.topics;
import java.io.IOException;
import java.util.Random;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
*
* @ClassName: EmitLogTopic
* @Description: TODO(发送消息类,转发器为topic类型)
* @author yuhuan.gao
* @date 2017年1月23日 下午1:45:35
*
*/
public class EmitLogTopic
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv) throws IOException, TimeoutException
// 创建连接和频道
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String[] routing_keys = new String[] "kernel.info", "cron.warning",
"auth.info","auth.critical", "kernel.critical" ;
int i = 1;
for (String routing_key : routing_keys)
String msg = "test msg,message num is " + i;
i++;
channel.basicPublish(EXCHANGE_NAME, routing_key, null, msg
.getBytes());
System.out.println(" [x] Sent routingKey = "+routing_key+" ,msg = " + msg + ".");
channel.close();
connection.close();
3.2 消息接收类
RecCritical只接收与critical相关的消息
RecKernel只接收与kernel相关的消息
RecCriticalAndKernel接收与critical及kernel相关的所有消息
RecCritical.java
package com.gta.goldnock.mq.topics;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
/**
*
* @ClassName: RecCritical
* @Description: TODO(接收消息, 接收critical级别消息)
* @author yuhuan.gao
* @date 2017年1月23日 下午1:56:34
*
*/
public class RecCritical
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws IOException, TimeoutException
// 创建连接和频道
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明转发器
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
// 随机生成一个队列
String queueName = channel.queueDeclare().getQueue();
// 接收所有与critical相关的消息
channel.queueBind(queueName, EXCHANGE_NAME, "*.critical");
System.out.println(" [*] Waiting for critical messages. To exit press CTRL+C");
Consumer consumer = new DefaultConsumer(channel)
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");
;
//指定接收者,第二个参数为自动应答,(消息接收失败不会转发到其他接收者)
channel.basicConsume(queueName, true, consumer);
RecKernel.java
package com.gta.goldnock.mq.topics;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
/**
*
* @ClassName: RecKernel
* @Description: TODO(接收消息类,接收与kernel相关的消息)
* @author yuhuan.gao
* @date 2017年1月23日 下午1:59:20
*
*/
public class RecKernel
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws IOException, TimeoutException
// 创建连接和频道
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明转发器
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
// 随机生成一个队列
String queueName = channel.queueDeclare().getQueue();
// 接收所有与kernel相关的消息
channel.queueBind(queueName, EXCHANGE_NAME, "kernel.*");
System.out.println(" [*] Waiting for kernel messages. To exit press CTRL+C");
Consumer consumer = new DefaultConsumer(channel)
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");
;
//指定接收者,第二个参数为自动应答,(消息接收失败不会转发到其他接收者)
channel.basicConsume(queueName, true, consumer);
RecCriticalAndKernel.java
package com.gta.goldnock.mq.topics;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
/**
*
* @ClassName: RecCriticalAndKernel
* @Description: TODO(接收消息类,接收与critical和kernel相关的消息)
* @author yuhuan.gao
* @date 2017年1月23日 下午2:01:05
*
*/
public class RecCriticalAndKernel
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws IOException, TimeoutException
// 创建连接和频道
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明转发器
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
// 随机生成一个队列
String queueName = channel.queueDeclare().getQueue();
// 接收与critical和kernel相关的消息
channel.queueBind(queueName, EXCHANGE_NAME, "kernel.*");
channel.queueBind(queueName, EXCHANGE_NAME, "*.critical");
System.out.println(" [*] Waiting for kernel or critical messages. To exit press CTRL+C");
Consumer consumer = new DefaultConsumer(channel)
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");
;
//指定接收者,第二个参数为自动应答,(消息接收失败不会转发到其他接收者)
channel.basicConsume(queueName, true, consumer);
3.3 运行
运行三个消息接收者,再运行消息发送者.
EmitLogTopic运行结果:
[x] Sent routingKey = kernel.info ,msg = test msg,message num is 1.
[x] Sent routingKey = cron.warning ,msg = test msg,message num is 2.
[x] Sent routingKey = auth.info ,msg = test msg,message num is 3.
[x] Sent routingKey = auth.critical ,msg = test msg,message num is 4.
[x] Sent routingKey = kernel.critical ,msg = test msg,message num is 5.
RecCritical运行结果:
[*] Waiting for critical messages. To exit press CTRL+C
[x] Received 'auth.critical':'test msg,message num is 4'
[x] Received 'kernel.critical':'test msg,message num is 5'
RecKernel运行结果:
[*] Waiting for kernel messages. To exit press CTRL+C
[x] Received 'kernel.info':'test msg,message num is 1'
[x] Received 'kernel.critical':'test msg,message num is 5'
RecCriticalAndKernel运行结果
[*] Waiting for kernel or critical messages. To exit press CTRL+C
[x] Received 'kernel.info':'test msg,message num is 1'
[x] Received 'auth.critical':'test msg,message num is 4'
[x] Received 'kernel.critical':'test msg,message num is 5'
可以看到,我们通过使用topic类型的转发器,成功实现了多重条件选择的订阅。
以上是关于RabbitMQ学习笔记六:话题(Topics)的主要内容,如果未能解决你的问题,请参考以下文章