python运维开发之第十一天(RabbitMQ,redis)

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python运维开发之第十一天(RabbitMQ,redis)相关的知识,希望对你有一定的参考价值。

一、RabbitMQ

python的Queue与RabbitMQ之间的理解:

python的进程或线程Queue只能python自己用。RabbitMQ队列多个应用之间共享队列,互相通信。

1、简单的实现生产者与消费者

  生产者

  (1)建立socket连接;(2)声明一个管道;(3)声明队列(queue);(4)通过管道发消息;(5)routing_key(queue名字);(6)body(内容)

  消费者

  (1)建立连接;(2)声明管道;(3)声明队列;(4)消费者声明队列(防止生产者后启动,消费者报错);(5)消费消息;(6)callback如果收到消息就调用函数处理消息 queue队列名字;

技术分享
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author  : Willpower-chen
# @blog: http://www.cnblogs.com/willpower-chen/

import pika
#建立socket连接
connection = pika.BlockingConnection(pika.ConnectionParameters(localhost))
#声明一个管道
channel = connection.channel()
#声明一个队列
channel.queue_declare(queue=hello)
#通过管道发消息,routing_key 队列queue名字 ,body发送内容
channel.basic_publish(exchange=‘‘,
                      routing_key=hello,
                      body=Hello World! 1 2)
print("[x] send ‘Hello World! 1 2 ‘")
connection.close()
producer
技术分享
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author  : Willpower-chen
# @blog: http://www.cnblogs.com/willpower-chen/

import pika,time
#建立连接
connection = pika.BlockingConnection(pika.ConnectionParameters(localhost))
#声明一个管道
channel = connection.channel()
#声明队列,防止生产者(发送端)没开启,消费者端报错
channel.queue_declare(queue=hello)
#ch管道的内存对象地址,如果收到消息就调用函数callback,处理消息
def callbak(ch,method,properties,body):
    print("[x] Received %r " % body)
    # time.sleep(30)
#消费消息
channel.basic_consume(callbak,
                      queue=hello,
                      no_ack=True #消息有没处理,都不给生产者发确认消息
                      )
print([*] Waitting for messages TO exit press ctrl+c)
channel.start_consuming() #开始
consumer

 2、消费者对生产者,可以1对多,而且默认是轮询机制

no_ack=True如果注释掉的话,消费者端不给服务器端确认收到消息,服务器端就不会把要发的消息从队列里清除

如下图注释了no_ack,加了一个时间,

     技术分享

开启三个消费者,一个生产者,生产者只send一次数据,挨个停止consumer,会发现同一条消息会被重新发给下一个consumer,直到producer收到consumer的确认收到的消息

技术分享

 

3、队列查询

技术分享

清除队列消息

技术分享

 

4、消息持久化

(1)durable只是队列持久化

channel.queue_declare(queue=‘hello‘,durable=True)

生产者和消费者都需要添加durable=True

(2)要实现消息持久化,还需要

技术分享

5、消息(1对多)实现权重功能

消费者端添加在消费消息之前

channel.basic_qos(prefetch_count=1)

 

6、广播消息fanout(纯广播)订阅发布

技术分享
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author  : Willpower-chen
# @blog: http://www.cnblogs.com/willpower-chen/

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host=localhost))
channel = connection.channel()

channel.exchange_declare(exchange=logs,
                         type=fanout)
#message = ‘ ‘.join(sys.argv[1:]) or "info: Hello World!"
message = "info: Hello World!2"

channel.basic_publish(exchange=logs,
                      routing_key=‘‘,
                      body=message)
print(" [x] Sent %r" % message)

connection.close()
fanout_producer
技术分享
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author  : Willpower-chen
# @blog: http://www.cnblogs.com/willpower-chen/
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host=localhost))
channel = connection.channel()

channel.exchange_declare(exchange=logs,
                         type=fanout)
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
print("random queuename",queue_name)

channel.queue_bind(exchange=logs,
                   queue=queue_name)

print( [*] Waiting for logs. To exit press CTRL+C)


def callback(ch, method, properties, body):
    print(" [x] %r" % body)


channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()
fanout_consumer

7、direct广播模式(有选择性的发送接收消息)

技术分享
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host=localhost))
channel = connection.channel()

channel.exchange_declare(exchange=direct_logs,
                         type=direct)

severity = sys.argv[1] if len(sys.argv) > 1 else info
message =  .join(sys.argv[2:]) or Hello World!
channel.basic_publish(exchange=direct_logs,
                      routing_key=severity,
                      body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()
direct_producer
技术分享
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author  : Willpower-chen
# @blog: http://www.cnblogs.com/willpower-chen/
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host=localhost))
channel = connection.channel()

channel.exchange_declare(exchange=direct_logs,
                         type=direct)

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\\n" % sys.argv[0])
    sys.exit(1)

for severity in severities:
    channel.queue_bind(exchange=direct_logs,
                       queue=queue_name,
                       routing_key=severity)

print(severities)
print( [*] Waiting for logs. To exit press CTRL+C)


def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))


channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()
direct_consumer

技术分享

8、更细致的消息判断 type = topic

技术分享
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author  : Willpower-chen
# @blog: http://www.cnblogs.com/willpower-chen/

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host=localhost))
channel = connection.channel()

channel.exchange_declare(exchange=topic_logs,
                         type=topic)

routing_key = sys.argv[1] if len(sys.argv) > 1 else anonymous.info
message =  .join(sys.argv[2:]) or Hello World!
channel.basic_publish(exchange=topic_logs,
                      routing_key=routing_key,
                      body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()
topic_producer
技术分享
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author  : Willpower-chen
# @blog: http://www.cnblogs.com/willpower-chen/

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host=localhost))
channel = connection.channel()

channel.exchange_declare(exchange=topic_logs,
                         type=topic)

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write("Usage: %s [binding_key]...\\n" % sys.argv[0])
    sys.exit(1)

for binding_key in binding_keys:
    channel.queue_bind(exchange=topic_logs,
                       queue=queue_name,
                       routing_key=binding_key)

print( [*] Waiting for logs. To exit press CTRL+C)


def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))


channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()
topic_consumer

技术分享

 

 

 

 

 


 
 

以上是关于python运维开发之第十一天(RabbitMQ,redis)的主要内容,如果未能解决你的问题,请参考以下文章

python运维开发之第五天

python运维开发之第六天

python运维开发之第七天

python运维开发之第八天(socket)

Python之第二十一天的努力--模块1

Python开发十一章:RabbitMQ队列