redis,rabbitmq
Posted andypengx
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了redis,rabbitmq相关的知识,希望对你有一定的参考价值。
1、rabbitmq
安装python rabbitmq module
pip install pika
- 最简单的消息队列通信
生产者生产消息向hello队列发送消息,消费者从队列hello接受消息进行消费
producer:
#!/usr/bin/env python import pika connection = pika.BlockingConnection(pika.ConnectionParameters( host=\'localhost\')) channel = connection.channel() channel.queue_declare(queue=\'hello\') channel.basic_publish(exchange=\'\', routing_key=\'hello\', body=\'Hello World!\') print(" [x] Sent \'Hello World!\'") connection.close()
#!/usr/bin/env python import pika connection = pika.BlockingConnection(pika.ConnectionParameters( host=\'localhost\')) channel = connection.channel() channel.queue_declare(queue=\'hello\') def callback(ch, method, properties, body): print(" [x] Received %r" % body) channel.basic_consume(callback, queue=\'hello\', no_ack=True) print(\' [*] Waiting for messages. To exit press CTRL+C\') channel.start_consuming()
Now we can try out our programs in a terminal. First, let\'s send a message using our send.pyprogram:
$ python send.py
[x] Sent \'Hello World!\'
The producer program send.py will stop after every run. Let\'s receive it:
$ python receive.py
[*] Waiting for messages. To exit press CTRL+C
[x] Received \'Hello World!\'
- work queue
在这种模式下,rabbitMQ会把p生产的消息分发给多个消费者,rabbitmq默认是轮询依次分发给每个消费者。
经常有如下需求:
When RabbitMQ quits or crashes it will forget the queues and messages unless you tell it not to. Two things are required to make sure that messages aren\'t lost: we need to mark both the queue and messages as durable.
也就是queue持久化和消息持久化。
queue持久化:
channel.queue_declare(queue=\'task_queue\', durable=True) #声明队列持久化。但是rabbitmq不支持对已经存在的queue的参数进行重新定义;只能声明新的queue进行持久化
消息持久化:
channel.basic_publish(exchange=\'\', routing_key="task_queue", body=message, properties=pika.BasicProperties( delivery_mode = 2, # make message persistent ))
消息确认:
Doing a task can take a few seconds. You may wonder what happens if one of the consumers starts a long task and dies with it only partly done. With our current code once RabbitMQ delivers message to the customer it immediately removes it from memory. In this case, if you kill a worker we will lose the message it was just processing. We\'ll also lose all the messages that were dispatched to this particular worker but were not yet handled.
But we don\'t want to lose any tasks. If a worker dies, we\'d like the task to be delivered to another worker.
Message acknowledgments are turned on by default. In previous examples we explicitly turned them off via the no_ack=True flag. It\'s time to remove this flag and send a proper acknowledgment from the worker, once we\'re done with a task.
def callback(ch, method, properties, body): print " [x] Received %r" % (body,) time.sleep( body.count(\'.\') ) print " [x] Done" ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_consume(callback, queue=\'hello\')
公平分发:
如果Rabbit只管按顺序把消息发到各个消费者身上,不考虑消费者负载的话,很可能出现,一个机器配置不高的消费者那里堆积了很多消息处理不完,同时配置高的消费者却一直很轻松。为解决此问题,可以在各个消费者端,配置perfetch=1,意思就是告诉RabbitMQ在我这个消费者当前消息还没处理完的时候就不要再给我发新消息了
This tells RabbitMQ not to give more than one message to a worker at a time. Or, in other words, don\'t dispatch a new message to a worker until it has processed and acknowledged the previous one. Instead, it will dispatch it to the next worker that is not still busy.
channel.basic_qos(prefetch_count=1)
带queue持久化、消息持久化、消息确认,公平分发这些特性的代码示例
生产者
#!/usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host=\'localhost\')) channel = connection.channel() channel.queue_declare(queue=\'task_queue\', durable=True) message = \' \'.join(sys.argv[1:]) or "Hello World!" channel.basic_publish(exchange=\'\', routing_key=\'task_queue\', body=message, properties=pika.BasicProperties( delivery_mode = 2, # make message persistent )) print(" [x] Sent %r" % message) connection.close()
消费者
#!/usr/bin/env python import pika import time connection = pika.BlockingConnection(pika.ConnectionParameters( host=\'localhost\')) channel = connection.channel() channel.queue_declare(queue=\'task_queue\', durable=True) print(\' [*] Waiting for messages. To exit press CTRL+C\') def callback(ch, method, properties, body): print(" [x] Received %r" % body) time.sleep(body.count(b\'.\')) print(" [x] Done") ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(callback, queue=\'task_queue\') channel.start_consuming()
- 发布与订阅:
The core idea in the messaging model in RabbitMQ is that the producer never sends any messages directly to a queue. Actually, quite often the producer doesn\'t even know if a message will be delivered to any queue at all.
Instead, the producer can only send messages to an exchange. An exchange is a very simple thing.
exchange 有多种类型,不同的类型使用不同的规则推送消息给队列。有如下几种类型:
fanout: 所有bind到此exchange的queue都可以接收消息
direct: 通过routingKey和exchange决定的哪个queue可以接收消息
topic: 所有符合routingKey(此时可以是一个表达式)的queue可以接收消息,比direct进行更细致的过滤!
1)fanout:
发布者
#!/usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host=\'localhost\')) channel = connection.channel() channel.exchange_declare(exchange=\'logs\', type=\'fanout\') message = \' \'.join(sys.argv[1:]) or "info: Hello World!" channel.basic_publish(exchange=\'logs\', routing_key=\'\', body=message) print(" [x] Sent %r" % message) connection.close()
订阅者
#!/usr/bin/env python import pika connection = pika.BlockingConnection(pika.ConnectionParameters( host=\'localhost\')) channel = connection.channel() channel.exchange_declare(exchange=\'logs\', type=\'fanout\') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue channel.queue_bind(exchange=\'logs\', queue=queue_name) print(\' [*] Waiting for logs. To exit press CTRL+C\') def callback(ch, method, properties, body): print(" [x] %r" % body) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
2) direct
发布者
import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host=\'localhost\')) channel = connection.channel() channel.exchange_declare(exchange=\'direct_logs\', type=\'direct\') severity = sys.argv[1] if len(sys.argv) > 1 else \'info\' message = \' \'.join(sys.argv[2:]) or \'Hello World!\' channel.basic_publish(exchange=\'direct_logs\', routing_key=severity, body=message) print(" [x] Sent %r:%r" % (severity, message)) connection.close()
订阅者:
import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host=\'localhost\')) channel = connection.channel() channel.exchange_declare(exchange=\'direct_logs\', type=\'direct\') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue severities = sys.argv[1:] if not severities: sys.stderr.write("Usage: %s [info] [warning] [error]\\n" % sys.argv[0]) sys.exit(1) for severity in severities: channel.queue_bind(exchange=\'direct_logs\', queue=queue_name, routing_key=severity) print(\' [*] Waiting for logs. To exit press CTRL+C\') def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
3) topic
发布者:
import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host=\'localhost\')) channel = connection.channel() channel.exchange_declare(exchange=\'topic_logs\', type=\'topic\') routing_key = sys.argv[1] if len(sys.argv) > 1 else \'anonymous.info\' message = \' \'.join(sys.argv[2:]) or \'Hello World!\' channel.basic_publish(exchange=\'topic_logs\', routing_key=routing_key, body=message) print(" [x] Sent %r:%r" % (routing_key, message)) connection.close()
生产者:
import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host=\'localhost\')) channel = connection.channel() channel.exchange_declare(exchange=\'topic_logs\', type=\'topic\') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue binding_keys = sys.argv[1:] if not binding_keys: sys.stderr.write("Usage: %s [binding_key]...\\n" % sys.argv[0]) sys.exit(1) for binding_key in binding_keys: channel.queue_bind(exchange=\'topic_logs\', queue=queue_name, routing_key=binding_key) print(\' [*] Waiting for logs. To exit press CTRL+C\') def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
开启一个发布进程发布消息:
python rabbitMQ_topic_pub.py error.httpd httpd error msg
开启2个订阅进程绑定不通的routekey接受消息
python rabbitMQ_topic_sub.py *. httpd *.mail
python rabbitMQ_topic_sub.py *. httpd error.mysql
观察效果
- 实现rpc效果
rpc server
server 端除了接受消息进行处理外,还要返回处理的结果给client
import pika import time connection = pika.BlockingConnection(pika.ConnectionParameters( host=\'localhost\')) channel = connection.channel() channel.queue_declare(queue=\'rpc_queue\') def fib(n): if n == 0: return 0 elif n == 1: return 1 else: return fib(n-1) + fib(n-2) def on_request(ch, method, props, body): n = int(body) print(" [.] fib(%s)" % n) response = fib(n) ch.basic_publish(exchange=\'\', routing_key=props.reply_to, properties=pika.BasicProperties(correlation_id = \\ props.correlation_id), body=str(response)) ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(on_request, queue=\'rpc_queue\') print(" [x] Awaiting RPC requests") channel.start_consuming()
rpc client
client 除了要发送消息给server端处理外,还要接受server段返回的结果
import pika import uuid class FibonacciRpcClient(object): def __init__(self): self.connection = pika.BlockingConnection(pika.ConnectionParameters( host=\'localhost\')) self.channel = self.connection.channel() result = self.channel.queue_declare(exclusive=True) self.callback_queue = result.method.queue self.channel.basic_consume(self.on_response, no_ack=True, queue=self.callback_queue) def on_response(self, ch, method, props, body): if self.corr_id == props.correlation_id: self.response = body def call(self, n): self.response = None self.corr_id = str(uuid.uuid4()) self.channel.basic_publish(exchange=\'\', routing_key=\'rpc_queue\', properties=pika.BasicProperties( reply_to = self.callback_queue, correlation_id = self.corr_id, ), body=str(n)) while self.response is None: self.connection.process_data_events() return int(self.response) fibonacci_rpc = FibonacciRpcClient() print(" [x] Requesting fib(30)") response = fibonacci_rpc.call(30) print(" [.] Got %r" % response)
以上是关于redis,rabbitmq的主要内容,如果未能解决你的问题,请参考以下文章