自动化运维Python系列之消息队列RabbitMQ
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了自动化运维Python系列之消息队列RabbitMQ相关的知识,希望对你有一定的参考价值。
RabbitMQ
RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue )的开源实现。AMQP 的出现其实也是应了广大人民群众的需求,虽然在同步消息通讯的世界里有很多公开标准(如 COBAR的 IIOP ,或者是 SOAP 等),但是在异步消息处理中却不是这样,只有大企业有一些商业实现(如微软的 MSMQ ,IBM 的 Websphere MQ 等),因此,在 2006 年的 6 月,Cisco 、Redhat、iMatix 等联合制定了 AMQP 的公开标准。
MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。
MQ框架非常之多,比较流行的有RabbitMq、ActiveMq、ZeroMq、kafka。(saltsatck底层采用的就是ZeroMq)
1)吞吐量(TPS):ZeroMq最好、RabbitMq 次之, ActiveMq 最差
2)持久化:ZeroMq不支持、RabbitMq和ActiveMq都支持
3)可用性、可靠性:RabbitMq最好,ActiveMq次之,ZeroMq最差
4)高并发:从实现语言来看,RabbitMQ最高,原因是它的实现语言是天生具备高并发高可用的erlang语言
RabbitMQ安装
# 安装配置epel源 $ rpm -ivh http://dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm # 安装erlang $ yum -y install erlang # 安装RabbitMQ $ yum -y install rabbitmq-server # 启动 service rabbitmq-server start/stop
安装API
# pip install pika # or # easy_install pika # or # 源码 # https://pypi.python.org/pypi/pika
使用API操作RabbitMQ
生产者
import pika # connection 一个TCP的连接、 connection = pika.BlockingConnection(pika.ConnectionParameters(‘localhost‘)) # channel 是建立在TCP连接中的一个虚拟连接 channel = connection.channel() # 声明一个queue channel.queue_declare(queue=‘hello‘) # n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange. channel.basic_publish(exchange=‘‘, routing_key=‘hello‘, body=‘Hello World!‘) print(" [x] Sent ‘Hello World!‘") connection.close()
消费者
import pika connection = pika.BlockingConnection(pika.ConnectionParameters( ‘localhost‘)) # connection 一个TCP的连接、 channel 是建立在TCP连接中的一个虚拟连接 channel = connection.channel() # 再次声明原因是因为再包含众多队列的RabbitMQ里面 我们不确定此次使用的队列是否已经声明过 # 再次声明确保能够正常使用 channel.queue_declare(queue=‘hello‘) # ch 管道内存地址 # 回调函数 def callback(ch, method, properties, body): print("---->", ch, method, properties) print(" [x] Received %r" % body) # 开始消费消息 channel.basic_consume(callback, queue=‘hello‘, no_ack=True # 确认消息 ) print(‘ [*] Waiting for messages. To exit press CTRL+C‘) channel.start_consuming()
1)no-ack = False 消息不丢失
no-ack = False,如果消费者遇到情况(its channel is closed, connection is closed, or TCP connection is lost)挂掉了,那么,RabbitMQ会重新将该任务添加到队列中。
消费者
import pika connection = pika.BlockingConnection(pika.ConnectionParameters( host=‘10.0.0.111‘)) channel = connection.channel() channel.queue_declare(queue=‘hello‘) def callback(ch, method, properties, body): print(" [x] Received %r" % body) import time time.sleep(10) print ‘ok‘ ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_consume(callback, queue=‘hello‘, # 和服务端确认消息 确保消息不丢失 no_ack=False) print(‘ [*] Waiting for messages. To exit press CTRL+C‘) channel.start_consuming()
2)durable 消息持久化
生产者
#!/usr/bin/env python import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘10.0.0.111‘)) channel = connection.channel() # 消息持久化 channel.queue_declare(queue=‘hello‘, durable=True) channel.basic_publish(exchange=‘‘, routing_key=‘hello‘, body=‘Hello World!‘, # 消息持久化 properties=pika.BasicProperties( delivery_mode=2, )) print(" [x] Sent ‘Hello World!‘") connection.close()
消费者
#!/usr/bin/env python # -*- coding:utf-8 -*- import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘10.0.0.111‘)) channel = connection.channel() channel.queue_declare(queue=‘hello‘, durable=True) def callback(ch, method, properties, body): print(" [x] Received %r" % body) import time time.sleep(10) print ‘ok‘ ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_consume(callback, queue=‘hello‘, no_ack=False) print(‘ [*] Waiting for messages. To exit press CTRL+C‘) channel.start_consuming()
3)消息存取顺序
默认消息队列里的数据是按照顺序被消费者拿走,例如:消费者1 去队列中获取 奇数 序列的任务,消费者2去队列中获取 偶数 序列的任务。
#!/usr/bin/env python # -*- coding:utf-8 -*- import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘10.0.0.111‘)) channel = connection.channel() channel.queue_declare(queue=‘hello‘) def callback(ch, method, properties, body): print(" [x] Received %r" % body) import time time.sleep(10) print ‘ok‘ ch.basic_ack(delivery_tag = method.delivery_tag) # 谁来谁取 不按奇偶等顺序来取 channel.basic_qos(prefetch_count=1) channel.basic_consume(callback, queue=‘hello‘, no_ack=False) print(‘ [*] Waiting for messages. To exit press CTRL+C‘) channel.start_consuming()
4)发布订阅
Procuder Publish的Message进入了Exchange。接着通过“routing keys”, RabbitMQ会找到应该把这个Message放到哪个queue里。queue也是通过这个routing keys来做的绑定
有三种类型的Exchanges:direct, fanout,topic。 每个实现了不同的路由算法。
exchange type = fanout
发布者
#!/usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host=‘localhost‘)) channel = connection.channel() channel.exchange_declare(exchange=‘logs‘, type=‘fanout‘) message = ‘ ‘.join(sys.argv[1:]) or "info: Hello World!" channel.basic_publish(exchange=‘logs‘, routing_key=‘‘, body=message) print(" [x] Sent %r" % message) connection.close()
订阅者
#!/usr/bin/env python import pika connection = pika.BlockingConnection(pika.ConnectionParameters( host=‘localhost‘)) channel = connection.channel() channel.exchange_declare(exchange=‘logs‘, type=‘fanout‘) result = channel.queue_declare(exclusive=True) queue_name = result.method.queue channel.queue_bind(exchange=‘logs‘, queue=queue_name) print(‘ [*] Waiting for logs. To exit press CTRL+C‘) def callback(ch, method, properties, body): print(" [x] %r" % body) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
5)关键字发送
exchange type = direct
生产者
import pika connection = pika.BlockingConnection(pika.ConnectionParameters( host=‘127.0.0.1‘)) channel = connection.channel() channel.exchange_declare(exchange=‘direct_logs_test_1‘, type=‘direct‘) severity = ‘error‘ message = ‘123‘ # severity = sys.argv[1] if len(sys.argv) > 1 else ‘info‘ # message = ‘ ‘.join(sys.argv[2:]) or ‘Hello World!‘ channel.basic_publish(exchange=‘direct_logs_test_1‘, routing_key=severity, body=message) print(" [x] Sent %r:%r" % (severity, message)) connection.close()
消费者
import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host=‘127.0.0.1‘)) channel = connection.channel() channel.exchange_declare(exchange=‘direct_logs_test_1‘, type=‘direct‘) result = channel.queue_declare(exclusive=True) queue_name = result.method.queue # severities = sys.argv[1:] # if not severities: # sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0]) # sys.exit(1) severities = [‘error‘] for severity in severities: channel.queue_bind(exchange=‘direct_logs_test_1‘, queue=queue_name, routing_key=severity) print(‘ [*] Waiting for logs. To exit press CTRL+C‘) def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
7)模糊匹配
exchange type = topic
# 表示可以匹配 0 个 或 多个 单词 * 表示只能匹配 一个 单词
消费者
#!/usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host=‘localhost‘)) channel = connection.channel() channel.exchange_declare(exchange=‘topic_logs‘, type=‘topic‘) result = channel.queue_declare(exclusive=True) queue_name = result.method.queue binding_keys = sys.argv[1:] if not binding_keys: sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0]) sys.exit(1) for binding_key in binding_keys: channel.queue_bind(exchange=‘topic_logs‘, queue=queue_name, routing_key=binding_key) print(‘ [*] Waiting for logs. To exit press CTRL+C‘) def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
生产者
#!/usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host=‘localhost‘)) channel = connection.channel() channel.exchange_declare(exchange=‘topic_logs‘, type=‘topic‘) routing_key = sys.argv[1] if len(sys.argv) > 1 else ‘anonymous.info‘ message = ‘ ‘.join(sys.argv[2:]) or ‘Hello World!‘ channel.basic_publish(exchange=‘topic_logs‘, routing_key=routing_key, body=message) print(" [x] Sent %r:%r" % (routing_key, message)) connection.close()
本文出自 “改变从每一天开始” 博客,请务必保留此出处http://lilongzi.blog.51cto.com/5519072/1886707
以上是关于自动化运维Python系列之消息队列RabbitMQ的主要内容,如果未能解决你的问题,请参考以下文章