基于py-amqp编写生产者和消费者代码
Posted gj4990
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了基于py-amqp编写生产者和消费者代码相关的知识,希望对你有一定的参考价值。
在介绍erlang的dbg调试RabbitMQ源码之前,首先介绍基于py-amqp编写RabbitMQ的生产者和消费者代码,其中py-amqp的安装包可在以下链接下载:https://pypi.org/project/amqp/
1 公共模块
生产者和消费者一些公共代码编写在utils.py文件中,代码如下:
import logging
import json
import os
class LOG(object):
    """Small wrapper around a named ``logging.Logger``.

    The wrapped logger is set to INFO and is given one ``StreamHandler``
    (also at INFO) whose format is
    ``asctime thread levelname name message``.  Because both levels are
    INFO, messages sent through :meth:`debug` are filtered out.
    """

    _FORMAT = '%(asctime)s %(thread)d %(levelname)s %(name)s %(message)s'
    # Attribute set on handlers created by this class so that they can be
    # recognised on later instantiations.
    _MARKER = '_installed_by_log_wrapper'

    def __init__(self, name=__name__):
        """Fetch the logger called ``name`` and make sure it has a handler.

        Args:
            name: logger name handed to ``logging.getLogger``.
        """
        self.logger = logging.getLogger(name)
        self.logger.setLevel(level=logging.INFO)
        # logging.getLogger() returns the same object for the same name, so
        # attaching a handler unconditionally would print every record once
        # per LOG instance.  Only install our handler the first time.
        already_installed = any(getattr(h, self._MARKER, False)
                                for h in self.logger.handlers)
        if not already_installed:
            handler = logging.StreamHandler()
            handler.setLevel(logging.INFO)
            handler.setFormatter(logging.Formatter(self._FORMAT))
            setattr(handler, self._MARKER, True)
            self.logger.addHandler(handler)

    def info(self, msg):
        """Log ``msg`` at INFO level."""
        self.logger.info(msg)

    def debug(self, msg):
        """Log ``msg`` at DEBUG level (suppressed by the INFO threshold)."""
        self.logger.debug(msg)

    def warning(self, msg):
        """Log ``msg`` at WARNING level."""
        self.logger.warning(msg)

    def error(self, msg):
        """Log ``msg`` at ERROR level."""
        self.logger.error(msg)
def get_record_file():
    """Return the path of a ``.json`` file located in the working directory.

    Entries are examined in sorted name order so that the result is
    deterministic when several files match (``os.listdir`` gives no
    ordering guarantee).  Directories whose name happens to end in
    ``.json`` are skipped.

    Returns:
        ``os.getcwd()`` joined with the first matching file name, or
        ``None`` when the working directory contains no ``.json`` file.
    """
    cwd = os.getcwd()
    for item in sorted(os.listdir(cwd)):
        path = os.path.join(cwd, item)
        if item.endswith('.json') and os.path.isfile(path):
            return path
    return None
def read_record():
    """Load the JSON record file found by ``get_record_file``.

    Returns:
        The object produced by ``json.load`` for that file.

    Raises:
        IOError: if ``get_record_file`` found no ``.json`` file (it
            returned ``None``), or if the file cannot be opened.
    """
    path = get_record_file()
    if path is None:
        # Fail with an explicit message instead of letting open(None) raise
        # an obscure TypeError.
        raise IOError(
            "No .json record file found in {0}".format(os.getcwd()))
    with open(path, 'r') as f:
        return json.load(f)
其中的RabbitMQ的实体(如queue和exchange)以及发送消息编写在record.json文件中,即:
{
    "context": {
        "version": "1.0",
        "body": "RabbitMQ transport message"
    },
    "entities": {
        "exchange": "base_exchange",
        "exchange_type": "topic",
        "queue": "base_queue",
        "binding_key": "base.#",
        "routing_key": "base.test"
    }
}
2 生产者模块
生产者代码如下:
import sys
import amqp
import time
import utils
import traceback
import json
# Logger used by every function and class in this module.
LOG = utils.LOG(__name__)


def _ts():
    """Return the current time as given by ``time.time()``."""
    return time.time()
def seed_msg(count, context):
    """Yield one AMQP message ``count`` times.

    A single ``amqp.Message`` is built from the JSON-serialised ``context``
    and the very same object is yielded on every iteration.

    Note that this is a generator: the ``count`` validation runs when
    iteration starts, not when ``seed_msg`` is called.

    Args:
        count: number of times to yield the message; must be > 0.
        context: JSON-serialisable payload used as the message body.

    Yields:
        The ``amqp.Message`` instance.

    Raises:
        ValueError: if ``count`` is not greater than 0.
    """
    if count <= 0:
        LOG.error("Please specify a value greater than 0.")
        raise ValueError(
            "count must be greater than 0, got {0!r}".format(count))
    msg = amqp.Message(json.dumps(context))
    # delivery_mode 2 is the AMQP 0-9-1 "persistent" delivery mode.
    msg.properties['delivery_mode'] = 2
    for _ in range(count):
        yield msg
class Producer(object):
    """Publish messages to a RabbitMQ exchange through py-amqp.

    Instances work as context managers: the connection and channel are
    opened on entry and closed on exit.
    """

    def __init__(self, url="localhost:5672", name="guest",
                 password="guest"):
        """Remember the connection parameters; nothing is connected yet.

        Args:
            url: broker address passed as ``host`` to ``amqp.Connection``.
            name: user id used for authentication.
            password: password used for authentication.
        """
        self.url = url
        self.name = name
        self.password = password

    def __enter__(self):
        self.connect()
        return self

    def __exit__(self, *eargs):
        self.close()

    def connect(self):
        """Open the connection and a channel, stored as ``conn``/``chan``.

        Returns:
            ``self``, so calls can be chained.

        Raises:
            Exception: whatever py-amqp raised, re-raised after logging.
        """
        LOG.info("Connect to RabbitMQ server.")
        try:
            self.conn = amqp.Connection(host=self.url,
                                        userid=self.name,
                                        password=self.password,
                                        connect_timeout=5)
            # Since amqp 2.0 the constructor no longer connects; relying on
            # the implicit connect triggers AMQPDeprecationWarning.
            self.conn.connect()
            self.chan = self.conn.channel()
        except Exception as exc:
            LOG.error(
                "Connect host: {0} failed and raise exception: {1}".format(
                    self.url, exc))
            # Bare raise keeps the original traceback (also on Python 2).
            raise
        return self

    def close(self):
        """Close the channel and then the connection, if they exist."""
        LOG.info("Close connection with RabbitMQ server.")
        try:
            if hasattr(self, 'chan'):
                self.chan.close()
        finally:
            # Still release the connection when closing the channel fails.
            if hasattr(self, 'conn'):
                self.conn.close()

    def declare(self, exchange, exchange_type, queue,
                binding_key, routing_key):
        """Declare the exchange and queue and bind them together.

        Both entities are declared non-durable and not auto-deleted; the
        queue is bound to the exchange with ``binding_key``.  All five
        arguments are also stored on the instance; ``routing_key`` is used
        later by :meth:`publish_message`.

        Raises:
            Exception: whatever py-amqp raised, re-raised after logging.
        """
        self.exchange = exchange
        self.exchange_type = exchange_type
        self.queue = queue
        self.binding_key = binding_key
        self.routing_key = routing_key
        try:
            self.chan.exchange_declare(exchange=self.exchange,
                                       type=self.exchange_type,
                                       durable=False,
                                       auto_delete=False)
            self.chan.queue_declare(queue=self.queue,
                                    exclusive=False,
                                    durable=False,
                                    auto_delete=False)
            self.chan.queue_bind(queue=self.queue,
                                 exchange=self.exchange,
                                 routing_key=self.binding_key)
        except Exception as exc:
            LOG.error(
                "Declare failed and raise exception: {0}".format(exc))
            raise

    def publish_message(self, msg):
        """Publish ``msg`` to the declared exchange with the routing key.

        Uses ``basic_publish_confirm`` on the channel.  :meth:`declare`
        must have been called first, since it sets ``exchange`` and
        ``routing_key``.

        Raises:
            Exception: whatever py-amqp raised, re-raised after logging.
        """
        try:
            self.chan.basic_publish_confirm(msg, exchange=self.exchange,
                                            routing_key=self.routing_key)
        except Exception as exc:
            LOG.error(
                "Publish message failed and raise exception: {0}".format(
                    exc))
            raise
def parse_record():
    """Read the record file and split it into its two sections.

    Returns:
        A ``(context, entities)`` tuple taken from the ``'context'`` and
        ``'entities'`` keys of the record; a missing key yields ``None``.
    """
    data = utils.read_record()
    return data.get('context'), data.get('entities')
def main(argv):
    """Prompt for the broker address and publish two messages.

    The entities and message context come from ``parse_record``.  Messages
    are paced so that consecutive sends start about ``interval`` (2)
    seconds apart.

    Args:
        argv: command-line arguments; currently unused.
    """
    # raw_input only exists on Python 2; fall back to input on Python 3.
    try:
        read_line = raw_input
    except NameError:
        read_line = input
    url = read_line("Please input a URL (<ip>:5672): ").strip()
    name, password = "guest", "guest"
    interval = 2
    context, entities = parse_record()
    with Producer(url, name, password) as p:
        p.declare(exchange=entities['exchange'],
                  exchange_type=entities['exchange_type'],
                  queue=entities['queue'],
                  binding_key=entities['binding_key'],
                  routing_key=entities['routing_key'])
        for i, msg in enumerate(seed_msg(2, context), 1):
            start_time = _ts()
            LOG.info("Start to send {0} msg.".format(i))
            p.publish_message(msg)
            LOG.info("Finish to send {0} msg.".format(i))
            # Sleep for whatever is left of the interval after the send.
            remaining = interval - (_ts() - start_time)
            time.sleep(max(remaining, 0))


if __name__ == "__main__":
    main(sys.argv)
3 消费者模块
消费者代码如下:
import sys
import amqp
import time
import utils
import traceback
import socket
import signal
# Module-level logger used by the Consumer class.
LOG = utils.LOG(__name__)
class Consumer(object):
    """Consume and acknowledge messages from a RabbitMQ queue via py-amqp.

    Instances work as context managers: the connection and channel are
    opened on entry and closed on exit.
    """

    def __init__(self, url="localhost:5672", name="guest",
                 password="guest"):
        """Remember the connection parameters; nothing is connected yet.

        Args:
            url: broker address passed as ``host`` to ``amqp.Connection``.
            name: user id used for authentication.
            password: password used for authentication.
        """
        self.url = url
        self.name = name
        self.password = password
        # Stop flag read by start_consuming(); defined here so the loop
        # also works when setup_signal() was never called.
        self._exit = False

    def __enter__(self):
        self.connect()
        return self

    def __exit__(self, *eargs):
        self.close()

    def connect(self):
        """Open the connection and a channel, stored as ``conn``/``chan``.

        Returns:
            ``self``, so calls can be chained.

        Raises:
            Exception: whatever py-amqp raised, re-raised after logging.
        """
        LOG.info("Connect to RabbitMQ server.")
        try:
            self.conn = amqp.Connection(host=self.url,
                                        userid=self.name,
                                        password=self.password,
                                        connect_timeout=5)
            # Since amqp 2.0 the constructor no longer connects; relying on
            # the implicit connect triggers AMQPDeprecationWarning.
            self.conn.connect()
            self.chan = self.conn.channel()
        except Exception as exc:
            LOG.error("Connect host: {0} failed and "
                      "raise exception: {1}".format(self.url, exc))
            # Bare raise keeps the original traceback (also on Python 2).
            raise
        return self

    def close(self):
        """Close the channel and then the connection, if they exist."""
        LOG.info("Close connection with RabbitMQ server.")
        try:
            if hasattr(self, 'chan'):
                self.chan.close()
        finally:
            # Still release the connection when closing the channel fails.
            if hasattr(self, 'conn'):
                self.conn.close()

    def _callback(self, msg):
        """Log a delivered message and acknowledge it on the channel."""
        LOG.info("Received msg: {0} properties: {1} delivery: {2}".format(
            msg.body, msg.properties, msg.delivery_info))
        delivery_tag = msg.delivery_tag
        self.chan.basic_ack(delivery_tag)

    def declare(self, exchange, exchange_type, queue,
                binding_key, routing_key):
        """Declare exchange and queue, bind them and register the consumer.

        Both entities are declared non-durable and not auto-deleted; the
        queue is bound with ``binding_key``.  ``_callback`` is registered
        with manual acknowledgement (``no_ack=False``) under the consumer
        tag ``tag_<queue>``.  ``routing_key`` is only stored on the
        instance.

        Raises:
            Exception: whatever py-amqp raised, re-raised after logging.
        """
        self.exchange = exchange
        self.exchange_type = exchange_type
        self.queue = queue
        self.binding_key = binding_key
        self.routing_key = routing_key
        try:
            self.chan.exchange_declare(exchange=self.exchange,
                                       type=self.exchange_type,
                                       durable=False,
                                       auto_delete=False)
            self.chan.queue_declare(queue=self.queue,
                                    exclusive=False,
                                    durable=False,
                                    auto_delete=False)
            self.chan.queue_bind(queue=self.queue,
                                 exchange=self.exchange,
                                 routing_key=self.binding_key)
            self.chan.basic_consume(queue=self.queue,
                                    no_ack=False,
                                    callback=self._callback,
                                    consumer_tag="tag_{0}".format(queue))
        except Exception as exc:
            LOG.error(
                "Declare failed and raise exception: {0}".format(exc))
            raise

    def start_consuming(self):
        """Wait on the channel in a loop until the exit flag is set.

        Each iteration waits at most one second, so a flag set by
        ``signal_handler`` is noticed promptly.

        Raises:
            Exception: any non-timeout error, re-raised after logging.
        """
        # NOTE(review): waiting for Channel.CloseOk presumably just keeps
        # incoming frames flowing so that _callback fires -- confirm
        # against the py-amqp Channel.wait documentation.
        wait_method = amqp.spec.Channel.CloseOk
        while not self._exit:
            try:
                self.chan.wait(wait_method, timeout=1)
            except socket.timeout:
                # Nothing arrived in time; loop to re-check the exit flag.
                pass
            except Exception as exc:
                LOG.error(
                    "Consume stop and raise exception: {0}".format(exc))
                raise

    def signal_handler(self, signum, frame):
        """Signal callback: log the signal number and request loop exit."""
        LOG.info("Received signal: {0} and exit".format(signum))
        self._exit = True

    def setup_signal(self):
        """Reset the exit flag and install ``signal_handler`` for signals.

        Handled signals: SIGHUP, SIGINT, SIGQUIT, SIGALRM, SIGTERM and
        SIGCONT.
        """
        self._exit = False
        for signum in (signal.SIGHUP, signal.SIGINT, signal.SIGQUIT,
                       signal.SIGALRM, signal.SIGTERM, signal.SIGCONT):
            signal.signal(signum, self.signal_handler)
def parse_record():
    """Read the record file and return its ``'entities'`` section.

    Returns:
        The value stored under ``'entities'``, or ``None`` if it is absent.
    """
    return utils.read_record().get('entities')
def main(argv):
    """Prompt for the broker address and consume until signalled to stop.

    The entities come from ``parse_record``.  Signal handlers are
    installed after the consumer has been declared and before the
    consuming loop starts.

    Args:
        argv: command-line arguments; currently unused.
    """
    # raw_input only exists on Python 2; fall back to input on Python 3.
    try:
        read_line = raw_input
    except NameError:
        read_line = input
    url = read_line("Please input a URL (<ip>:5672): ").strip()
    name, password = "guest", "guest"
    entities = parse_record()
    with Consumer(url, name, password) as c:
        c.declare(exchange=entities['exchange'],
                  exchange_type=entities['exchange_type'],
                  queue=entities['queue'],
                  binding_key=entities['binding_key'],
                  routing_key=entities['routing_key'])
        c.setup_signal()
        c.start_consuming()


if __name__ == "__main__":
    main(sys.argv)
4 运行
[root@master code]# python producer.py
Please input a URL (<ip>:5672): 192.168.1.100
2019-10-06 10:44:20,915 140120100865856 INFO __main__ Connect to RabbitMQ server.
/usr/lib/python2.7/site-packages/amqp-2.5.2-py2.7.egg/amqp/connection.py:325: AMQPDeprecationWarning: The .frame_writer attribute on the connection was accessed before
the connection was established. This is supported for now, but will
be deprecated in amqp 2.2.0.
Since amqp 2.0 you have to explicitly call Connection.connect()
before using the connection.
W_FORCE_CONNECT.format(attr=attr)))
2019-10-06 10:44:20,933 140120100865856 INFO __main__ Start to send 1 msg.
2019-10-06 10:44:20,935 140120100865856 INFO __main__ Finish to send 1 msg.
2019-10-06 10:44:22,936 140120100865856 INFO __main__ Start to send 2 msg.
2019-10-06 10:44:22,938 140120100865856 INFO __main__ Finish to send 2 msg.
2019-10-06 10:44:24,949 140120100865856 INFO __main__ Close connection with RabbitMQ server.
[root@master code]# python consumer.py
Please input a URL (<ip>:5672): 192.168.1.100
2019-10-06 10:44:08,268 140428916774720 INFO __main__ Connect to RabbitMQ server.
/usr/lib/python2.7/site-packages/amqp-2.5.2-py2.7.egg/amqp/connection.py:325: AMQPDeprecationWarning: The .frame_writer attribute on the connection was accessed before
the connection was established. This is supported for now, but will
be deprecated in amqp 2.2.0.
Since amqp 2.0 you have to explicitly call Connection.connect()
before using the connection.
W_FORCE_CONNECT.format(attr=attr)))
2019-10-06 10:44:20,936 140428916774720 INFO __main__ Received msg: {"body": "RabbitMQ transport message", "version": "1.0"} properties: {u'delivery_mode': 2} delivery: {u'consumer_tag': u'tag_base_queue', u'redelivered': False, u'routing_key': u'base.test', u'delivery_tag': 1, u'exchange': u'base_exchange'}
2019-10-06 10:44:22,938 140428916774720 INFO __main__ Received msg: {"body": "RabbitMQ transport message", "version": "1.0"} properties: {u'delivery_mode': 2} delivery: {u'consumer_tag': u'tag_base_queue', u'redelivered': False, u'routing_key': u'base.test', u'delivery_tag': 2, u'exchange': u'base_exchange'}
以上是关于基于py-amqp编写生产者和消费者代码的主要内容,如果未能解决你的问题,请参考以下文章