RabbitMQ笔记十七: Alternate Exchange
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ笔记十七: Alternate Exchange相关的知识,希望对你有一定的参考价值。
参考技术A应用启动类
启动应用启动类之后生成一个带有 alternate-exchange 属性的 Exchange 。
zhihao.miao.exchange.pay 是个包含 alternate-exchange 属性的 topic 类型的 exchange (route key是 zhihao.miao.pay.* ,队列名是 zhihao.miao.pay ), alternate-exchange 属性指定的是 fanout 类型的exchange,exchange的名称是 zhihao.miao.exchange.order (绑定到 zhihao.miao.order 队列)
如果正确的路由(符合 zhihao.miao.pay.* )规则,则 zhihao.miao.pay 队列接收到消息。如果是不正确的路由(不符合 zhihao.miao.pay.* )规则,则路由到 zhihao.miao.exchange.pay Exchange指定的 alternate-exchange 属性的Exchange中。
启动应用类:
此时发送消息到名为 zhihao.miao.exchange.pay 的 Exchange ,而 Route key 是 zhihao.miao.pay.aaa ,所以能正确地路由到 zhihao.miao.pay 队列中。
当指定的 Route key 不能正确的路由的时候,则直接发送到名为 zhihao.miao.exchange.order 的 Exchange ,而因为我们定义的 Exchange 类型是 fanout 类型,所以就能路由到 zhihao.miao.order 队列中了。
一般 alternate-exchange 属性的值最好是 fanout 类型的 exchange ,否则还会根据 route key 与 alternate-exchange 属性的 exchange 进行匹配再去路由。而如果指定了 fanout 类型的 exchange ,不需要去匹配 routekey 。
示列说明
创建了一个 topic 类型的Exchange带有 alternate-exchange 属性,其 alternate-exchange 的 exchange 也是 topic 类型的 exchange ,如果消息的 route key 既不能,这个消息就会丢失。可以触发 publish confirm 机制,表示这个消息没有确认。
配置:
正常路由到Exchange名为head.info路由的队列中。
路由到Exchange名为head.info指定的alternate-exchange配置的head.error所路由的队列中。
二者都不符合则消息丢失,可以使用publish confirm来做生产端的消息确认,因为消息没有正确路由到队列,所以触发了return method。
配置:
Alternate Exchange官网
python第五十七天------补上笔记
direct_client:广播接收
1 #!/usr/bin/env python 2 3 #_*_coding:utf-8_*_ 4 5 import pika,time,sys 6 7 connection = pika.BlockingConnection(pika.ConnectionParameters( 8 ‘localhost‘)) 9 channel = connection.channel() 10 11 channel.exchange_declare(exchange=‘direct_logs‘,#定义一个接收的频道 12 type=‘direct‘) 13 14 reult=channel.queue_declare(exclusive=True)#随机生成唯一的队列名,会在消息接收后自动删除 15 queuename=reult.method.queue#队列名 自动生成 16 17 18 severities = sys.argv[1:] 19 if not severities: 20 sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])#启动接收的消息级别 21 sys.exit(1) 22 23 for severity in severities:#循环接收各级别的消息 24 channel.queue_bind(exchange=‘direct_logs‘, 25 queue=queuename, 26 routing_key=severity) 27 28 def callback(ch, method, properties, body):#回调函数 29 print(‘接收消息中…………‘) 30 #time.sleep(5) 31 print(" [x] Received %r" % body.decode()) 32 ch.basic_ack(delivery_tag=method.delivery_tag) 33 34 35 channel.basic_qos(prefetch_count=1)#同时只处理一个消息 36 channel.basic_consume(callback,#接收到消息调用回调函数 callback 37 queue=queuename, 38 #no_ack=True 39 ) 40 41 print(‘ [*] 接收消息中. To exit press CTRL+C‘) 42 43 channel.start_consuming()#启动消息接收
rabbitmq_server_direct 服务端 广播
1 #!/usr/bin/env python 2 #{data} {time} 3 #_*_coding:utf-8_*_ 4 5 import pika,sys,time 6 connection = pika.BlockingConnection(pika.ConnectionParameters( 7 ‘localhost‘)) 8 channel = connection.channel()#管道 9 10 11 12 severity = sys.argv[1] if len(sys.argv) > 1 else ‘info‘#启动参数 默认无参数为 info 级别 13 msg=‘‘.join(sys.argv[2:]) or ‘info:消息默认发送………‘#启动参数 为空,发默认消息 14 for i in range(10): 15 time.sleep(1) 16 channel.basic_publish(exchange=‘direct_logs‘,#绑定频道 17 routing_key=severity,#默认的消息队列级别 18 body=msg+str(i), 19 #properties=pika.BasicProperties(delivery_mode=2)#持久化 广播不能使用 20 ) 21 print(msg,severity) 22 connection.close() 23 #channel.close()
rabbitmq_server:
rabbitmq_server_2: 消息持久化
1 import pika 2 connection = pika.BlockingConnection(pika.ConnectionParameters( 3 ‘localhost‘)) 4 channel = connection.channel()#管道 5 6 #声明queue 7 #channel.queue_declare(queue=‘hello‘)#队列名 hello 8 channel.queue_declare(queue=‘hello‘,durable=True)#队列名 hello,持久化队列 9 10 for i in range(10): 11 12 channel.basic_publish(exchange=‘‘, 13 routing_key=‘hello‘, 14 body=‘Hello World!%s‘%i, 15 properties=pika.BasicProperties(delivery_mode=2)) 16 print(" [x] Sent ‘Hello World!‘",i) 17 connection.close()
rabbitmq_server_fanout. fanout 模式发送端
1 import pika,sys,time 2 connection = pika.BlockingConnection(pika.ConnectionParameters( 3 ‘localhost‘)) 4 channel = connection.channel()#管道 5 6 #声明queue 广播模式不用声明队列 7 #channel.queue_declare(queue=‘hello‘)#队列名 hello 8 #channel.queue_declare(queue=‘hello‘,durable=True)#队列名 hello,持久化队列 9 10 argv=input(‘输入消息‘) 11 msg=‘‘.join(sys.argv[1:]) or ‘info:消息默认发送………‘ 12 for i in range(10): 13 time.sleep(1) 14 channel.basic_publish(exchange=‘logs‘,#绑定频道 15 #routing_key=‘hello‘, 16 routing_key=‘‘, 17 body=msg+str(i), 18 #properties=pika.BasicProperties(delivery_mode=2)#持久化 广播不能使用 19 ) 20 print(msg,i) 21 #connection.close()
rabbitmq_client_fanout fanout 接收端
1 #!/usr/bin/env python 2 #{data} {time} 3 4 #_*_coding:utf-8_*_ 5 6 import pika,time 7 8 connection = pika.BlockingConnection(pika.ConnectionParameters( 9 ‘localhost‘)) 10 channel = connection.channel() 11 #channel.queue_declare(queue=‘hello2‘)#服务端与客户端的设置需一致,不然会报错 12 #channel.queue_declare(queue=‘hello2‘,durable=True)#队列名 hello,持久化队列 13 channel.exchange_declare(exchange=‘logs‘,#绑定频道 14 type=‘fanout‘)#接收类型 15 reult=channel.queue_declare(exclusive=True)#随机生成唯一的队列名,会在消息接收后自动删除 16 queuename=reult.method.queue#队列名 自动生成 17 channel.queue_bind(exchange=‘logs‘,#先要绑定频道 18 queue=queuename 19 ) 20 21 22 def callback(ch, method, properties, body):#回调函数 23 print(‘接收消息中…………‘) 24 #time.sleep(5) 25 print(" [x] Received %r" % body.decode()) 26 ch.basic_ack(delivery_tag=method.delivery_tag) 27 28 29 channel.basic_qos(prefetch_count=1)#同时只处理一个消息 30 channel.basic_consume(callback,#接收到消息调用回调函数 callback 31 queue=queuename, 32 #no_ack=True 33 ) 34 35 print(‘ [*] 接收消息中. To exit press CTRL+C‘) 36 37 channel.start_consuming()#启动消息接收
以上是关于RabbitMQ笔记十七: Alternate Exchange的主要内容,如果未能解决你的问题,请参考以下文章
spring boot 1.5.4 整合rabbitMQ(十七)
RabbitMQ入门教程(十七):消息队列的应用场景和常见的消息队列之间的比较
Java网络商城项目 SpringBoot+SpringCloud+Vue 网络商城(SSM前后端分离项目)十七(安装RabbitMQ(Centos6)(入门使用教程))以及Spring AMQP的)
Java网络商城项目 SpringBoot+SpringCloud+Vue 网络商城(SSM前后端分离项目)十七(安装RabbitMQ(Centos6)(入门使用教程))以及Spring AMQP的)