RabbitMQ
Posted Lani
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ相关的知识,希望对你有一定的参考价值。
---恢复内容开始---
RabbitMQ队列
安装 http://www.rabbitmq.com/install-standalone-mac.html
安装python rabbitMQ module
1 pip install pika 2 or 3 easy_install pika 4 or 5 源码 6 7 https://pypi.python.org/pypi/pika
实现最简单的队列通信
1 # 添加用户 2 rabbitmqctl add_user name pass 3 # 分配角色 4 rabbitmqctl set_user_tags name administrator 5 6 # 授权网段 7 rabbitmqctl set_permissions -p / name ".*" ".*" ".*"
send端
1 #! /usr/bin/env python 2 # encoding: utf-8 3 4 import pika 5 6 # 连接Rabbit 7 credentials = pika.PlainCredentials("laiying", "laiying123") 8 connection = pika.BlockingConnection(pika.ConnectionParameters("192.168.14.4", 9 credentials=credentials)) 10 channel = connection.channel() # 创建rabbitmq协议通道 11 12 # 生命queue 13 channel.queue_declare(queue="hello") 14 15 channel.basic_publish(exchange=\'\', 16 routing_key="hello", 17 body="hello world") 18 print("send hello world") 19 connection.close()
receive端
1 #! /usr/bin/env python 2 # encoding: utf-8 3 4 import pika 5 import time 6 7 credentials = pika.PlainCredentials("laiying", "laiying123") 8 connection = pika.BlockingConnection(pika.ConnectionParameters("192.168.14.4", 9 credentials=credentials)) 10 channel = connection.channel() # 创建rabbitmq协议通道 11 12 channel.queue_declare(queue="hello") 13 14 15 def callback(ch, method, properties, body): 16 # print(\'12--\', ch) 17 # print(method) 18 # print(properties) 19 print("消费者1 %s" % body) 20 time.sleep(20) 21 print("消费者接受%s" % body) 22 23 24 channel.basic_consume(callback, 25 queue="hello", 26 no_ack=True) 27 28 print("rabbitMQ 链接") 29 30 channel.start_consuming()
Work Queues
在这种模式下,RabbitMQ会默认把p发的消息依次分发给各个消费者(c),跟负载均衡差不多
消息提供者代码
1 import pika 2 import time 3 4 credentials = pika.PlainCredentials(\'alex\', \'alex123\') 5 connection = pika.BlockingConnection(pika.ConnectionParameters( 6 \'192.168.14.52\',credentials=credentials)) 7 channel = connection.channel() 8 9 # 声明queue 10 channel.queue_declare(queue=\'task_queue\',durable=True) 11 12 # n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange. 13 import sys 14 15 message = \' \'.join(sys.argv[1:]) or "Hello World! %s" % time.time() 16 17 channel.basic_publish(exchange=\'\', 18 routing_key=\'task_queue\', 19 body=message, 20 properties=pika.BasicProperties( 21 delivery_mode=2, # make message persistent 22 ) 23 24 ) 25 print(" [x] Sent %r" % message) 26 connection.close()
消费者代码
1 import pika, time 2 3 credentials = pika.PlainCredentials(\'alex\', \'alex123\') 4 connection = pika.BlockingConnection(pika.ConnectionParameters( 5 \'192.168.14.52\',credentials=credentials)) 6 channel = connection.channel() 7 8 9 def callback(ch, method, properties, body): 10 print(" [x] Received %r" % body) 11 time.sleep(20) 12 print(" [x] Done") 13 print("method.delivery_tag", method.delivery_tag) 14 ch.basic_ack(delivery_tag=method.delivery_tag) 15 #ackownledgement 16 17 18 channel.basic_consume(callback, 19 queue=\'task_queue\', 20 #no_ack=True 21 ) 22 23 print(\' [*] Waiting for messages. To exit press CTRL+C\') 24 channel.start_consuming()
消息持久化
1 # 队列持久化 2 channel.queue_declare(queue=\'hello\', durable=True) 3 #消息持久化 4 channel.basic_publish(exchange=\'\', 5 routing_key="task_queue", 6 body=message, 7 properties=pika.BasicProperties( 8 delivery_mode = 2, # make message persistent 9 ))
注意事项
# on_ack = True, 默认为false,该值为false 后,会自动回收消费者没有执行完成的消息,继续发给小一个队列 # ch.basic_ack(delivery_tag=method.delivery_tag) 消费者处理完成队列后返回的队列标识符
消息公平分发
如果Rabbit只管按顺序把消息发到各个消费者身上,不考虑消费者负载的话,很可能出现,一个机器配置不高的消费者那里堆积了很多消息处理不完,同时配置高的消费者却一直很轻松。为解决此问题,可以在各个消费者端,配置perfetch=1,意思就是告诉RabbitMQ在我这个消费者当前消息还没处理完的时候就不要再给我发新消息了。
1 channel.basic_qos(prefetch_count=1)
带消息持久化+公平分发的完整代码
生产者端
1 import pika 2 import time 3 4 credentials = pika.PlainCredentials(\'alex\', \'alex123\') 5 connection = pika.BlockingConnection(pika.ConnectionParameters( 6 \'192.168.14.52\',credentials=credentials)) 7 channel = connection.channel() 8 9 # 声明queue 10 channel.queue_declare(queue=\'task_queue\',durable=True) 11 12 # n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange. 13 import sys 14 15 message = \' \'.join(sys.argv[1:]) or "Hello World! %s" % time.time() 16 17 channel.basic_publish(exchange=\'\', 18 routing_key=\'task_queue\', 19 body=message, 20 properties=pika.BasicProperties( 21 delivery_mode=2, # make message persistent 22 ) 23 24 ) 25 print(" [x] Sent %r" % message) 26 connection.close()
消费者代码
1 #!/usr/bin/env python 2 import pika 3 import time 4 5 credentials = pika.PlainCredentials(\'alex\', \'alex123\') 6 connection = pika.BlockingConnection(pika.ConnectionParameters( 7 \'192.168.14.52\',credentials=credentials)) 8 channel = connection.channel() 9 10 channel.queue_declare(queue=\'task_queue\', durable=True) 11 print(\' [*] Waiting for messages. To exit press CTRL+C\') 12 13 def callback(ch, method, properties, body): 14 print(" [x] Received %r" % body) 15 time.sleep(body.count(b\'.\')) 16 print(" [x] Done") 17 ch.basic_ack(delivery_tag = method.delivery_tag) 18 19 channel.basic_qos(prefetch_count=1) 20 channel.basic_consume(callback, 21 queue=\'task_queue\') 22 23 channel.start_consuming()
Publish\\Subscribe(消息发布\\订阅)
之前的例子都基本都是1对1的消息发送和接收,即消息只能发送到指定的queue里,但有些时候你想让你的消息被所有的Queue收到,类似广播的效果,这时候就要用到exchange了,
Exchange在定义的时候是有类型的,以决定到底是哪些Queue符合条件,可以接收消息
fanout: 所有bind到此exchange的queue都可以接收消息
direct: 通过routingKey和exchange决定的那个唯一的queue可以接收消息
topic:所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息
表达式符号说明:#代表一个或多个字符,*代表任何字符
例:#.a会匹配a.a,aa.a,aaa.a等
*.a会匹配a.a,b.a,c.a等
注:使用RoutingKey为#,Exchange Type为topic的时候相当于使用fanout
headers: 通过headers 来决定把消息发给哪些queue
生成者代码 fanout模式
1 #! /usr/bin/env python 2 # encoding: utf-8 3 4 import pika 5 import sys 6 credentials = pika.PlainCredentials("laiying", "laiying123") 7 connection = pika.BlockingConnection(pika.ConnectionParameters("127.0.0.1", 8 credentials=credentials)) 9 channel = connection.channel() 10 11 channel.exchange_declare(exchange="logs", 12 type="fanout") 13 14 message = \' \'.join(sys.argv[1:]) or "info: Hello World!" 15 channel.basic_publish(exchange=\'logs\', 16 routing_key=\'\', 17 body=message) 18 print("Send %s" % message)
消费者fanout模式
1 #! /usr/bin/env python 2 # encoding: utf-8 3 4 import pika 5 6 credentials = pika.PlainCredentials("laiying", "laiying123") 7 connection = pika.BlockingConnection(pika.ConnectionParameters("127.0.0.1", 8 credentials=credentials)) 9 channel = connection.channel() # 创建rabbitmq协议通道 10 11 channel.exchange_declare(exchange="logs", 12 type="fanout") 13 # 不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除 14 result = channel.queue_declare(exclusive=True) 15 queue_name = result.method.queue 16 17 channel.queue_bind(exchange="logs", 18 queue=queue_name) 19 20 print("消费者。。。。") 21 22 def callback(ch, method, properties, body): 23 print("body %s" % body) 24 25 channel.basic_consume(callback, 26 queue=queue_name, 27 no_ack=True) 28 29 channel.start_consuming()
以上是关于RabbitMQ的主要内容,如果未能解决你的问题,请参考以下文章
带着新人学springboot的应用07(springboot+RabbitMQ 下)