[rabbitmq] python版本 路由

Posted lonelyisland

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了[rabbitmq] python版本 路由相关的知识,希望对你有一定的参考价值。

路由Routing

这篇主要较上一篇新增一个功能--只接收订阅消息的一个字集。eg.只把严重的错误日志信息写入日志文件(存储到磁盘),但同时仍然讲所有日志信息输出到控制台中。
简单说来就是routing_key决定某一条交付给交换机exchange传给哪个队列,可以将多个routing_key设定给一个队列/也可以将同一个routing_key设定给多个队列(这种情况其实就是实现广播)

绑定Bindings

绑定的的时候可以带上一个额外的routing_key参数。为了避免与basic_publish的参数混淆,叫做绑定的键binding key。创建一个带binding key的绑定

channel.queue_bind(exchange=exchange_name,
                   queue=queue_name,
                   routing_key=\'black\')

binding key具体意义与采用的exchange类型有关,扇形交换机fanout exchanges会忽略这个值.

直连交换机direct exchange

相较于广播的方式,Direct exchange主要实现一种分类/过滤(扇形交换机fanout exchange就没有足够的灵活性,只是广播)。算法:交换机讲绑定键(binding key--在上面的queue_bind的时候创建的,即在绑定交换机和队列的时候设定的)和路由键(routing key--下面basic_publish中设定的,即发布消息的时候设定的)进行精确匹配,从而确定消息发送到哪一个队列,如下图

在这个场景中,我们可以看到直连交换机 X和两个队列进行了绑定。第一个队列使用orange作为绑定键,第二个队列有两个绑定,一个使用black作为绑定键,另外一个使用green。

这样以来,当路由键为orange的消息发布到交换机,就会被路由到队列Q1。路由键为black或者green的消息就会路由到Q2。其他的所有消息都将会被丢弃。

这种情况即:多个binding key可以被绑定到同一个queue上面

多个绑定multiple bindings


多个队列使用相同的绑定键是合法的。这个例子中,我们可以添加一个X和Q1之间的绑定,使用black绑定键。这样一来,直连交换机就和扇型交换机的行为一样,会将消息广播到所有匹配的队列。带有black路由键的消息会同时发送到Q1和Q2。

这种情况即:一个binding key绑定多个queue上面

发送日志

我们将会发送消息到一个直连交换机,把日志级别作为路由键。这样接收日志的脚本就可以根据严重级别来选择它想要处理的日志。我们先看看发送日志。
创建一个交换机exchange

channel.exchange_declare(exchange=\'direct_logs\',
                         type=\'direct\')

发送一条消息

channel.basic_publish(exchange=\'direct_logs\',
                      routing_key=severity,
                      body=message)

这里的严重程度severity值为info/warning/error中的一个

订阅

处理接收消息方式和之前差不多,只有一个例外,我们将会为我们感兴趣的每个严重级别分别创建一个新的绑定

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

for severity in severities:
    channel.queue_bind(exchange=\'direct_logs\',
                       queue=queue_name,
                       routing_key=severity)

代码整合


简单说下这个图:交换机类型--direct,交换机和队列之间的binding key分为三种,这个写了error/info/warning,队列名是默认随机生成的

emit_log_direct.py

#!/usr/bin/env python
import pika
import sys

#连接
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host=\'localhost\'))
channel = connection.channel()

#声明名为direct_logs队列,类型direct
channel.exchange_declare(exchange=\'direct_logs\', exchange_type=\'direct\')

#命令行参数第一个为严重程度
severity = sys.argv[1] if len(sys.argv) > 2 else \'info\'
#从第二个命令行参数开始为信息
message = \' \'.join(sys.argv[2:]) or \'Hello World!\'
#basic_publish发布这些信息,交换机为direct_logs,routing_key为上面第一个命令行参数(默认为info类型,作为分类标准),消息体就是Message,是从第二个命令行参数开始的其他参数
channel.basic_publish(
    exchange=\'direct_logs\', routing_key=severity, body=message)
print(" [x] Sent %r:%r" % (severity, message))
#做一些关闭之前的操作
connection.close()

receive_logs_direct.py

#!/usr/bin/env python
import pika
import sys

#连接
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host=\'localhost\'))
channel = connection.channel()

#声明一个名为direct_logs的交换机,类型为direct
channel.exchange_declare(exchange=\'direct_logs\', exchange_type=\'direct\')

#声明一个随机名字的队列,断开后有处理,名字存在queue_name中
result = channel.queue_declare(queue=\'\', exclusive=True)
queue_name = result.method.queue

#命令行参数为严重程度,如果没有命令行参数提示错误-->可以是一种或者多种
severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\\n" % sys.argv[0])
    sys.exit(1)

#绑定交换机exchange和队列,routing_key--分类标准就是严重程度 
for severity in severities:
    channel.queue_bind(
        exchange=\'direct_logs\', queue=queue_name, routing_key=severity)

print(\' [*] Waiting for logs. To exit press CTRL+C\')


def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))


channel.basic_consume(
    queue=queue_name, on_message_callback=callback, auto_ack=True)

channel.start_consuming()

运行结果:可以看到接收端和发送端对于指定的info类型信息的交互

如果你希望只是保存warning和error级别的日志到磁盘,只需要打开控制台并输入:

$ python receive_logs_direct.py warning error > logs_from_rabbit.log

如果你希望所有的日志信息都输出到屏幕中,打开一个新的终端,然后输入:

$ python receive_logs_direct.py info warning error
 [*] Waiting for logs. To exit press CTRL+C

如果要触发一个error级别的日志,只需要输入:

$ python emit_log_direct.py error "Run. Run. Or it will explode."
 [x] Sent \'error\':\'Run. Run. Or it will explode.\'

以上是关于[rabbitmq] python版本 路由的主要内容,如果未能解决你的问题,请参考以下文章

python使用rabbitMQ介绍四(路由模式)

如何升级到python3版本并且安装pip3及ipython3

RabbitMQ python 演示 route

消息队列rabbitmq的五种工作模式(go语言版本)

python中消息队列RabbitMQ的使用

python中RabbitMQ的使用(安装和简单教程)