路由Routing
这篇主要较上一篇新增一个功能--只接收订阅消息的一个字集。eg.只把严重的错误日志信息写入日志文件(存储到磁盘),但同时仍然讲所有日志信息输出到控制台中。
简单说来就是routing_key决定某一条交付给交换机exchange传给哪个队列,可以将多个routing_key设定给一个队列/也可以将同一个routing_key设定给多个队列(这种情况其实就是实现广播)
绑定Bindings
绑定的的时候可以带上一个额外的routing_key参数。为了避免与basic_publish的参数混淆,叫做绑定的键binding key。创建一个带binding key的绑定
channel.queue_bind(exchange=exchange_name,
queue=queue_name,
routing_key=\'black\')
binding key具体意义与采用的exchange类型有关,扇形交换机fanout exchanges会忽略这个值.
直连交换机direct exchange
相较于广播的方式,Direct exchange主要实现一种分类/过滤(扇形交换机fanout exchange就没有足够的灵活性,只是广播)。算法:交换机讲绑定键(binding key--在上面的queue_bind的时候创建的,即在绑定交换机和队列的时候设定的)和路由键(routing key--下面basic_publish中设定的,即发布消息的时候设定的)进行精确匹配,从而确定消息发送到哪一个队列,如下图
在这个场景中,我们可以看到直连交换机 X和两个队列进行了绑定。第一个队列使用orange作为绑定键,第二个队列有两个绑定,一个使用black作为绑定键,另外一个使用green。
这样以来,当路由键为orange的消息发布到交换机,就会被路由到队列Q1。路由键为black或者green的消息就会路由到Q2。其他的所有消息都将会被丢弃。
这种情况即:多个binding key可以被绑定到同一个queue上面
多个绑定multiple bindings
多个队列使用相同的绑定键是合法的。这个例子中,我们可以添加一个X和Q1之间的绑定,使用black绑定键。这样一来,直连交换机就和扇型交换机的行为一样,会将消息广播到所有匹配的队列。带有black路由键的消息会同时发送到Q1和Q2。
这种情况即:一个binding key绑定多个queue上面
发送日志
我们将会发送消息到一个直连交换机,把日志级别作为路由键。这样接收日志的脚本就可以根据严重级别来选择它想要处理的日志。我们先看看发送日志。
创建一个交换机exchange
channel.exchange_declare(exchange=\'direct_logs\',
type=\'direct\')
发送一条消息
channel.basic_publish(exchange=\'direct_logs\',
routing_key=severity,
body=message)
这里的严重程度severity值为info/warning/error中的一个
订阅
处理接收消息方式和之前差不多,只有一个例外,我们将会为我们感兴趣的每个严重级别分别创建一个新的绑定
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
for severity in severities:
channel.queue_bind(exchange=\'direct_logs\',
queue=queue_name,
routing_key=severity)
代码整合
简单说下这个图:交换机类型--direct,交换机和队列之间的binding key分为三种,这个写了error/info/warning,队列名是默认随机生成的
emit_log_direct.py
#!/usr/bin/env python
import pika
import sys
#连接
connection = pika.BlockingConnection(
pika.ConnectionParameters(host=\'localhost\'))
channel = connection.channel()
#声明名为direct_logs队列,类型direct
channel.exchange_declare(exchange=\'direct_logs\', exchange_type=\'direct\')
#命令行参数第一个为严重程度
severity = sys.argv[1] if len(sys.argv) > 2 else \'info\'
#从第二个命令行参数开始为信息
message = \' \'.join(sys.argv[2:]) or \'Hello World!\'
#basic_publish发布这些信息,交换机为direct_logs,routing_key为上面第一个命令行参数(默认为info类型,作为分类标准),消息体就是Message,是从第二个命令行参数开始的其他参数
channel.basic_publish(
exchange=\'direct_logs\', routing_key=severity, body=message)
print(" [x] Sent %r:%r" % (severity, message))
#做一些关闭之前的操作
connection.close()
receive_logs_direct.py
#!/usr/bin/env python
import pika
import sys
#连接
connection = pika.BlockingConnection(
pika.ConnectionParameters(host=\'localhost\'))
channel = connection.channel()
#声明一个名为direct_logs的交换机,类型为direct
channel.exchange_declare(exchange=\'direct_logs\', exchange_type=\'direct\')
#声明一个随机名字的队列,断开后有处理,名字存在queue_name中
result = channel.queue_declare(queue=\'\', exclusive=True)
queue_name = result.method.queue
#命令行参数为严重程度,如果没有命令行参数提示错误-->可以是一种或者多种
severities = sys.argv[1:]
if not severities:
sys.stderr.write("Usage: %s [info] [warning] [error]\\n" % sys.argv[0])
sys.exit(1)
#绑定交换机exchange和队列,routing_key--分类标准就是严重程度
for severity in severities:
channel.queue_bind(
exchange=\'direct_logs\', queue=queue_name, routing_key=severity)
print(\' [*] Waiting for logs. To exit press CTRL+C\')
def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body))
channel.basic_consume(
queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()
运行结果:可以看到接收端和发送端对于指定的info类型信息的交互
如果你希望只是保存warning和error级别的日志到磁盘,只需要打开控制台并输入:
$ python receive_logs_direct.py warning error > logs_from_rabbit.log
如果你希望所有的日志信息都输出到屏幕中,打开一个新的终端,然后输入:
$ python receive_logs_direct.py info warning error
[*] Waiting for logs. To exit press CTRL+C
如果要触发一个error级别的日志,只需要输入:
$ python emit_log_direct.py error "Run. Run. Or it will explode."
[x] Sent \'error\':\'Run. Run. Or it will explode.\'