RabbitMQ消息队列: 主题分发
Posted Alex
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ消息队列: 主题分发相关的知识,希望对你有一定的参考价值。
1. 主题(Topics):
fanout模式只能进行简单的广播,direct模式虽然在过滤上进行了一定的提升,但是不能支持复杂的条件,
比如我们的日志消息,现在不仅要知道消息级别,也要知道消息来源。在这样的复杂需求下,我们需要使用
主题交换。
2. 主题交换:
发送主题交换的的routing_key不是任意的,必须遵循如下格式:使用.分隔的一些字。通常这些字用来表示
消息的某些特性,如:"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit"。
注意routing_key的最大长度是255。
绑定routing_key也必须是同样的格式,交换后端形式与直接路由相似,交换匹配消息中的routing_key和绑定队列
所需要接受消息的routing_key,并且将满足条件的消息进行派发。
通配符:
* -- 代表一个字(word)
# -- 代表零个或者多个字
如下图模型,我们使用"<celerity>.<colour>.<species>"来形容动物,可见Q1关心所有橘黄的动物,
Q2关心所有兔子或者懒惰的动物。
"quick.orange.rabbit" -- 分发到Q1和Q2
"lazy.orange.elephant" -- 分发到Q1和Q2
"quick.orange.fox" -- 分发到Q1
"lazy.brown.fox" -- 分发到Q2
"lazy.pink.rabbit" -- 只分发一次到Q2,尽管匹配两个条件
"quick.brown.fox" -- 无匹配,丢弃
"quick.orange.male.rabbit" -- 无匹配,丢弃
"lazy.orange.male.rabbit" -- 匹配规则3,分发到Q2
3. 测试代码:
emit_log_topic.py
1 #!/usr/bin/env python 2 import pika 3 import sys 4 5 connection = pika.BlockingConnection(pika.ConnectionParameters( 6 host=‘localhost‘)) 7 channel = connection.channel() 8 9 channel.exchange_declare(exchange=‘topic_logs‘, 10 type=‘topic‘) 11 12 routing_key = sys.argv[1] if len(sys.argv) > 1 else ‘anonymous.info‘ 13 message = ‘ ‘.join(sys.argv[2:]) or ‘Hello World!‘ 14 channel.basic_publish(exchange=‘topic_logs‘, 15 routing_key=routing_key, 16 body=message) 17 print(" [x] Sent %r:%r" % (routing_key, message)) 18 connection.close()
receive_logs_topic.py
#!/usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host=‘localhost‘)) channel = connection.channel() channel.exchange_declare(exchange=‘topic_logs‘, type=‘topic‘) result = channel.queue_declare(exclusive=True) queue_name = result.method.queue binding_keys = sys.argv[1:] if not binding_keys: sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0]) sys.exit(1) for binding_key in binding_keys: channel.queue_bind(exchange=‘topic_logs‘, queue=queue_name, routing_key=binding_key) print(‘ [*] Waiting for logs. To exit press CTRL+C‘) def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
以上是关于RabbitMQ消息队列: 主题分发的主要内容,如果未能解决你的问题,请参考以下文章