消息队列_RabbitMQ-0004.深入RabbitMQ之分类告警/并行执行/RPC响应?

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了消息队列_RabbitMQ-0004.深入RabbitMQ之分类告警/并行执行/RPC响应?相关的知识,希望对你有一定的参考价值。

应用场景:

1. 通知,针对发送事件的描述,内容可以是消息的日志,也可以是真实的报告通知给另一个程序或者管理员.

技术分享

说明: 首先选择交换机,如果选择fanout交换机,则需要为每种告警传输类型(邮件/微信/手机/短信)创建队列,但同时也带来坏处就是每个消息都会发送到所有队列,导致告警消息发生时,被报警消息淹没,如果选择topic交换机,则可为其创建四种严重级别告警info/warning/problem/citical,但如果使用fanout类型交换机消息会发送到所有这四个级别队列,如果使用direct交换机,则四个严重等级会被定死,无法扩展,而topic交换机则允许我们在如上四个严重等级上加上类型,如当我们触发报警API时候路由键设置为critical.rate_limit,则消息不经会发送到cirtica.*l队列而且同时会被发送到*.rate_limite,至于针对每种类型怎么处理这就是消费者该干的事情了~


> 消费者


#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
#
# Authors: limanman
# OsChina: http://xmdevops.blog.51cto.com/
# Purpose:
#
"""
# 说明: 导入公共模块
import pika
import json
# 说明: 导入其它模块
if __name__ == ‘__main__‘:
    # 创建凭证对象
    credentials = pika.PlainCredentials(‘root‘, ‘qwertyuiop‘)
    # 创建参数对象
    conn_params = pika.ConnectionParameters(
        # RabbitMQ服务地址
        host=‘127.0.0.1‘,
        # RabbitMQ服务端口
        port=5672,
        # RabbitMQ登录凭证
        credentials=credentials,
        # RabbitMQ虚拟主机
        virtual_host=‘/‘
    )
    # 创建连接对象
    conn_broker = pika.BlockingConnection(conn_params)
    # 获取信道对象
    channel = conn_broker.channel()
    # 创建日志交换机
    channel.exchange_declare(
        # 交换机名称
        exchange="xmzoomeye_alerts",
        # 交换机类型
        exchange_type="topic",
        # 如果同名交换机已存在依然返回成功,否则创建
        passive=False,
        # 声明为持久化交换机
        durable=True,
        # 交换机闲置也不会自动删除
        auto_delete=False
    )
    # 创建info日志级别队列
    channel.queue_declare(
            # 队列名称
            queue="info",
            # 如果同名队列已存在依然返回成功,否则创建
            passive=False,
            # 声明为持久化队列
            durable=True,
            # 声明为非私有队列
            exclusive=False,
            # 队列闲置也不会自动删除
            auto_delete=False
    )
    # 创建warning日志级别队列
    channel.queue_declare(
            # 队列名称
            queue="warning",
            # 如果同名队列已存在依然返回成功,否则创建
            passive=False,
            # 声明为持久化队列
            durable=True,
            # 声明为非私有队列
            exclusive=False,
            # 队列闲置也不会自动删除
            auto_delete=False
    )
    # 创建problem日志级别队列
    channel.queue_declare(
            # 队列名称
            queue="problem",
            # 如果同名队列已存在依然返回成功,否则创建
            passive=False,
            # 声明为持久化队列
            durable=True,
            # 声明为非私有队列
            exclusive=False,
            # 队列闲置也不会自动删除
            auto_delete=False
    )
    # 创建cirtical日志级别队列
    channel.queue_declare(
            # 队列名称
            queue="cirtical",
            # 如果同名队列已存在依然返回成功,否则创建
            passive=False,
            # 声明为持久化队列
            durable=True,
            # 声明为非私有队列
            exclusive=False,
            # 队列闲置也不会自动删除
            auto_delete=False
    )
    # 创建rate_limit日志级别队列
    channel.queue_declare(
            # 队列名称
            queue="cirtical_ratelimit",
            # 如果同名队列已存在依然返回成功,否则创建
            passive=False,
            # 声明为持久化队列
            durable=True,
            # 声明为非私有队列
            exclusive=False,
            # 队列闲置也不会自动删除
            auto_delete=False
    )
    # 绑定队列
    channel.queue_bind(
        # 队列名称
        queue="info",
        # 交换机名称
        exchange="xmzoomeye_alerts",
        # 路由键名称
        routing_key="info.*"
    )
    channel.queue_bind(
        # 队列名称
        queue="warning",
        # 交换机名称
        exchange="xmzoomeye_alerts",
        # 路由键名称
        routing_key="warning.*"
    )
    channel.queue_bind(
        # 队列名称
        queue="problem",
        # 交换机名称
        exchange="xmzoomeye_alerts",
        # 路由键名称
        routing_key="problem.*"
    )
    channel.queue_bind(
        # 队列名称
        queue="cirtical",
        # 交换机名称
        exchange="xmzoomeye_alerts",
        # 路由键名称
        routing_key="cirtical.*"
    )
    channel.queue_bind(
        # 队列名称
        queue="cirtical_ratelimit",
        # 交换机名称
        exchange="xmzoomeye_alerts",
        # 路由键名称
        routing_key="*.ratelimit"
    )
    def callback_wrapper(callback):
        def wrapper(channel, method, header, body):
            print ‘#{0}[{1}]>: {2}‘.format(method.consumer_tag, method.delivery_tag, body),
            if header.content_type != ‘application/json‘:
                print ‘with wrong content_type(application/json)‘
                channel.basic_ack(delivery_tag=method.delivery_tag)
                return
            print ‘with correct content_type(application/json)‘
            callback(channel, method, header, body)
            # 发送消息确认
            channel.basic_ack(delivery_tag=method.delivery_tag)
        return wrapper
    @callback_wrapper
    def info_callback_handler(channel, method, header, body):
        """
        channel: 和RabbitMQ通信的信道对象,如果多个信道则会关联接收到消息的订阅的信道
        method : 一个方法帧对象,携带关联订阅的消费者标记以及投递者标记
        header : AMQP消息头信息,携带可选的消息元数据,如数据类型
        body   : 实际消息内容
        """
        pass
        return
    @callback_wrapper
    def warning_callback_handler(channel, method, header, body):
        pass
        return
    @callback_wrapper
    def problem_callback_handler(channel, method, header, body):
        pass
        return
    @callback_wrapper
    def cirtical_callback_handler(channel, method, header, body):
        pass
        return
    @callback_wrapper
    def cirtical_ratelimit_callback_handler(channel, method, header, body):
        pass
        return
    # 作为指定队列消费者
    channel.basic_consume(
            info_callback_handler,
            queue="info",
            # 必须确认后再接收后续消息
            no_ack=False,
            consumer_tag="xmzoomeye_alerts_info"
    )
    channel.basic_consume(
            warning_callback_handler,
            queue="warning",
            no_ack=False,
            consumer_tag="xmzoomeye_alerts_warning"
    )
    channel.basic_consume(
            problem_callback_handler,
            queue="problem",
            no_ack=False,
            consumer_tag="xmzoomeye_alerts_problem"
    channel.basic_consume(
            cirtical_callback_handler,
            queue="cirtical",
            no_ack=False,
            consumer_tag="xmzoomeye_alerts_cirtical"
    )
    channel.basic_consume(
            cirtical_ratelimit_callback_handler,
            queue="cirtical_ratelimit",
            no_ack=False,
            consumer_tag="xmzoomeye_alerts_cirtical_ratelimit"
    )
    # 循环调用回调函数接收处理消息
    channel.start_consuming()
    channel.close()

> 生产者


#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
#
# Authors: limanman
# OsChina: http://xmdevops.blog.51cto.com/
# Purpose:
#
"""
# 说明: 导入公共模块
import sys
import pika
# 说明: 导入其它模块
if __name__ == ‘__main__‘:
    # 创建凭证对象
    credentials = pika.PlainCredentials(‘root‘, ‘qwertyuiop‘)
    # 创建参数对象
    conn_params = pika.ConnectionParameters(
        # RabbitMQ服务地址
        host=‘127.0.0.1‘,
        # RabbitMQ服务端口
        port=5672,
        # RabbitMQ登录凭证
        credentials=credentials,
        # RabbitMQ虚拟主机
        virtual_host=‘/‘
    )
    # 创建连接对象
    conn_broker = pika.BlockingConnection(conn_params)
    # 获取信道对象
    channel = conn_broker.channel()
    exchange = sys.argv[1]
    routekey = sys.argv[2]
    messages = sys.argv[3]
    # 创建配置对象
    msg_props = pika.BasicProperties()
    # 设置内容类型
    msg_props.content_type = ‘application/json‘
    # 尝试发布消息
    channel.basic_publish(
        # 发布消息内容
        body=messages,
        # 发布到交换机
        exchange=exchange,
        # 发布信息属性
        properties=msg_props,
        # 发布信息时携带的路由键
        routing_key=routekey
    )

说明: 测试非常简单,首先启动消费者python consumer.py,然后尝试执行生产者python producer.py xmzoomeye_alerts cirtical.ratelimit ‘{"error": "cirtical rate limit"}‘,查看消费者端输出

扩展: 对于发后即忘的消息通信模式可轻而易举的扩展,如添加一个额外的队列绑定路由键*.*,将所有级别的告警记录都记录到数据库中,以便后期分析/汇总/压缩/分类/查询等操作~


2. 批量,针对大型数据集合的工作或者转换,这种类型的任务可以构建为单一的任务请求,或者多个任务对数据集合的独立部分进行操作.


技术分享



> 消费者


#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
#
# Authors: limanman
# OsChina: http://xmdevops.blog.51cto.com/
# Purpose:
#
"""
# 说明: 导入公共模块
import pika
# 说明: 导入其它模块
if __name__ == ‘__main__‘:
    # 创建凭证对象
    credentials = pika.PlainCredentials(‘guest‘, ‘guest‘)
    # 创建参数对象
    conn_params = pika.ConnectionParameters(
        # RabbitMQ服务地址
        host=‘127.0.0.1‘,
        # RabbitMQ服务端口
        port=5672,
        # RabbitMQ服务凭证
        credentials=credentials,
        # RabbitMQ虚拟主机
        virtual_host=‘/‘
    )
    # 创建连接对象
    conn_broker = pika.BlockingConnection(conn_params)
    # 获取信道对象
    channel = conn_broker.channel()
    # 创建交换机
    channel.exchange_declare(
        # 交换机名称
        exchange="salt-exchange",
        # 交换机类型
        type="direct",
        # 如果同名交换机已存在依然返回成功
        passive=False,
        # 声明为持久化交换机
        durable=False,
        # 交换机闲置也不会自动删除
        auto_delete=False
    )
    # 创建队列
    channel.queue_declare(queue="salt")
    # 绑定队列
    channel.queue_bind(
        # 队列名称
        queue="salt",
        # 交换机名称
        exchange="salt-exchange",
        # 路由键名称
        routing_key="salt"
    )
    # 消息回调处理函数
    def msg_consumer(channel, method, header, body):
        # 发送消息确认
        channel.basic_ack(delivery_tag=method.delivery_tag)
        # 退出监听循环
        if body == ‘exit‘:
            channel.basic_cancel(consumer_tag="salt-consumer")
            channel.stop_consuming()
        else:
            print ‘found notice: recive queue message {0}‘.format(body)
        return
    # 作为指定队列消费者
    channel.basic_consume(msg_consumer, queue="salt", consumer_tag="salt-consumer")
    # 循环调用回调函数接收处理消息
    channel.start_consuming()

> 生产者


#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
#
# Authors: limanman
# OsChina: http://xmdevops.blog.51cto.com/
# Purpose:
#
"""
# 说明: 导入公共模块
import sys
import pika
# 说明: 导入其它模块
if __name__ == ‘__main__‘:
    # 创建凭证对象
    credentials = pika.PlainCredentials(‘root‘, ‘qwertyuiop‘)
    # 创建参数对象
    conn_params = pika.ConnectionParameters(
        # RabbitMQ服务地址
        host=‘127.0.0.1‘,
        # RabbitMQ服务端口
        port=5672,
        # RabbitMQ登录凭证
        credentials=credentials,
        # RabbitMQ虚拟主机
        virtual_host=‘/‘
    )
    # 创建连接对象
    conn_broker = pika.BlockingConnection(conn_params)
    # 获取信道对象
    channel = conn_broker.channel()
    exchange = sys.argv[1]
    messages = sys.argv[2]
    # 创建配置对象
    msg_props = pika.BasicProperties()
    # 设置内容类型
    msg_props.content_type = ‘application/json‘
    # 消息持久化
    msg_props.delivery_mode = 2
    # 尝试发布消息
    channel.basic_publish(
        # 发布消息内容
        body=messages,
        # 发布到交换机
        exchange=exchange,
        # 发布信息属性
        properties=msg_props,
        # 扇形交换机本身不需要路由键,但参数个数限制,随意推荐大家直接写#匹配所有
        routing_key="#"
    )

说明: 测试非常简单,首先启动消费者python consumer.py,然后尝试执行生产者python producer.py upload_pictures ‘{"image_id": 1, "user_id": 1, "image_path": "/xm-workspace/xm-webs/www.pic.com/data/images/73197d57-46a9-4d19-a48f-a44e0ad5e493.jpg"}‘,查看消费者端输出

扩展: 如上在WEB页面上上传完图片后,希望对图片进行生成缩略图/奖励上传用户积分/分享通知朋友圈等等,这几个任务之间是没有相互依赖关系的,不需要等待对方的结果才能继续执行,所以可以并行执行,扩展起来也非常容易,直接添加一个对应的队列和消费者即可,如要记录上传图片日志记录需求~,当发现一个创建缩略图的消费者跟不上节奏,直接在同台或异台服务器上再跑一个创建缩略图的消费者即可,任务会自动轮询分配,这一切对于用户是无感知的~


3. RPC,针对大量RPC请求使用消息来发回应答,AMQP消息头里有一个reply_to字段,生产者JSON RPC-API生成随机零时队列名存储到预发布RPC调用消息的头部reply_to字段到指定队列,然后在随机队列上监听响应数据,消费者JSON RPC-SRV接收到消息处理完毕后读取回调中header的reply_to字段,然后将响应发回零时队列,由于所有没有绑定交换机的队列都会自动绑定到匿名交换机,所以必用申请额外的交换机直接使用匿名交换机,消息一旦被接收,零时队列会自动被删除.至此完成一次RPC调用



技术分享

> 消费者

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
#
# Authors: limanman
# OsChina: http://xmdevops.blog.51cto.com/
# Purpose:
#
"""
# 说明: 导入公共模块
import pika
import json
# 说明: 导入其它模块
if __name__ == ‘__main__‘:
    # 创建凭证对象
    credentials = pika.PlainCredentials(‘root‘, ‘qwertyuiop‘)
    # 创建参数对象
    conn_params = pika.ConnectionParameters(
        # RabbitMQ服务地址
        host=‘127.0.0.1‘,
        # RabbitMQ服务端口
        port=5672,
        # RabbitMQ登录凭证
        credentials=credentials,
        # RabbitMQ虚拟主机
        virtual_host=‘/‘
    )
    # 创建连接对象
    conn_broker = pika.BlockingConnection(conn_params)
    # 获取信道对象
    channel = conn_broker.channel()
    # 创建RPC交换机
    channel.exchange_declare(
        # 交换机名称
        exchange="rpc",
        # 交换机类型
        exchange_type="direct",
        # 如果同名交换机已存在依然返回成功,否则创建
        passive=False,
        # 声明为持久化交换机
        durable=True,
        # 交换机闲置也不会自动删除
        auto_delete=False
    )
    # 创建ping任务队列
    channel.queue_declare(
            # 队列名称
            queue="ping",
            # 如果同名队列已存在依然返回成功,否则创建
            passive=False,
            # 声明为非持久化队列
            durable=False,
            # 声明为私有队列
            exclusive=True,
            # 队列闲置会自动删除
            auto_delete=True
    )
    # 绑定队列
    channel.queue_bind(
        # 队列名称
        queue="ping",
        # 交换机名称
        exchange="rpc",
    )
    # 请求回调
    def api_request_ping(channel, method, header, body):
        """
        channel: 和RabbitMQ通信的信道对象,如果多个信道则会关联接收到消息的订阅的信道
        method : 一个方法帧对象,携带关联订阅的消费者标记以及投递者标记
        header : AMQP消息头信息,携带可选的消息元数据,如数据类型
        body   : 实际消息内容
        """
        print ‘#{0}[{1}]>: {2}‘.format(
                # 由于此处的header是生产者的,所以可通过header.reply_to获取随机队列名
                header.reply_to,
                method.delivery_tag,
                body,
        )
        # 发送消息确认
        channel.basic_ack(delivery_tag=method.delivery_tag)
        # 发送响应对象
        channel.basic_publish(
                body="pong",
                # 由于所有队列默认都会绑定到匿名交换机,非常方便直接发给它,它会根据传递过来的路由键原路返回
                exchange="",
                routing_key=header.reply_to
        )
        return
    # 作为指定队列消费者
    channel.basic_consume(
            api_request_ping,
            queue="ping",
            # 必须确认后再接收后续消息
            no_ack=False,
            consumer_tag="ping_request"
    )
    # 循环调用回调函数接收处理消息
    channel.start_consuming()
    channel.close()

> 生产者


#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
#
# Authors: limanman
# OsChina: http://xmdevops.blog.51cto.com/
# Purpose:
#
"""
# 说明: 导入公共模块
import pika
# 说明: 导入其它模块
if __name__ == ‘__main__‘:
    # 创建凭证对象
    credentials = pika.PlainCredentials(‘root‘, ‘qwertyuiop‘)
    # 创建参数对象
    conn_params = pika.ConnectionParameters(
        # RabbitMQ服务地址
        host=‘127.0.0.1‘,
        # RabbitMQ服务端口
        port=5672,
        # RabbitMQ登录凭证
        credentials=credentials,
        # RabbitMQ虚拟主机
        virtual_host=‘/‘
    )
    # 创建连接对象
    conn_broker = pika.BlockingConnection(conn_params)
    # 获取信道对象
    channel = conn_broker.channel()
    # 创建匿名零时响应队列,返回唯一队列名称通过header的reply_to给消费者
    queue = channel.queue_declare(
        # 如果同名队列已存在依然返回成功,否则创建
        passive=False,
        # 声明为非持久化队列
        durable=False,
        # 声明为私有队列
        exclusive=True,
        # 队列闲置会自动删除
        auto_delete=True
    )
    exchange = sys.argv[1]
    messages = sys.argv[2]
    # 创建配置对象
    msg_props = pika.BasicProperties()
    # 设置内容类型
    msg_props.content_type = ‘application/json‘
    # 将唯一响应队列名传递给消费者
    msg_props.reply_to = queue.method.queue
    # 发送消息
    channel.basic_publish(
        # 发布消息内容
        body=messages,
        # 发布到交换机
        exchange=exchange,
        # 发布信息属性
        properties=msg_props,
        # 消息路由键
        routing_key="ping"
    )
    # 响应回调
    def api_response_ping(channel, method, header, body):
        """
        channel: 和RabbitMQ通信的信道对象,如果多个信道则会关联接收到消息的订阅的信道
        method : 一个方法帧对象,携带关联订阅的消费者标记以及投递者标记
        header : AMQP消息头信息,携带可选的消息元数据,如数据类型
        body   : 实际消息内容
        """
        print ‘#{0}[{1}]>: {2}‘.format(
                # 由于此处返回的是消费者的header,所以不能使用header.reply_to而应该使用生成的随机唯一队列名
                queue.method.queue,
                method.delivery_tag,
                body,
        )
        # 发送消息确认
        channel.basic_ack(delivery_tag=method.delivery_tag)
        return
    # 作为指定队列消费者
    channel.basic_consume(
            api_response_ping,
            # 从匿名零时队列中收取消息
            queue=queue.method.queue,
            # 必须确认后再接收后续消息
            no_ack=False,
            consumer_tag="ping_response"
    )
    # 循环调用回调函数接收处理消息
    channel.start_consuming()
    channel.close()

说明: 测试非常简单,首先启动消费者python api_server.py,然后尝试执行生产者python rpc_client.py rpc ‘{"exec": "ping"}‘,查看消费者端输出

扩展: 可以轻易的通过创建队列和绑定的方式来扩展API以支持新的API方法,这样做的最大好处是任何一台服务器都无需对所有的API调用做应答,其它RPC服务器部署在同台或异台物理机器,而这一切对于用户是无感知的~


本文出自 “满满李 - 运维开发之路” 博客,请务必保留此出处http://xmdevops.blog.51cto.com/11144840/1878573

以上是关于消息队列_RabbitMQ-0004.深入RabbitMQ之分类告警/并行执行/RPC响应?的主要内容,如果未能解决你的问题,请参考以下文章

消息队列_RabbitMQ-0002.深入MQ生产者/信道/交换机/队列/消费者?

Rabbitmq 消息队列

消息队列_RabbitMQ-0003.深入RabbitMQ节点/配置/管理及日志实时化?

rabbitmq - 不会获取队列中的所有消息

第二百九十一节,RabbitMQ多设备消息队列

RabbitMQ基础知识详解