redis,rabbitmq

Posted by andypengx


 

1、rabbitmq

 

Install the Python RabbitMQ client module:

 

pip install pika
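Note that pika is only the client library; the examples below also assume a RabbitMQ broker running on localhost. If you do not already have one, a quick way to start a broker (not part of the original article, assuming Docker is installed) is:

docker run -d --name rabbitmq -p 5672:5672 rabbitmq:3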

 

  • The simplest message queue communication

 

The producer sends messages to the hello queue; the consumer receives messages from the hello queue and processes them.

 

producer:

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')

channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()

  

 

consumer:

 

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)

channel.basic_consume(callback,
                      queue='hello',
                      no_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

  

 

Now we can try out our programs in a terminal. First, let's send a message using our send.py program:

 $ python send.py
 [x] Sent 'Hello World!'

The producer program send.py will stop after every run. Let's receive it:

 $ python receive.py
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'Hello World!'



  • Work queue

 

 In this mode, RabbitMQ distributes the messages produced by the producer (P) among multiple consumers; by default, RabbitMQ dispatches them to each consumer in turn (round-robin).
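For example, the following sketch (not from the original article) publishes ten numbered messages to the hello queue; if two copies of the consumer from the previous example are already running, RabbitMQ hands the messages to them alternately, one at a time:

#!/usr/bin/env python
# Sketch: send several tasks so that round-robin dispatch can be observed
# across two running consumers of the 'hello' queue.
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')

for i in range(10):
    channel.basic_publish(exchange='',
                          routing_key='hello',
                          body='task %d' % i)
    print(" [x] Sent 'task %d'" % i)

connection.close()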

 

A common requirement is the following:

When RabbitMQ quits or crashes it will forget the queues and messages unless you tell it not to. Two things are required to make sure that messages aren't lost: we need to mark both the queue and messages as durable.

That is, we need both queue durability and message persistence.

 

Queue durability:

channel.queue_declare(queue='task_queue', durable=True)   # declare the queue as durable. Note that RabbitMQ does not allow redefining the parameters of an already existing queue; durability can only be set when declaring a new queue
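As a small illustration (a sketch, not from the original; it assumes the non-durable hello queue from the first example still exists on the broker), redeclaring an existing queue with different arguments makes RabbitMQ close the channel with a PRECONDITION_FAILED error:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

try:
    # 'hello' was originally declared without durable=True, so this
    # redeclaration conflicts with the existing queue definition.
    channel.queue_declare(queue='hello', durable=True)
except pika.exceptions.ChannelClosed as err:
    print("Redeclaration refused by the broker: %s" % err)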

 

Message persistence:

 

 

channel.basic_publish(exchange='',
                      routing_key="task_queue",
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = 2, # make message persistent
                      ))

 

 

 

Message acknowledgment:

Doing a task can take a few seconds. You may wonder what happens if one of the consumers starts a long task and dies with it only partly done. With our current code, once RabbitMQ delivers a message to the consumer it immediately removes it from memory. In this case, if you kill a worker we will lose the message it was just processing. We'll also lose all the messages that were dispatched to this particular worker but were not yet handled.

But we don't want to lose any tasks. If a worker dies, we'd like the task to be delivered to another worker.

 

 

Message acknowledgments are turned on by default. In previous examples we explicitly turned them off via the no_ack=True flag. It's time to remove this flag and send a proper acknowledgment from the worker, once we're done with a task.

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b'.'))
    print(" [x] Done")
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(callback,
                      queue='hello')

 

 

 

Fair dispatch:

 

 

If RabbitMQ simply dispatches messages to consumers in order without considering consumer load, a consumer running on a low-spec machine can easily pile up messages it cannot keep up with, while a high-spec consumer stays mostly idle. To solve this, configure prefetch_count=1 on each consumer, which tells RabbitMQ not to send this consumer a new message until it has finished processing the current one.

This tells RabbitMQ not to give more than one message to a worker at a time. Or, in other words, don't dispatch a new message to a worker until it has processed and acknowledged the previous one. Instead, it will dispatch it to the next worker that is not still busy.

 

channel.basic_qos(prefetch_count=1)

 

 

A code example that combines queue durability, message persistence, message acknowledgments, and fair dispatch:

 

Producer:

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
                      routing_key=\'task_queue\',
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = 2, # make message persistent
                      ))
print(" [x] Sent %r" % message)
connection.close()

 

Consumer:

 

#!/usr/bin/env python
import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b'.'))
    print(" [x] Done")
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
                      queue='task_queue')

channel.start_consuming()
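One way to exercise this example (the file names worker.py and new_task.py are only assumed here, they are not given in the original): start two workers in separate terminals, then publish a few tasks. The trailing dots control how long each task sleeps, and if a worker is killed before it acknowledges a message, RabbitMQ redelivers that message to the other worker.

$ python worker.py            # terminal 1
$ python worker.py            # terminal 2
$ python new_task.py First task..
$ python new_task.py Second task.....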

 

 

  • Publish and subscribe

 

The core idea in the messaging model in RabbitMQ is that the producer never sends any messages directly to a queue. Actually, quite often the producer doesn't even know if a message will be delivered to any queue at all.

Instead, the producer can only send messages to an exchange. An exchange is a very simple thing.

 

An exchange can have several types, and each type uses different rules to route messages to queues. The types are as follows:

 

fanout: every queue bound to this exchange receives the message
direct: the routing key, together with the exchange, determines which queue(s) receive the message

topic: every queue whose binding key matches the routing key receives the message; the binding key may be a pattern (* matches exactly one word, # matches zero or more words), allowing finer-grained filtering than direct

 

 

1) fanout:

 

Publisher:

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         type='fanout')

message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message)
print(" [x] Sent %r" % message)
connection.close()

 

Subscriber:

 

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         type='fanout')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

channel.queue_bind(exchange='logs',
                   queue=queue_name)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r" % body)

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()
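A possible way to run the fanout example (file names are assumed, not from the original): start two subscribers and then publish once. Because every subscriber binds its own exclusive queue to the logs exchange, each subscriber receives a copy of the message.

$ python fanout_sub.py        # terminal 1
$ python fanout_sub.py        # terminal 2
$ python fanout_pub.py "info: Hello World!"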

 

 

2) direct:

Publisher:

import pika
import sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         type='direct')

severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()

 

Subscriber:

import pika
import sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         type='direct')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)

for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)

print(' [*] Waiting for logs. To exit press CTRL+C')
 
def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))
 
channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)
 
channel.start_consuming()
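A possible way to run the direct example (file names are assumed): bind one subscriber only to error and another to info, warning and error. A message published with severity error then reaches both subscribers, while one published with severity info reaches only the second.

$ python direct_sub.py error                  # terminal 1
$ python direct_sub.py info warning error     # terminal 2
$ python direct_pub.py error "An error occurred"
$ python direct_pub.py info "Just an info line"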

 

 

3) topic:

 

 

Publisher:

 

import pika
import sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         type='topic')

routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
                      routing_key=routing_key,
                      body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()

 

Subscriber:

 

import pika
import sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         type='topic')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
    sys.exit(1)

for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs',
                       queue=queue_name,
                       routing_key=binding_key)

print(' [*] Waiting for logs. To exit press CTRL+C')
 
def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))
 
channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)
 
channel.start_consuming()

 

Start one publisher process to publish a message:

python rabbitMQ_topic_pub.py error.httpd httpd error msg

Start two subscriber processes, bound to different routing keys, to receive the message:

python rabbitMQ_topic_sub.py "*.httpd" "*.mail"

python rabbitMQ_topic_sub.py "*.httpd" "error.mysql"

Observe the result: because both subscribers bind "*.httpd", the message published with routing key error.httpd is delivered to both of them.

 

  • Implementing RPC

 

RPC server

Besides receiving messages and processing them, the server must also return the processing result to the client.

 

import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))

channel = connection.channel()

channel.queue_declare(queue='rpc_queue')
 
def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n-1) + fib(n-2)
 
def on_request(ch, method, props, body):
    n = int(body)
 
    print(" [.] fib(%s)" % n)
    response = fib(n)
 
    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id = \
                                                         props.correlation_id),
                     body=str(response))
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request, queue='rpc_queue')
 
print(" [x] Awaiting RPC requests")
channel.start_consuming()

 

RPC client

Besides sending messages to the server for processing, the client must also receive the result returned by the server.

import pika
import uuid
 
class FibonacciRpcClient(object):
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(
                host='localhost'))
 
        self.channel = self.connection.channel()
 
        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue
 
        self.channel.basic_consume(self.on_response, no_ack=True,
                                   queue=self.callback_queue)
 
    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = body
 
    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(exchange='',
                                   routing_key='rpc_queue',
                                   properties=pika.BasicProperties(
                                         reply_to = self.callback_queue,
                                         correlation_id = self.corr_id,
                                         ),
                                   body=str(n))
        while self.response is None:
            self.connection.process_data_events()
        return int(self.response)
 
fibonacci_rpc = FibonacciRpcClient()
 
print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(30)
print(" [.] Got %r" % response)

 
