python dingtalk钉钉群告警消息发布

Posted liwenchao1995

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python dingtalk钉钉群告警消息发布相关的知识,希望对你有一定的参考价值。

python dingtalk钉钉群告警消息发布

公司用oprator部署的prometheus,理论上时可以直接通过alertmanager的配置。通过钉钉机器人的webhook向群里发送告警信息。但是想要格式化输出格式,并且规范化就成了问题。因为每个人都有部署exporter的权限,我不能保证每个exporter下面的desc都能跟我想要的格式一样,所以我需要重新整理每个告警信息,根据告警信息规范,格式化输出告警信息

Dingtalk简介

我们可以通过webhook直接向群发起消息,但是对于一些比较复杂的场景,比如格式化输出,投票选择,发送图片处理就稍微有些麻烦,这里可以采用Dingtalk,它的原理其实就是对钉钉的消息格式做了下封装,方便我们去使用发送各种消息,这里我用python写的脚本,所以在github上找了一个python对应的Dingtalk github地址
我也fork了这位作者一份到我的github,在此给作者点个赞,详细使用说明可以看到基本,下面是我的脚本

钉钉告警消息通知脚本

我的思路是先向alert发起请求,获取到json后处理,把告警信息保存到我自己创建的数组或者字典里,再for循环处理,否则直接处理json会造成不必要的资源损耗。再追加生成新的告警信息,发送到钉钉群。

#!/usr/bin/python
# -*- coding:utf-8 -*-
# BY:wenchao.Li time: 2021/10/8
from collections import namedtuple
import requests
import json
from dingtalkchatbot.chatbot import DingtalkChatbot, ActionCard, CardItem
from apscheduler.schedulers.background import BackgroundScheduler
from flask import Flask
import atexit
import time

#定义发生严重告警的字典
disk_warn_list=[]
alive_warn_list=[]
cpu_warn_list=[]
memory_warn_list=[]

app = Flask(__name__)

def GetJsonFromAlert():
  # Alertmanager API 地址
  url = "http://liwenchao1995:301230/api/v2/alerts"
  # 发送请求
  response = requests.get(url)
  # 解析响应
  if response.status_code == 200:
     json_data = json.loads(response.text)   
     length_json=len(json_data) 
     init_len=0
     
     for init_len in range(length_json): 
       try:
	     #判断json的告警信息里是不是以idc-zgc-exporter开头,如果是,就获取它的一些信息,把这些信息存储到字典里,再把字典当作一个元素存储到列表里。
         if json_data[init_len]["labels"]["job"].lower().startswith("idc-zgc-exporter"):
           #定义存储到列表中告警的信息
           # print(GetInfoFromLable(json_data,init_len,"job"))
            alertname=GetInfoFromLable(json_data,init_len,"alertname")
            instance=GetInfoFromLable(json_data,init_len,"instance")
            severity=GetInfoFromLable(json_data,init_len,"severity")
            if alertname == "HighDiskUsage":
               disk_warn_list.append("alertname":alertname,"instance":instance,"severity":severity,"message":"磁盘告警") 
            if alertname == "InstanceDown":
               alive_warn_list.append("alertname":alertname,"instance":instance,"severity":severity,"message":"主机存活告警") 
            if alertname == "HighCpuUsage":
               cpu_warn_list.append("alertname":alertname,"instance":instance,"severity":severity,"message":"Cpu告警") 
            if alertname == "HighMemoryUsage":
               memory_warn_list.append("alertname":alertname,"instance":instance,"severity":severity,"message":"内存告警") 
         else:
            pass
       except KeyError:
         pass
  else:
    print("Failed to connected alerts:", response.text)

def GetInfoFromLable(json_data,init_len,key):
  return json_data[init_len]["labels"][key]

#发起告警信息
def TalkToDingding():
  GetJsonFromAlert()
  webhook = \'https://oapi.dingtalk.com/robot/send?access_token=1231318631d30103da dadasdasdadf1dc62c4870dasda
  xiaoding = DingtalkChatbot(webhook)  # 方式一:通常初始化方式
  # Text消息@所有人
  # xiaoding.send_text(msg=\'dingtalk test warning\', is_at_all=True)
  
  #定义tuple,方便下面调用参数,防止参数过长不美观
  warninfo = namedtuple(\'warninfo\', [\'lever\', \'number\', \'lists\'])
  disk_warn = warninfo("circtl", len(disk_warn_list), GetDictValueInList(disk_warn_list))
  alive_warn = warninfo("circtl", len(alive_warn_list), GetDictValueInList(alive_warn_list))
  cpu_warn = warninfo("circtl", len(cpu_warn_list), GetDictValueInList(cpu_warn_list))
  memory_warn = warninfo("circtl", len(memory_warn_list), GetDictValueInList(memory_warn_list))
  text = Build_Warn_Text(alert_text,disk_warn,alive_warn,cpu_warn,memory_warn)
  xiaoding.send_markdown(title=\'监控报警通知\\n\', text=text, at_mobiles=["18201212313","1712307676"]) 

#定义初始化告警信息,这里加了一张我自己制作的图片
alert_text = \'> ![告警](https://img2023.cnblogs.com/blog/2654447/202304/2654447-20230428140432929-998719694.png)\\n\' \\
       \'> #### [本地监控] 发生告警\\n\'

#构建告警信息
def Build_Warn_Text(alert_text,disk_warn,alive_warn,cpu_warn,memory_warn):
  if alive_warn.number != 0:
    # 主机存活告警部分
    alert_text += \'> ### 主机存活告警\\n\' \\
           \'> * 告警级别: ``\\n\' \\
           \'> * 告警条数: ``\\n\' \\
           \'> * 告警列表: ``\\n\' \\
           \'>.\\n\'.format(alive_warn.lever, alive_warn.number, alive_warn.lists)
  if disk_warn.number != 0:
  # 磁盘告警部分
    alert_text += \'> ### 磁盘告警\\n\' \\
           \'> * 告警级别: ``\\n\' \\
           \'> * 告警条数: ``\\n\' \\
           \'> * 告警列表: ``\\n\' \\
           \'>.\\n\'.format(disk_warn.lever, disk_warn.number, disk_warn.lists)
  if cpu_warn.number != 0:
    # CPU告警部分
    alert_text += \'> ### CPU告警\\n\' \\
           \'> * 告警级别: ``\\n\' \\
           \'> * 告警条数: ``\\n\' \\
           \'> * 告警列表: ``\\n\' \\
           \'>.\\n\'.format(cpu_warn.lever, cpu_warn.number, cpu_warn.lists)
  if memory_warn.number != 0:
    # 内存告警部分
     alert_text += \'> ### 内存告警\\n\' \\
           \'> * 告警级别: ``\\n\' \\
           \'> * 告警条数: ``\\n\' \\
           \'> * 告警列表: ``\\n\' \\
           \'>.\\n\'.format(memory_warn.lever, memory_warn.number, memory_warn.lists)
  return alert_text

def GetDictValueInList(listname):
  list_init=[]
  for dicts in listname:
      warn_host_list=dicts[\'instance\']
      list_init.append(warn_host_list)
  return list_init

#添加TalkToDingding到定时任务,每30秒运行一次
scheduler = BackgroundScheduler()
scheduler.add_job(func=TalkToDingding, trigger=\'interval\', seconds=10)
#scheduler.add_job(func=TalkToDingding, trigger=\'interval\', minutes=30)
scheduler.start() 
atexit.register(lambda: scheduler.shutdown())
  
#flask框架,后台运行
if __name__ == \'__main__\':
    app.run()

上面脚本还有些瑕疵,比如告警级别我没获取直接写死了,后面有时间会再优化

Shell中通过机器人发送钉钉群消息

场景:在平时的工作中,经常需要执行一些定时任务,并且当定时任务执行失败后需要告警出来到钉钉群,使用[钉钉机器人]可以简单快速的帮我们完成这件事情。


  • 简单使用



curl 'https://oapi.dingtalk.com/robot/send?access_token=9e2549f6a0bb28767bdexxxxxxxxxxxxxxxxxx81975c981005abbd'    -H 'Content-Type: application/json'    -d '
  {"msgtype": "text", 
    "text": {
        "content": "我就是我, 是不一样的烟火"
     }
  }'

效果图:

技术分享图片



  • 发送文本内容并@指定的人


curl 'https://oapi.dingtalk.com/robot/send?access_token=9e2549f6a0xxxxxxxxxxxxxxxxxx447981975c981005abbd' -H 'Content-Type: application/json' -d '{"msgtype":"text","text":{"content":"我就是我, 是不一样的烟火"},"at":{"atMobiles":["176xxxx1234"],"isAtAll":false}}'

效果图:

技术分享图片

以上是关于python dingtalk钉钉群告警消息发布的主要内容,如果未能解决你的问题,请参考以下文章

钉钉群机器人通知中的图片,突然不显示了

通过钉钉群聊机器人推送zabbix告警

python3 钉钉 加签名 钉钉群机器人巡检告警 脚本

python3 钉钉 加签名 钉钉群机器人巡检告警 脚本

python3 钉钉 加签名 钉钉群机器人巡检告警 脚本

DingTalk - 如何设置钉钉群直播静音?