RabbitMq学习笔记五:路由选择(Routing)
Posted RunningFan
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMq学习笔记五:路由选择(Routing)相关的知识,希望对你有一定的参考价值。
前面已经学习了rabbitmq的消息的发布(publish)和订阅(subscrible),参见http://blog.csdn.net/u010416588/article/details/54667952
一 路由选择(Routing)
前面章节我们创建了一个简单的日志系统,我们可以广播日志消息给所有的接收者。本篇会给日志系统增加新的特性,让日志系统订阅部分消息。eg,我们仅仅将致命的错误写入日志文件,与此同时仍在控制面板上打印所有的其他类型的日志消息。
二 绑定关系(Binding)
在前一章节中我们已经使用过绑定,像下面的代码
channel.queueBind(queueName, EXCHANGE_NAME, "");
绑定表示转发器与队列之间的关系,可以简单的理解为队列对该转发器的消息感兴趣。
绑定可以附带一个额外的参数routingKey。为了避免basicPublish方法(发布消息)参数混淆,我们把它叫做绑定键(binding key)。下面展示怎样用绑定键创建一个绑定。
channel.queueBind(queueName, EXCHANGE_NAME, "black");
绑定键的意义依赖于转发器的类型。对于fanout类型,忽略此参数。
三 直接转发
前面我们的日志系统把所有的消息广播给所有的消费者。我们希望对其扩展,允许根据日志的严重性来进行过滤日志。eg,我们希望程序能够把严重的错误日志下载磁盘上,而不是浪费磁盘空间记录所有的warning、info日志。
之前用的fanout类型的转发器,但是灵活性不够,它仅仅简单的转发。
我们将会使用direct类型转发器进行代替。direct类型转换器背后的路由算法很简单:消息会被推送至绑定键(binging key)和发布消息发布的附带选择键(routing key)匹配的队列。
上图,我们可以看到direct类型的转发器与两个队列绑定。第一个队列与绑定键orange绑定,第二个队列与转发器间有两个绑定,一个与绑定键black绑定,另一个与green绑定键绑定。
这样的话,当一个消息附带一个选择键(routing key) orange发布至转发器将会被导向到队列Q1。消息附带一个选择键(routing key)black或者green将会被导向到Q2.所有的其他的消息将会被丢弃。
四 多重绑定
使用一个绑定键(binding key)绑定多个队列是完全合法的。如上图,一个附带选择键(routing key)的消息将会被转发到Q1和Q2。
五 发送日志
我们准备将这种模式用于我们的日志系统。我们将消息发送到direct类型的转发器而不是fanout类型。我们将把日志的严重性作为选择键(routing key)。这样的话,接收程序可以根据严重性来选择接收。我们首先关注发送日志的代码:
像以前一样,我们需要先创建一个转发器:
channel.exchangeDeclare(EXCHANGE_NAME,"direct");
然后我们准备发送一条消息:
channel.basicPublish(EXCHANGE_NAME,severity, null, message.getBytes());
假定‘severity’是‘info’,‘warning’,‘error’中的一个。
六 订阅
接收消息跟之前类似,有一点不一样—-就是我们只对我们感兴趣的严重性日志创建一个绑定。
String queueName = channel.queueDeclare().getQueue();
for(String severity : argv)
channel.queueBind(queueName, EXCHANGE_NAME, severity);
七 完整的例子
如上图,定义一个direct类型的转发器,队列1与转发器绑定,绑定键为error,只记录类型为error的日志。队列二绑定转发器,绑定键为info,error,warning,会接收三种类型的消息。
发送消息类EmitLogDirect.java
package com.gta.goldnock.mq.routing;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
*
* @ClassName: EmitLogDirect
* @Description: TODO(消息发送类,转发器类型direct)
* @author yuhuan.gao
* @date 2017年1月22日 下午5:45:50
*
*/
public class EmitLogDirect
//转发器名称
private static final String EXCHANGE_NAME = "ex_logs_direct";
//几种安全级别日志
private static final String[] SEVERITIES = "info", "warning", "error" ;
public static void main(String[] argv) throws java.io.IOException, TimeoutException
// 创建连接和频道
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明转发器的类型
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
//发送6条消息
for (int i = 0; i < 6; i++)
String severity = getSeverity();
String message = severity + "_log :" + UUID.randomUUID().toString();
// 发布消息至转发器,指定routingkey
channel.basicPublish(EXCHANGE_NAME, severity, null, message
.getBytes());
System.out.println(" [x] Sent '" + message + "'");
channel.close();
connection.close();
/**
* 随机产生一种日志类型
*
* @return
*/
private static String getSeverity()
Random random = new Random();
int ranVal = random.nextInt(3);
return SEVERITIES[ranVal];
接收消息类ReceiveLogsDirect.java
package com.gta.goldnock.mq.routing;
import java.io.IOException;
import java.util.Random;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
/**
*
* @ClassName: ReceiveLogsDirect
* @Description: TODO(消息接收类,接收转发器类型为direct)
* @author yuhuan.gao
* @date 2017年1月22日 下午5:55:20
*
*/
public class ReceiveLogsDirect
//转发器名称
private static final String EXCHANGE_NAME = "ex_logs_direct";
//几种安全级别日志
private static final String[] SEVERITIES = "info", "warning", "error" ;
public static void main(String[] args) throws IOException, TimeoutException
// 创建连接和频道
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明direct类型转发器
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String queueName = channel.queueDeclare().getQueue();
String severity = getSeverity();
// 指定binding_key
channel.queueBind(queueName, EXCHANGE_NAME, severity);
System.out.println(" [*] Waiting for "+severity+" logs. To exit press CTRL+C");
Consumer consumer = new DefaultConsumer(channel)
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");
;
//指定接收者,第二个参数为自动应答,(消息接收失败不会转发到其他接收者)
channel.basicConsume(queueName, true, consumer);
private static String getSeverity()
Random random = new Random();
int ranVal = random.nextInt(3);
return SEVERITIES[ranVal];
运行接收消息类,分别启动info、warning、error日志监控,运行发送消息类。
EmitLogDirect控制台输出:
[x] Sent 'error_log :3249e8ea-1d88-445e-bcf3-3197f4f33563'
[x] Sent 'info_log :31782811-4e6e-4a30-9e2a-673427f9c33b'
[x] Sent 'info_log :897961a5-e2ed-4263-a1a1-83d8ed417fed'
[x] Sent 'warning_log :912a3ce6-c611-4ea9-a4ea-9e3a1436f83f'
[x] Sent 'error_log :3ae1efc0-110a-492e-a69f-2b2b1f766d8a'
[x] Sent 'info_log :df08baa2-1b2c-485f-adab-350a67652e42'
[x] Sent 'info_log :48d254b1-bf8a-490c-8551-c85342ed9694'
[x] Sent 'error_log :5b275de8-adbe-4d1c-8866-7025721f4031'
[x] Sent 'info_log :9a9cdad0-2116-4448-b79f-a5339a8f9a40'
[x] Sent 'info_log :e0ad7687-f70c-480f-961b-bdfc41b8d37f'
接收消息端info:
[*] Waiting for info logs. To exit press CTRL+C
[x] Received 'info':'info_log :31782811-4e6e-4a30-9e2a-673427f9c33b'
[x] Received 'info':'info_log :897961a5-e2ed-4263-a1a1-83d8ed417fed'
[x] Received 'info':'info_log :df08baa2-1b2c-485f-adab-350a67652e42'
[x] Received 'info':'info_log :48d254b1-bf8a-490c-8551-c85342ed9694'
[x] Received 'info':'info_log :9a9cdad0-2116-4448-b79f-a5339a8f9a40'
[x] Received 'info':'info_log :e0ad7687-f70c-480f-961b-bdfc41b8d37f'
接收消息端warning:
[*] Waiting for warning logs. To exit press CTRL+C
[x] Received 'warning':'warning_log :912a3ce6-c611-4ea9-a4ea-9e3a1436f83f'
接收消息端error:
[*] Waiting for error logs. To exit press CTRL+C
[x] Received 'error':'error_log :3249e8ea-1d88-445e-bcf3-3197f4f33563'
[x] Received 'error':'error_log :3ae1efc0-110a-492e-a69f-2b2b1f766d8a'
[x] Received 'error':'error_log :5b275de8-adbe-4d1c-8866-7025721f4031'
从上面的例子可以看出:
1 接收者可以自定义自己感兴趣类型的日志。
2 发送消息时可以设置routing_key,接收队列与转发器间可以设置binding_key,接收者接收与binding_key与routing_key相同的消息。
以上是关于RabbitMq学习笔记五:路由选择(Routing)的主要内容,如果未能解决你的问题,请参考以下文章
RabbitMQ学习笔记五:RabbitMQ之优先级消息队列