
1 #!/usr/bin/env python 2 3 #_*_coding:utf-8_*_ 4 5 import pika,time,sys 6 7 connection = pika.BlockingConnection(pika.ConnectionParameters( 8 ‘localhost‘)) 9 channel = 10 11 channel.exchange_declare(exchange=‘direct_logs‘,#定义一个接收的频道 12 type=‘direct‘) 13 14 reult=channel.queue_declare(exclusive=True)#随机生成唯一的队列名,会在消息接收后自动删除 15 queuename=reult.method.queue#队列名 自动生成 16 17 18 severities = sys.argv[1:] 19 if not severities: 20 sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])#启动接收的消息级别 21 sys.exit(1) 22 23 for severity in severities:#循环接收各级别的消息 24 channel.queue_bind(exchange=‘direct_logs‘, 25 queue=queuename, 26 routing_key=severity) 27 28 def callback(ch, method, properties, body):#回调函数 29 print(‘接收消息中…………‘) 30 #time.sleep(5) 31 print(" [x] Received %r" % body.decode()) 32 ch.basic_ack(delivery_tag=method.delivery_tag) 33 34 35 channel.basic_qos(prefetch_count=1)#同时只处理一个消息 36 channel.basic_consume(callback,#接收到消息调用回调函数 callback 37 queue=queuename, 38 #no_ack=True 39 ) 40 41 print(‘ [*] 接收消息中. To exit press CTRL+C‘) 42 43 channel.start_consuming()#启动消息接收
rabbitmq_server_direct 服务端 广播

1 #!/usr/bin/env python 2 #{data} {time} 3 #_*_coding:utf-8_*_ 4 5 import pika,sys,time 6 connection = pika.BlockingConnection(pika.ConnectionParameters( 7 ‘localhost‘)) 8 channel =管道 9 10 11 12 severity = sys.argv[1] if len(sys.argv) > 1 else ‘info‘#启动参数 默认无参数为 info 级别 13 msg=‘‘.join(sys.argv[2:]) or ‘info:消息默认发送………‘#启动参数 为空,发默认消息 14 for i in range(10): 15 time.sleep(1) 16 channel.basic_publish(exchange=‘direct_logs‘,#绑定频道 17 routing_key=severity,#默认的消息队列级别 18 body=msg+str(i), 19 #properties=pika.BasicProperties(delivery_mode=2)#持久化 广播不能使用 20 ) 21 print(msg,severity) 22 connection.close() 23 #channel.close()

rabbitmq_server_2: 消息持久化

1 import pika 2 connection = pika.BlockingConnection(pika.ConnectionParameters( 3 ‘localhost‘)) 4 channel =管道 5 6 #声明queue 7 #channel.queue_declare(queue=‘hello‘)#队列名 hello 8 channel.queue_declare(queue=‘hello‘,durable=True)#队列名 hello,持久化队列 9 10 for i in range(10): 11 12 channel.basic_publish(exchange=‘‘, 13 routing_key=‘hello‘, 14 body=‘Hello World!%s‘%i, 15 properties=pika.BasicProperties(delivery_mode=2)) 16 print(" [x] Sent ‘Hello World!‘",i) 17 connection.close()
rabbitmq_server_fanout. fanout 模式发送端

1 import pika,sys,time 2 connection = pika.BlockingConnection(pika.ConnectionParameters( 3 ‘localhost‘)) 4 channel =管道 5 6 #声明queue 广播模式不用声明队列 7 #channel.queue_declare(queue=‘hello‘)#队列名 hello 8 #channel.queue_declare(queue=‘hello‘,durable=True)#队列名 hello,持久化队列 9 10 argv=input(‘输入消息‘) 11 msg=‘‘.join(sys.argv[1:]) or ‘info:消息默认发送………‘ 12 for i in range(10): 13 time.sleep(1) 14 channel.basic_publish(exchange=‘logs‘,#绑定频道 15 #routing_key=‘hello‘, 16 routing_key=‘‘, 17 body=msg+str(i), 18 #properties=pika.BasicProperties(delivery_mode=2)#持久化 广播不能使用 19 ) 20 print(msg,i) 21 #connection.close()
rabbitmq_client_fanout fanout 接收端

1 #!/usr/bin/env python 2 #{data} {time} 3 4 #_*_coding:utf-8_*_ 5 6 import pika,time 7 8 connection = pika.BlockingConnection(pika.ConnectionParameters( 9 ‘localhost‘)) 10 channel = 11 #channel.queue_declare(queue=‘hello2‘)#服务端与客户端的设置需一致,不然会报错 12 #channel.queue_declare(queue=‘hello2‘,durable=True)#队列名 hello,持久化队列 13 channel.exchange_declare(exchange=‘logs‘,#绑定频道 14 type=‘fanout‘)#接收类型 15 reult=channel.queue_declare(exclusive=True)#随机生成唯一的队列名,会在消息接收后自动删除 16 queuename=reult.method.queue#队列名 自动生成 17 channel.queue_bind(exchange=‘logs‘,#先要绑定频道 18 queue=queuename 19 ) 20 21 22 def callback(ch, method, properties, body):#回调函数 23 print(‘接收消息中…………‘) 24 #time.sleep(5) 25 print(" [x] Received %r" % body.decode()) 26 ch.basic_ack(delivery_tag=method.delivery_tag) 27 28 29 channel.basic_qos(prefetch_count=1)#同时只处理一个消息 30 channel.basic_consume(callback,#接收到消息调用回调函数 callback 31 queue=queuename, 32 #no_ack=True 33 ) 34 35 print(‘ [*] 接收消息中. To exit press CTRL+C‘) 36 37 channel.start_consuming()#启动消息接收