如何监控PostgreSQL存储过程/函数代码运行?本文介绍用python+微信/邮件的方式进行报警、监控。
首先要有一张表、用于存放PostgreSQL存储过程/函数代码运行异常的信息。
处理原则:若出现异常;把“发生时间+所在的程序+原因”通过微信/邮件发给对应人员。当然发送一次即可;起到通知的效果。
一、媒介
通过什么方式进行发送内容;下面介绍微信/邮件两种方式
1、python发送微信
py_wechar.py的内容
企业微信号;大家可以到企业微信上配置
#!/usr/bin/python3
#coding=utf-8
import json
import time
import urllib.request as urllib2
options = {
\'WeiXin\': {
\'corp_id\': \'*\', #微信企业号ID
\'agent_id\': \'*\', #微信企业号应用ID
\'agent_secret\': \'*\', #微信企业号密钥
\'to_user\': \'@all\' #发送给谁
},
}
class WeiXinSendMsg:
def __init__(self, wx_conf):
self.corp_id = wx_conf.get(\'corp_id\')
self.agent_secret = wx_conf.get(\'agent_secret\')
self.agent_id = wx_conf.get(\'agent_id\')
self.to_user = wx_conf.get(\'to_user\')
self.token = self.get_token()
self.token_update_time = int(time.time())
def get_token(self):
get_token_url = \'https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid=\' + self.corp_id + \'&corpsecret=\' + self.agent_secret
token = json.loads(urllib2.urlopen(get_token_url).read().decode(\'utf-8\'))[\'access_token\']
if token:
return token
# 微信发送端的token每1800秒会更新一次
def update_token(self):
if int(time.time()) - self.token_update_time >= 1800:
self.token = self.get_token()
self.token_update_time = int(time.time())
def send_message(self, msg):
try:
self.update_token()
send_url = \'https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=\' + self.token
send_val = {"touser":self.to_user, "toparty":"", "msgtype":"text", "agentid":self.agent_id, "text":{"content":msg}, "safe":"0"}
send_data = json.dumps(send_val, ensure_ascii=True).encode("utf-8")
send_request = urllib2.Request(send_url, send_data)
response = json.loads(urllib2.urlopen(send_request).read())
except Exception as e:
print(\'Exception WeiXin send_message:\', e)
if __name__ == \'__main__\':
WeiXin = WeiXinSendMsg(options.get(\'WeiXin\'))
WeiXin.send_message(\'hello world / 测试\')
2、python发送邮件
py_email.py的内容
#!/usr/bin/python3
#coding=utf-8
import smtplib
from email.header import Header
from email.mime.text import MIMEText
from email.utils import parseaddr, formataddr
options = {
\'Email\': {
\'smtp_server\': \'smtp.exmail.qq.com\', #邮箱服务器地址
\'from_addr\': \'monitor@qq.com\', #发送人账号
\'password\': \'123456\', #发送人密码
\'to_addr\': [\'hanbo@126.com\', \'hanbo@163.com\'], #发送给谁
}
}
class EmailSendMsg:
def __init__(self, email_conf):
self.smtp_server = email_conf.get(\'smtp_server\')
self.from_addr = email_conf.get(\'from_addr\')
self.password = email_conf.get(\'password\')
self.to_addr = email_conf.get(\'to_addr\')
# def __del__(self):
# self.server.quit()
def format_addr(self, str):
name, addr = parseaddr(str)
return formataddr(( \\
Header(name, \'utf-8\').encode(), \\
addr.encode(\'utf-8\') if isinstance(addr, unicode) else addr))
def send_msg(self, text):
try:
self.server = smtplib.SMTP(self.smtp_server, 25)
self.server.set_debuglevel(1)
self.server.login(self.from_addr, self.password)
msg = MIMEText(text, \'plain\', \'utf-8\')
msg[\'From\'] = self.format_addr(u\'监控 <%s>\' % self.from_addr)
for i in range(len(self.to_addr)):
msg[\'To\'] = self.format_addr(u\'<%s>\' % self.to_addr[i])
msg[\'Subject\'] = Header(u\'异常报警…\', \'utf-8\').encode()
self.server.sendmail(self.from_addr, self.to_addr, msg.as_string())
self.server.quit()
except Exception as e:
print \'Exception Email send_message:\', e
if __name__ == \'__main__\':
Email = EmailSendMsg(options.get(\'Email\'))
Email.send_msg(\'hello world!\')
二、python连接数据库
看这个链接可以研究下python如何连接PostgreSQL数据库
三、python报警
上面我们知道如何通过python发送微信内容、以及python连接PostgreSQL数据库。现在我们要如何获取报警时机;报警内容。
python_alert.py
#!/usr/bin/python3
import psycopg2
from config import config
from py_wechar import WeiXinSendMsg,options
def get_errors():
""" query data from the vendors table """
conn = None
try:
params = config()
WeiXin = WeiXinSendMsg(options.get(\'WeiXin\'))
conn = psycopg2.connect(**params)
cur = conn.cursor()
cur.execute("select error_time, error_desc, proc_name from adsas.tbl_error_log where deal_status = 0 order by id")
rows = cur.fetchall()
if cur.rowcount > 0 :
WeiXin.send_message("The number of parts: {}".format(cur.rowcount))
for row in rows:
# WeiXin.send_message(\'-\'*60)
# WeiXin.send_message(\'发生时间:{}\'.format(row[0]))
# WeiXin.send_message(\'错误原因:{}\'.format(row[1]))
# WeiXin.send_message(\'报警代码:{}\'.format(row[2]))
str_error=\'发生时间:{}\\n错误原因:{}\\n报警代码:{}\'.format(row[0],row[1],row[2])
WeiXin.send_message(str_error)
cur.execute("update adsas.tbl_error_log set deal_status = 1 where deal_status = 0 ")
conn.commit()
cur.close()
except (Exception, psycopg2.DatabaseError) as error:
print(error)
finally:
if conn is not None:
conn.close()
if __name__ == \'__main__\':
get_errors()
四、部署
可以通过cron/或者开源的定时任务系统进行报警;
报警信息: