[嘎嘣脆]15分钟写个监控消息队列代码

Posted 捞猴

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了[嘎嘣脆]15分钟写个监控消息队列代码相关的知识,希望对你有一定的参考价值。

概述

监控体系是我们搭建互联网研发体系中不可饶过的一个环节,从监控网络,磁盘,带宽,cpu等等到具体软件各种性能指标。及时发现问题,快速解决,让我们的服务更健壮,更安全。

监控软件

常用的监控软件,比如

  • zabbix

  • nagios

  • cacti

  • promethus

还有咱们各大互联网公司自主研发的监控软件

  • 小米的Open-falcon

  • 网易的NMS

上面列举的软件不是这次分享的重点,今天分享的主题是自己动手,用python写一个监控rabbitmq的小程序。

使用场景

适合服务器数量不多,简单监控的场景

监控架构设计的原理

监控系统设计原理不外乎

  • 采集

  • 策略

  • 报警

采集

  • 数据采集方法:RabbitMQ提供的HTTP API接口能获得消息队列的各种数据,详细的可以参照

url: http://ip:15762/api/

本次想采集正在执行消息队列的数量 那么执行http://ip:15762/api/queues 会返回一个JSON

[
    {
        "memory": 13768,
        "message_stats": {            "ack": 5,            "ack_details": {                "rate": 0            },            "deliver": 5,            "deliver_details": {                "rate": 0            },            "deliver_get": 5,            "deliver_get_details": {                "rate": 0            },            "publish": 5,            "publish_details": {                "rate": 0            }        },
        "messages": 0,
        "messages_details": {            "rate": 0        },
        "messages_ready": 0,
        "messages_ready_details": {            "rate": 0        },
        "messages_unacknowledged": 0,
        "messages_unacknowledged_details": {            "rate": 0        },
        "idle_since": "2017-02-17 8:56:20",
        "consumer_utilisation": null,
        "policy": null,
        "exclusive_consumer_tag": null,
        "consumers": 0,
        "recoverable_slaves": null,
        "state": "running",
        "messages_ram": 0,
        "messages_ready_ram": 0,
        "messages_unacknowledged_ram": 0,
        "messages_persistent": 0,
        "message_bytes": 0,
        "message_bytes_ready": 0,
        "message_bytes_unacknowledged": 0,
        "message_bytes_ram": 0,
        "message_bytes_persistent": 0,
        "head_message_timestamp": null,
        "disk_reads": 0,
        "disk_writes": 0,
        "backing_queue_status": {            "mode": "default",            "q1": 0,            "q2": 0,            "delta": [                "delta",                "undefined",                0,                "undefined"            ],            "q3": 0,            "q4": 0,            "len": 0,            "target_ram_count": "infinity",            "next_seq_id": 5,            "avg_ingress_rate": 0,            "avg_egress_rate": 0,            "avg_ack_ingress_rate": 0,            "avg_ack_egress_rate": 0        },
        "name": "QUEUE_XXX",
        "vhost": "/",
        "durable": false,
        "auto_delete": false,
        "exclusive": false,
        "arguments": {},
        "node": "rabbit@6108e1863f7c"    }
]
  • 采集频率 -> 本次设定采集频率:1分钟一次

  • 采集方法:使用linux的命令定时执行, 定时任务设定方法,请参照之前分享的一篇文章:一篇足矣,用心分享,定时任务:cron命令

  • 代码片段:定时任务执行

*/1 * * * * python /laohou/monitor/rabbitmq_monitor.py
  • 采集数据代码片段

# 监控的消息队列服务器
# XXX.XXX.XXX.XXX:15672 -> 替换成消息队列IP和端口号
RABBITMQ_HOST = [    
   "http://XXX.XXX.XXX.XXX:15672/api/queues"
]

#
# 采集数据
#
def collect():    data = {}    
   for url in RABBITMQ_HOST:        
       # XXX分别替换成用户名和密码
       r = requests.get(url, auth=HTTPBasicAuth('XXX', 'XXX'))        json_obj = json.loads(r.text)        queue_list = parse_json(json_obj)        data[url] = queue_list    return data
#
# 解析Json
#
def parse_json(json_obj):    queue = {}    
   for
text in json_obj:        queue[text['name']] = int(text['messages'])    
   return queue

采集哪些指标

本次只采集正在处理中的消息队列数量

  • 解析JSON,获取messages数值

#
# 解析Json
#
def
parse_json(json_obj):
   queue = {}    
   for
text in json_obj:        queue[text['name']] = int(text['messages'])    
   return queue

策略

# 定义阀值
# QUEUE_XXX  -> 替换成您想监控的队列名,数字是阀值,超过则报警

QUEUE_LMIT = {    
   "QUEUE_XXX": 10,    
   "QUEUE_XXX_XXX"
: 10
}

#

# 执行策略
# 本实例代码超过阀值,则添加到报警队列
#
def execute(data):    police_data = []    
   for
(url, queue_list) in data.items():        
       # 采集队列数据
       if queue_list is not None:            
           for
(k, v) in queue_list.items():                
               if k in QUEUE_LMIT.keys():                    
                   if v >= QUEUE_LMIT[k]:                        police_data.append("url: " + url + "queue: " + k + " messages: " + str(v))                
               else:                    
                   if v > 15:                        police_data.append("url: " + url + "queue: " + k + " messages: " + str(v))    
           return police_data

通知

通知方式

  • 邮件 -> 本实例通知方式:邮件

  • 短信

  • 微信

# 发送人邮箱 -> 替换成你的邮箱
MAIL_USER = "XXX@XXX.com"
# 发送人密码箱 -> 替换成你的邮箱密码

MAIL_PWD = "XXX"

#
# 通知
#
def
notice(police_data):
   if police_data:        
       # 发邮件
       send_mail(police_data)
#

# 发邮件

#
def send_mail(police_data):    # 邮件标题    mail_subject = u"队列监控" + "_" + get_now()    
   # 邮件文本内容    context = []    
   if police_data:        
       for row in police_data:            context.append(row)    
       if context:            mail_context = ''.join(context)    msg = MIMEMultipart()    msg['From'] = MAIL_USER    msg['To'] = ";".join(MAIL_LIST)    msg['Subject'] = mail_subject    
   
   # 添加邮件内容
   txt = MIMEText(mail_context, 'plain', 'utf-8')    msg.attach(txt)    
   
   # 发送邮件
   smtp = smtplib.SMTP()    
   
   # 连接163 记得开启163邮箱的SMTP,不然发不了
   smtp.connect('smtp.mxhichina.com:25')    smtp.login(MAIL_USER, MAIL_PWD)    smtp.sendmail(MAIL_USER, MAIL_LIST, msg.as_string())    smtp.quit()## 获取当前时间#def get_now():    return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())

完整代码

# !/usr/bin/python
# -*- coding:utf-8 -*-
import
requestsfrom requests.auth
import HTTPBasicAuthimport json
import
smtplibimport time
from
email.mime.text import MIMEText
from
email.mime.multipart import MIMEMultipart

# 邮件通知列表
MAIL_LIST = [
   "XXX@xxx.com",            
   "XXX@xxx.com"
]

# 监控的消息队列服务器
RABBITMQ_HOST = [    
   "http://XXX.XXX.XXX.XXX:15672/api/queues"

]

# 阀值

QUEUE_LMIT = {    
   "QUEUE_XXX"
: 10,    
   "QUEUE_XXX_XXX": 10
}

# 发送人邮箱
MAIL_USER = "XXX@XXX.com"

# 发送人邮箱密码
MAIL_PWD = "XXX"

#
# 采集数据
#
def collect():    data = {}    
   for url in RABBITMQ_HOST:        r = requests.get(url, auth=HTTPBasicAuth('XXX', 'XXX'))        json_obj = json.loads(r.text)        queue_list = parse_json(json_obj)        data[url] = queue_list    
   return
data

#
# 解析JSON
#
def parse_json(json_obj):    queue = {}    
   for text in json_obj:        queue[text['name']] = int(text['messages'])    
return queue

#
# 执行策略
#
def execute(data):    police_data = []    
   for
(url, queue_list) in data.items():        
       # 采集队列数据
       if queue_list is not None:            
           for
(k, v) in queue_list.items():                
               if k in QUEUE_LMIT.keys():                    
                   if
v >= QUEUE_LMIT[k]:                        police_data.append("url: " + url + "queue: " + k + " messages: " + str(v))                
               else:                    
                   if v > 15:                        police_data.append("url: " + url + "queue: " + k + " messages: " + str(v))    
               return police_data
#
# 获取当前时间
#

def
get_now():
   return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())

#
# 通知
#
def notice(police_data):    if police_data:        
       # 发邮件
       send_mail(police_data)
#
# 发邮件
#
def send_mail(police_data):    # 邮件标题    mail_subject = u"队列监控" + "_" + get_now()    
   # 邮件文本内容
   context = []    
   if
police_data:        
       for
row in police_data:            context.append(row)    
       if
context:            mail_context = ''.join(context)    msg = MIMEMultipart()    msg['From'] = MAIL_USER    msg['To'] = ";".join(MAIL_LIST)    msg['Subject'] = mail_subject    
   
   # 添加邮件内容
   txt = MIMEText(mail_context, 'plain', 'utf-8')    msg.attach(txt)    
   
   # 发送邮件    smtp = smtplib.SMTP()    
   
   # 连接163 记得开启163邮箱的SMTP,不然发不了    smtp.connect('smtp.mxhichina.com:25')    smtp.login(MAIL_USER, MAIL_PWD)    smtp.sendmail(MAIL_USER, MAIL_LIST, msg.as_string())    smtp.quit()
   
if __name__ == "__main__":    
   # 采集数据    data = collect()    
   
   # 过策略
   police_data = execute(data)    
   
   # 通知
   notice(police_data)

总结





长按二维码,勾搭捞猴君

路漫漫兮修远,you can you up丨捞猴


分享的文章都源于之前工作中经验总结。

如有收获,请帮忙转发。谢谢!


以上是关于[嘎嘣脆]15分钟写个监控消息队列代码的主要内容,如果未能解决你的问题,请参考以下文章

近实时(15分钟内)监控GitHub敏感信息泄露,并发送告警通知

Linux 资源监控分析

linux 操作系统级别监控 TOP命令

如何使用php、html及消息队列实现订单超时自动关闭订单

rabbitmq - 不会获取队列中的所有消息

消息队列浅谈