消息队列_RabbitMQ-0004.深入RabbitMQ之分类告警/并行执行/RPC响应?
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了消息队列_RabbitMQ-0004.深入RabbitMQ之分类告警/并行执行/RPC响应?相关的知识,希望对你有一定的参考价值。
应用场景:
1. 通知,针对发送事件的描述,内容可以是消息的日志,也可以是真实的报告通知给另一个程序或者管理员.
说明: 首先选择交换机,如果选择fanout交换机,则需要为每种告警传输类型(邮件/微信/手机/短信)创建队列,但同时也带来坏处就是每个消息都会发送到所有队列,导致告警消息发生时,被报警消息淹没,如果选择topic交换机,则可为其创建四种严重级别告警info/warning/problem/citical,但如果使用fanout类型交换机消息会发送到所有这四个级别队列,如果使用direct交换机,则四个严重等级会被定死,无法扩展,而topic交换机则允许我们在如上四个严重等级上加上类型,如当我们触发报警API时候路由键设置为critical.rate_limit,则消息不经会发送到cirtica.*l队列而且同时会被发送到*.rate_limite,至于针对每种类型怎么处理这就是消费者该干的事情了~
> 消费者
#!/usr/bin/env python # -*- coding: utf-8 -*- """ # # Authors: limanman # OsChina: http://xmdevops.blog.51cto.com/ # Purpose: # """ # 说明: 导入公共模块 import pika import json # 说明: 导入其它模块 if __name__ == ‘__main__‘: # 创建凭证对象 credentials = pika.PlainCredentials(‘root‘, ‘qwertyuiop‘) # 创建参数对象 conn_params = pika.ConnectionParameters( # RabbitMQ服务地址 host=‘127.0.0.1‘, # RabbitMQ服务端口 port=5672, # RabbitMQ登录凭证 credentials=credentials, # RabbitMQ虚拟主机 virtual_host=‘/‘ ) # 创建连接对象 conn_broker = pika.BlockingConnection(conn_params) # 获取信道对象 channel = conn_broker.channel() # 创建日志交换机 channel.exchange_declare( # 交换机名称 exchange="xmzoomeye_alerts", # 交换机类型 exchange_type="topic", # 如果同名交换机已存在依然返回成功,否则创建 passive=False, # 声明为持久化交换机 durable=True, # 交换机闲置也不会自动删除 auto_delete=False ) # 创建info日志级别队列 channel.queue_declare( # 队列名称 queue="info", # 如果同名队列已存在依然返回成功,否则创建 passive=False, # 声明为持久化队列 durable=True, # 声明为非私有队列 exclusive=False, # 队列闲置也不会自动删除 auto_delete=False ) # 创建warning日志级别队列 channel.queue_declare( # 队列名称 queue="warning", # 如果同名队列已存在依然返回成功,否则创建 passive=False, # 声明为持久化队列 durable=True, # 声明为非私有队列 exclusive=False, # 队列闲置也不会自动删除 auto_delete=False ) # 创建problem日志级别队列 channel.queue_declare( # 队列名称 queue="problem", # 如果同名队列已存在依然返回成功,否则创建 passive=False, # 声明为持久化队列 durable=True, # 声明为非私有队列 exclusive=False, # 队列闲置也不会自动删除 auto_delete=False ) # 创建cirtical日志级别队列 channel.queue_declare( # 队列名称 queue="cirtical", # 如果同名队列已存在依然返回成功,否则创建 passive=False, # 声明为持久化队列 durable=True, # 声明为非私有队列 exclusive=False, # 队列闲置也不会自动删除 auto_delete=False ) # 创建rate_limit日志级别队列 channel.queue_declare( # 队列名称 queue="cirtical_ratelimit", # 如果同名队列已存在依然返回成功,否则创建 passive=False, # 声明为持久化队列 durable=True, # 声明为非私有队列 exclusive=False, # 队列闲置也不会自动删除 auto_delete=False ) # 绑定队列 channel.queue_bind( # 队列名称 queue="info", # 交换机名称 exchange="xmzoomeye_alerts", # 路由键名称 routing_key="info.*" ) channel.queue_bind( # 队列名称 queue="warning", # 交换机名称 exchange="xmzoomeye_alerts", # 路由键名称 routing_key="warning.*" ) channel.queue_bind( # 队列名称 queue="problem", # 交换机名称 exchange="xmzoomeye_alerts", # 路由键名称 routing_key="problem.*" ) channel.queue_bind( # 队列名称 queue="cirtical", # 交换机名称 exchange="xmzoomeye_alerts", # 路由键名称 routing_key="cirtical.*" ) channel.queue_bind( # 队列名称 queue="cirtical_ratelimit", # 交换机名称 exchange="xmzoomeye_alerts", # 路由键名称 routing_key="*.ratelimit" ) def callback_wrapper(callback): def wrapper(channel, method, header, body): print ‘#{0}[{1}]>: {2}‘.format(method.consumer_tag, method.delivery_tag, body), if header.content_type != ‘application/json‘: print ‘with wrong content_type(application/json)‘ channel.basic_ack(delivery_tag=method.delivery_tag) return print ‘with correct content_type(application/json)‘ callback(channel, method, header, body) # 发送消息确认 channel.basic_ack(delivery_tag=method.delivery_tag) return wrapper @callback_wrapper def info_callback_handler(channel, method, header, body): """ channel: 和RabbitMQ通信的信道对象,如果多个信道则会关联接收到消息的订阅的信道 method : 一个方法帧对象,携带关联订阅的消费者标记以及投递者标记 header : AMQP消息头信息,携带可选的消息元数据,如数据类型 body : 实际消息内容 """ pass return @callback_wrapper def warning_callback_handler(channel, method, header, body): pass return @callback_wrapper def problem_callback_handler(channel, method, header, body): pass return @callback_wrapper def cirtical_callback_handler(channel, method, header, body): pass return @callback_wrapper def cirtical_ratelimit_callback_handler(channel, method, header, body): pass return # 作为指定队列消费者 channel.basic_consume( info_callback_handler, queue="info", # 必须确认后再接收后续消息 no_ack=False, consumer_tag="xmzoomeye_alerts_info" ) channel.basic_consume( warning_callback_handler, queue="warning", no_ack=False, consumer_tag="xmzoomeye_alerts_warning" ) channel.basic_consume( problem_callback_handler, queue="problem", no_ack=False, consumer_tag="xmzoomeye_alerts_problem" channel.basic_consume( cirtical_callback_handler, queue="cirtical", no_ack=False, consumer_tag="xmzoomeye_alerts_cirtical" ) channel.basic_consume( cirtical_ratelimit_callback_handler, queue="cirtical_ratelimit", no_ack=False, consumer_tag="xmzoomeye_alerts_cirtical_ratelimit" ) # 循环调用回调函数接收处理消息 channel.start_consuming() channel.close()
> 生产者
#!/usr/bin/env python # -*- coding: utf-8 -*- """ # # Authors: limanman # OsChina: http://xmdevops.blog.51cto.com/ # Purpose: # """ # 说明: 导入公共模块 import sys import pika # 说明: 导入其它模块 if __name__ == ‘__main__‘: # 创建凭证对象 credentials = pika.PlainCredentials(‘root‘, ‘qwertyuiop‘) # 创建参数对象 conn_params = pika.ConnectionParameters( # RabbitMQ服务地址 host=‘127.0.0.1‘, # RabbitMQ服务端口 port=5672, # RabbitMQ登录凭证 credentials=credentials, # RabbitMQ虚拟主机 virtual_host=‘/‘ ) # 创建连接对象 conn_broker = pika.BlockingConnection(conn_params) # 获取信道对象 channel = conn_broker.channel() exchange = sys.argv[1] routekey = sys.argv[2] messages = sys.argv[3] # 创建配置对象 msg_props = pika.BasicProperties() # 设置内容类型 msg_props.content_type = ‘application/json‘ # 尝试发布消息 channel.basic_publish( # 发布消息内容 body=messages, # 发布到交换机 exchange=exchange, # 发布信息属性 properties=msg_props, # 发布信息时携带的路由键 routing_key=routekey )
说明: 测试非常简单,首先启动消费者python consumer.py,然后尝试执行生产者python producer.py xmzoomeye_alerts cirtical.ratelimit ‘{"error": "cirtical rate limit"}‘,查看消费者端输出
扩展: 对于发后即忘的消息通信模式可轻而易举的扩展,如添加一个额外的队列绑定路由键*.*,将所有级别的告警记录都记录到数据库中,以便后期分析/汇总/压缩/分类/查询等操作~
2. 批量,针对大型数据集合的工作或者转换,这种类型的任务可以构建为单一的任务请求,或者多个任务对数据集合的独立部分进行操作.
> 消费者
#!/usr/bin/env python # -*- coding: utf-8 -*- """ # # Authors: limanman # OsChina: http://xmdevops.blog.51cto.com/ # Purpose: # """ # 说明: 导入公共模块 import pika # 说明: 导入其它模块 if __name__ == ‘__main__‘: # 创建凭证对象 credentials = pika.PlainCredentials(‘guest‘, ‘guest‘) # 创建参数对象 conn_params = pika.ConnectionParameters( # RabbitMQ服务地址 host=‘127.0.0.1‘, # RabbitMQ服务端口 port=5672, # RabbitMQ服务凭证 credentials=credentials, # RabbitMQ虚拟主机 virtual_host=‘/‘ ) # 创建连接对象 conn_broker = pika.BlockingConnection(conn_params) # 获取信道对象 channel = conn_broker.channel() # 创建交换机 channel.exchange_declare( # 交换机名称 exchange="salt-exchange", # 交换机类型 type="direct", # 如果同名交换机已存在依然返回成功 passive=False, # 声明为持久化交换机 durable=False, # 交换机闲置也不会自动删除 auto_delete=False ) # 创建队列 channel.queue_declare(queue="salt") # 绑定队列 channel.queue_bind( # 队列名称 queue="salt", # 交换机名称 exchange="salt-exchange", # 路由键名称 routing_key="salt" ) # 消息回调处理函数 def msg_consumer(channel, method, header, body): # 发送消息确认 channel.basic_ack(delivery_tag=method.delivery_tag) # 退出监听循环 if body == ‘exit‘: channel.basic_cancel(consumer_tag="salt-consumer") channel.stop_consuming() else: print ‘found notice: recive queue message {0}‘.format(body) return # 作为指定队列消费者 channel.basic_consume(msg_consumer, queue="salt", consumer_tag="salt-consumer") # 循环调用回调函数接收处理消息 channel.start_consuming()
> 生产者
#!/usr/bin/env python # -*- coding: utf-8 -*- """ # # Authors: limanman # OsChina: http://xmdevops.blog.51cto.com/ # Purpose: # """ # 说明: 导入公共模块 import sys import pika # 说明: 导入其它模块 if __name__ == ‘__main__‘: # 创建凭证对象 credentials = pika.PlainCredentials(‘root‘, ‘qwertyuiop‘) # 创建参数对象 conn_params = pika.ConnectionParameters( # RabbitMQ服务地址 host=‘127.0.0.1‘, # RabbitMQ服务端口 port=5672, # RabbitMQ登录凭证 credentials=credentials, # RabbitMQ虚拟主机 virtual_host=‘/‘ ) # 创建连接对象 conn_broker = pika.BlockingConnection(conn_params) # 获取信道对象 channel = conn_broker.channel() exchange = sys.argv[1] messages = sys.argv[2] # 创建配置对象 msg_props = pika.BasicProperties() # 设置内容类型 msg_props.content_type = ‘application/json‘ # 消息持久化 msg_props.delivery_mode = 2 # 尝试发布消息 channel.basic_publish( # 发布消息内容 body=messages, # 发布到交换机 exchange=exchange, # 发布信息属性 properties=msg_props, # 扇形交换机本身不需要路由键,但参数个数限制,随意推荐大家直接写#匹配所有 routing_key="#" )
说明: 测试非常简单,首先启动消费者python consumer.py,然后尝试执行生产者python producer.py upload_pictures ‘{"image_id": 1, "user_id": 1, "image_path": "/xm-workspace/xm-webs/www.pic.com/data/images/73197d57-46a9-4d19-a48f-a44e0ad5e493.jpg"}‘,查看消费者端输出
扩展: 如上在WEB页面上上传完图片后,希望对图片进行生成缩略图/奖励上传用户积分/分享通知朋友圈等等,这几个任务之间是没有相互依赖关系的,不需要等待对方的结果才能继续执行,所以可以并行执行,扩展起来也非常容易,直接添加一个对应的队列和消费者即可,如要记录上传图片日志记录需求~,当发现一个创建缩略图的消费者跟不上节奏,直接在同台或异台服务器上再跑一个创建缩略图的消费者即可,任务会自动轮询分配,这一切对于用户是无感知的~
3. RPC,针对大量RPC请求使用消息来发回应答,AMQP消息头里有一个reply_to字段,生产者JSON RPC-API生成随机零时队列名存储到预发布RPC调用消息的头部reply_to字段到指定队列,然后在随机队列上监听响应数据,消费者JSON RPC-SRV接收到消息处理完毕后读取回调中header的reply_to字段,然后将响应发回零时队列,由于所有没有绑定交换机的队列都会自动绑定到匿名交换机,所以必用申请额外的交换机直接使用匿名交换机,消息一旦被接收,零时队列会自动被删除.至此完成一次RPC调用
> 消费者
#!/usr/bin/env python # -*- coding: utf-8 -*- """ # # Authors: limanman # OsChina: http://xmdevops.blog.51cto.com/ # Purpose: # """ # 说明: 导入公共模块 import pika import json # 说明: 导入其它模块 if __name__ == ‘__main__‘: # 创建凭证对象 credentials = pika.PlainCredentials(‘root‘, ‘qwertyuiop‘) # 创建参数对象 conn_params = pika.ConnectionParameters( # RabbitMQ服务地址 host=‘127.0.0.1‘, # RabbitMQ服务端口 port=5672, # RabbitMQ登录凭证 credentials=credentials, # RabbitMQ虚拟主机 virtual_host=‘/‘ ) # 创建连接对象 conn_broker = pika.BlockingConnection(conn_params) # 获取信道对象 channel = conn_broker.channel() # 创建RPC交换机 channel.exchange_declare( # 交换机名称 exchange="rpc", # 交换机类型 exchange_type="direct", # 如果同名交换机已存在依然返回成功,否则创建 passive=False, # 声明为持久化交换机 durable=True, # 交换机闲置也不会自动删除 auto_delete=False ) # 创建ping任务队列 channel.queue_declare( # 队列名称 queue="ping", # 如果同名队列已存在依然返回成功,否则创建 passive=False, # 声明为非持久化队列 durable=False, # 声明为私有队列 exclusive=True, # 队列闲置会自动删除 auto_delete=True ) # 绑定队列 channel.queue_bind( # 队列名称 queue="ping", # 交换机名称 exchange="rpc", ) # 请求回调 def api_request_ping(channel, method, header, body): """ channel: 和RabbitMQ通信的信道对象,如果多个信道则会关联接收到消息的订阅的信道 method : 一个方法帧对象,携带关联订阅的消费者标记以及投递者标记 header : AMQP消息头信息,携带可选的消息元数据,如数据类型 body : 实际消息内容 """ print ‘#{0}[{1}]>: {2}‘.format( # 由于此处的header是生产者的,所以可通过header.reply_to获取随机队列名 header.reply_to, method.delivery_tag, body, ) # 发送消息确认 channel.basic_ack(delivery_tag=method.delivery_tag) # 发送响应对象 channel.basic_publish( body="pong", # 由于所有队列默认都会绑定到匿名交换机,非常方便直接发给它,它会根据传递过来的路由键原路返回 exchange="", routing_key=header.reply_to ) return # 作为指定队列消费者 channel.basic_consume( api_request_ping, queue="ping", # 必须确认后再接收后续消息 no_ack=False, consumer_tag="ping_request" ) # 循环调用回调函数接收处理消息 channel.start_consuming() channel.close()
> 生产者
#!/usr/bin/env python # -*- coding: utf-8 -*- """ # # Authors: limanman # OsChina: http://xmdevops.blog.51cto.com/ # Purpose: # """ # 说明: 导入公共模块 import pika # 说明: 导入其它模块 if __name__ == ‘__main__‘: # 创建凭证对象 credentials = pika.PlainCredentials(‘root‘, ‘qwertyuiop‘) # 创建参数对象 conn_params = pika.ConnectionParameters( # RabbitMQ服务地址 host=‘127.0.0.1‘, # RabbitMQ服务端口 port=5672, # RabbitMQ登录凭证 credentials=credentials, # RabbitMQ虚拟主机 virtual_host=‘/‘ ) # 创建连接对象 conn_broker = pika.BlockingConnection(conn_params) # 获取信道对象 channel = conn_broker.channel() # 创建匿名零时响应队列,返回唯一队列名称通过header的reply_to给消费者 queue = channel.queue_declare( # 如果同名队列已存在依然返回成功,否则创建 passive=False, # 声明为非持久化队列 durable=False, # 声明为私有队列 exclusive=True, # 队列闲置会自动删除 auto_delete=True ) exchange = sys.argv[1] messages = sys.argv[2] # 创建配置对象 msg_props = pika.BasicProperties() # 设置内容类型 msg_props.content_type = ‘application/json‘ # 将唯一响应队列名传递给消费者 msg_props.reply_to = queue.method.queue # 发送消息 channel.basic_publish( # 发布消息内容 body=messages, # 发布到交换机 exchange=exchange, # 发布信息属性 properties=msg_props, # 消息路由键 routing_key="ping" ) # 响应回调 def api_response_ping(channel, method, header, body): """ channel: 和RabbitMQ通信的信道对象,如果多个信道则会关联接收到消息的订阅的信道 method : 一个方法帧对象,携带关联订阅的消费者标记以及投递者标记 header : AMQP消息头信息,携带可选的消息元数据,如数据类型 body : 实际消息内容 """ print ‘#{0}[{1}]>: {2}‘.format( # 由于此处返回的是消费者的header,所以不能使用header.reply_to而应该使用生成的随机唯一队列名 queue.method.queue, method.delivery_tag, body, ) # 发送消息确认 channel.basic_ack(delivery_tag=method.delivery_tag) return # 作为指定队列消费者 channel.basic_consume( api_response_ping, # 从匿名零时队列中收取消息 queue=queue.method.queue, # 必须确认后再接收后续消息 no_ack=False, consumer_tag="ping_response" ) # 循环调用回调函数接收处理消息 channel.start_consuming() channel.close()
说明: 测试非常简单,首先启动消费者python api_server.py,然后尝试执行生产者python rpc_client.py rpc ‘{"exec": "ping"}‘,查看消费者端输出
扩展: 可以轻易的通过创建队列和绑定的方式来扩展API以支持新的API方法,这样做的最大好处是任何一台服务器都无需对所有的API调用做应答,其它RPC服务器部署在同台或异台物理机器,而这一切对于用户是无感知的~
本文出自 “满满李 - 运维开发之路” 博客,请务必保留此出处http://xmdevops.blog.51cto.com/11144840/1878573
以上是关于消息队列_RabbitMQ-0004.深入RabbitMQ之分类告警/并行执行/RPC响应?的主要内容,如果未能解决你的问题,请参考以下文章
消息队列_RabbitMQ-0002.深入MQ生产者/信道/交换机/队列/消费者?