RabbitMQ笔记十七: Alternate Exchange

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ笔记十七: Alternate Exchange相关的知识,希望对你有一定的参考价值。

参考技术A

应用启动类

启动应用启动类之后生成一个带有 alternate-exchange 属性的 Exchange 。

zhihao.miao.exchange.pay 是个包含 alternate-exchange 属性的 topic 类型的 exchange (route key是 zhihao.miao.pay.* ,队列名是 zhihao.miao.pay ), alternate-exchange 属性指定的是 fanout 类型的exchange,exchange的名称是 zhihao.miao.exchange.order (绑定到 zhihao.miao.order 队列)

如果正确的路由(符合 zhihao.miao.pay.* )规则,则 zhihao.miao.pay 队列接收到消息。如果是不正确的路由(不符合 zhihao.miao.pay.* )规则,则路由到 zhihao.miao.exchange.pay Exchange指定的 alternate-exchange 属性的Exchange中。

启动应用类:

此时发送消息到名为 zhihao.miao.exchange.pay 的 Exchange ,而 Route key 是 zhihao.miao.pay.aaa ,所以能正确地路由到 zhihao.miao.pay 队列中。

当指定的 Route key 不能正确的路由的时候,则直接发送到名为 zhihao.miao.exchange.order 的 Exchange ,而因为我们定义的 Exchange 类型是 fanout 类型,所以就能路由到 zhihao.miao.order 队列中了。

一般 alternate-exchange 属性的值最好是 fanout 类型的 exchange ,否则还会根据 route key 与 alternate-exchange 属性的 exchange 进行匹配再去路由。而如果指定了 fanout 类型的 exchange ,不需要去匹配 routekey 。

示列说明

创建了一个 topic 类型的Exchange带有 alternate-exchange 属性,其 alternate-exchange 的 exchange 也是 topic 类型的 exchange ,如果消息的 route key 既不能,这个消息就会丢失。可以触发 publish confirm 机制,表示这个消息没有确认。

配置:

正常路由到Exchange名为head.info路由的队列中。

路由到Exchange名为head.info指定的alternate-exchange配置的head.error所路由的队列中。

二者都不符合则消息丢失,可以使用publish confirm来做生产端的消息确认,因为消息没有正确路由到队列,所以触发了return method。

配置:

Alternate Exchange官网

python第五十七天------补上笔记

direct_client:广播接收

技术分享
 1 #!/usr/bin/env python
 2 
 3 #_*_coding:utf-8_*_
 4 
 5 import pika,time,sys
 6 
 7 connection = pika.BlockingConnection(pika.ConnectionParameters(
 8                localhost))
 9 channel = connection.channel()
10 
11 channel.exchange_declare(exchange=direct_logs,#定义一个接收的频道
12                          type=direct)
13 
14 reult=channel.queue_declare(exclusive=True)#随机生成唯一的队列名,会在消息接收后自动删除
15 queuename=reult.method.queue#队列名 自动生成
16 
17 
18 severities = sys.argv[1:]
19 if not severities:
20     sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])#启动接收的消息级别
21     sys.exit(1)
22 
23 for severity in severities:#循环接收各级别的消息
24     channel.queue_bind(exchange=direct_logs,
25                        queue=queuename,
26                        routing_key=severity)
27 
28 def callback(ch, method, properties, body):#回调函数
29     print(接收消息中…………)
30     #time.sleep(5)
31     print(" [x] Received %r" % body.decode())
32     ch.basic_ack(delivery_tag=method.delivery_tag)
33 
34 
35 channel.basic_qos(prefetch_count=1)#同时只处理一个消息
36 channel.basic_consume(callback,#接收到消息调用回调函数 callback
37                       queue=queuename,
38                       #no_ack=True
39                        )
40 
41 print( [*] 接收消息中. To exit press CTRL+C)
42 
43 channel.start_consuming()#启动消息接收
View Code

 

rabbitmq_server_direct 服务端 广播

技术分享
 1 #!/usr/bin/env python
 2 #{data} {time}
 3 #_*_coding:utf-8_*_
 4 
 5 import pika,sys,time
 6 connection = pika.BlockingConnection(pika.ConnectionParameters(
 7                localhost))
 8 channel = connection.channel()#管道
 9 
10 
11 
12 severity = sys.argv[1] if len(sys.argv) > 1 else info#启动参数 默认无参数为 info 级别
13 msg=‘‘.join(sys.argv[2:]) or  info:消息默认发送………#启动参数 为空,发默认消息
14 for i in range(10):
15     time.sleep(1)
16     channel.basic_publish(exchange=direct_logs,#绑定频道
17                           routing_key=severity,#默认的消息队列级别
18                           body=msg+str(i),
19                           #properties=pika.BasicProperties(delivery_mode=2)#持久化 广播不能使用
20                            )
21     print(msg,severity)
22 connection.close()
23 #channel.close()
View Code

 

rabbitmq_server:

技术分享View Code

 

rabbitmq_server_2: 消息持久化

技术分享
 1 import pika
 2 connection = pika.BlockingConnection(pika.ConnectionParameters(
 3                localhost))
 4 channel = connection.channel()#管道
 5 
 6 #声明queue
 7 #channel.queue_declare(queue=‘hello‘)#队列名 hello
 8 channel.queue_declare(queue=hello,durable=True)#队列名 hello,持久化队列
 9 
10 for i in range(10):
11 
12     channel.basic_publish(exchange=‘‘,
13                           routing_key=hello,
14                           body=Hello World!%s%i,
15                           properties=pika.BasicProperties(delivery_mode=2))
16     print(" [x] Sent ‘Hello World!‘",i)
17 connection.close()
View Code

rabbitmq_server_fanout.   fanout 模式发送端

技术分享
 1 import pika,sys,time
 2 connection = pika.BlockingConnection(pika.ConnectionParameters(
 3                localhost))
 4 channel = connection.channel()#管道
 5 
 6 #声明queue 广播模式不用声明队列
 7 #channel.queue_declare(queue=‘hello‘)#队列名 hello
 8 #channel.queue_declare(queue=‘hello‘,durable=True)#队列名 hello,持久化队列
 9 
10 argv=input(输入消息)
11 msg=‘‘.join(sys.argv[1:]) or  info:消息默认发送………
12 for i in range(10):
13     time.sleep(1)
14     channel.basic_publish(exchange=logs,#绑定频道
15                           #routing_key=‘hello‘,
16                           routing_key=‘‘,
17                           body=msg+str(i),
18                           #properties=pika.BasicProperties(delivery_mode=2)#持久化 广播不能使用
19                            )
20     print(msg,i)
21 #connection.close()
View Code

rabbitmq_client_fanout   fanout 接收端

技术分享
 1 #!/usr/bin/env python
 2 #{data} {time}
 3 
 4 #_*_coding:utf-8_*_
 5 
 6 import pika,time
 7 
 8 connection = pika.BlockingConnection(pika.ConnectionParameters(
 9                localhost))
10 channel = connection.channel()
11 #channel.queue_declare(queue=‘hello2‘)#服务端与客户端的设置需一致,不然会报错
12 #channel.queue_declare(queue=‘hello2‘,durable=True)#队列名 hello,持久化队列
13 channel.exchange_declare(exchange=logs,#绑定频道
14                          type=fanout)#接收类型
15 reult=channel.queue_declare(exclusive=True)#随机生成唯一的队列名,会在消息接收后自动删除
16 queuename=reult.method.queue#队列名 自动生成
17 channel.queue_bind(exchange=logs,#先要绑定频道
18                    queue=queuename
19                    )
20 
21 
22 def callback(ch, method, properties, body):#回调函数
23     print(接收消息中…………)
24     #time.sleep(5)
25     print(" [x] Received %r" % body.decode())
26     ch.basic_ack(delivery_tag=method.delivery_tag)
27 
28 
29 channel.basic_qos(prefetch_count=1)#同时只处理一个消息
30 channel.basic_consume(callback,#接收到消息调用回调函数 callback
31                       queue=queuename,
32                       #no_ack=True
33                        )
34 
35 print( [*] 接收消息中. To exit press CTRL+C)
36 
37 channel.start_consuming()#启动消息接收
View Code

 

以上是关于RabbitMQ笔记十七: Alternate Exchange的主要内容,如果未能解决你的问题,请参考以下文章

python第五十七天------补上笔记

spring boot 1.5.4 整合rabbitMQ(十七)

RabbitMQ入门教程(十七):消息队列的应用场景和常见的消息队列之间的比较

Java网络商城项目 SpringBoot+SpringCloud+Vue 网络商城(SSM前后端分离项目)十七(安装RabbitMQ(Centos6)(入门使用教程))以及Spring AMQP的)

Java网络商城项目 SpringBoot+SpringCloud+Vue 网络商城(SSM前后端分离项目)十七(安装RabbitMQ(Centos6)(入门使用教程))以及Spring AMQP的)

iOS工作笔记(十七)