使用logging封装日志
Posted furichan
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了使用logging封装日志相关的知识,希望对你有一定的参考价值。
自己封装的logging,封装日志的几个组件
Logger 记录器暴露了应用程序代码直接使用的接口。
Handler 处理器将日志记录(由记录器创建)发送到适当的目标。
Filter 过滤器提供了更细粒度的功能,用于确定要输出的日志记录。
Formatter 格式器指定最终输出中日志记录的样式。
日志等级说明:
DEBUG:程序调试bug时使用
INFO:程序正常运行时使用
WARNING:程序未按预期运行时使用,但并不是错误,如:用户登录密码错误
ERROR:程序出错误时使用,如:IO操作失败
CRITICAL:特别严重的问题,导致程序不能再继续运行时使用,如:磁盘空间为空,一般很少使用
默认的是WARNING等级,当在WARNING或WARNING之上等级的才记录日志信息。 日志等级从低到高的顺序是: DEBUG < INFO < WARNING < ERROR < CRITICAL
import logging def mylog(): # 创建日志器 logger = logging.getLogger() # 设置日志级别 logger.setLevel(logging.WARNING) # 防止重复写日志,这里进行判断,若logger.handlers列表为空则添加 if not logger.handlers: # 创建控制台处理器 sh = logging.StreamHandler() # 将控制台处理器放到日志器中 logger.addHandler(sh) # 创建文件处理器 fh = logging.FileHandler(\'log4.txt\', encoding=\'utf-8\') # 将文件处理器放到日志器中 logger.addHandler(fh) # 根据格式,创建格式器 fmt1 = "%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s" formatter1 = logging.Formatter(fmt1) fmt2 = "%(asctime)s - %(filename)s - %(levelname)s: %(message)s" formatter2 = logging.Formatter(fmt2) # 给控制台处理器设置格式 sh.setFormatter(formatter1) # 给文件处理器设置格式 fh.setFormatter(formatter2) return logger mylog().debug("this is a debug msg") mylog().info("this is a info msg") mylog().warning("this is a warning msg") mylog().error("this is a error msg") mylog().critical("this is a critical msg")
控制台输出:
日志文件的输出:
python 日志封装
日志功能描述:
写python项目时,需要用到日志类,需求为:日志信息可配置,提供几种类型不同的配置,并且日志既可以写到文本也可以写到数据库中。
实现时日志类直接使用python的logging,配置信息写到配置文件logging_data.conf,并使用logging.config.fileConfig(log_config_path)加载配置。写日志到数据库参考了log4mongo-1.6.0.tar.gz的写法,地址在:https://pypi.python.org/pypi/log4mongo/ ,同时每当在数据库写日志时,同时需要插入一些额外信息,比如:projectId runningId algorithmId,所以使用了python的logging.LoggerAdapter把额外信息添加进去。
以下是编写的配置和代码:
配置文件:logging_data.conf
[loggers]
keys=root,input,output,computer
[handlers]
keys=consoleHandler,inputfileHandler,outfileHandler,computerfileHandler,mysqlHandler
[formatters]
keys=fmt
[logger_root]
level=DEBUG
handlers=consoleHandler
[logger_input]
level=DEBUG
qualname=input
handlers=consoleHandler,inputfileHandler,mysqlHandler
propagate=0
[logger_output]
level=DEBUG
qualname=output
handlers=consoleHandler,outfileHandler,mysqlHandler
propagate=0
[logger_computer]
level=DEBUG
qualname=computer
handlers=consoleHandler,computerfileHandler,mysqlHandler
propagate=0
[handler_consoleHandler]
class=StreamHandler
level=DEBUG
formatter=fmt
args=(sys.stdout,)
[handler_inputfileHandler]
class=logging.handlers.RotatingFileHandler
level=DEBUG
formatter=fmt
args=('../../logs/input.log','a',20000,5,)
[handler_outfileHandler]
class=logging.handlers.RotatingFileHandler
level=DEBUG
formatter=fmt
args=('../../logs/out.log','a',20000,5,)
[handler_computerfileHandler]
class=logging.handlers.RotatingFileHandler
level=DEBUG
formatter=fmt
args=('../../logs/computer.log','a',20000,5,)
[handler_mysqlHandler]
class=MysqlHandler.MysqlHandler
#level=WARNING
level=DEBUG
args=("10.17.87.226","root","mysql123","aiadm")
[formatter_fmt]
format=%(asctime)s - %(filename)s:%(lineno)d - %(levelname)s - %(name)s - %(message)s
datefmt=[%Y-%m-%d %H:%M:%S]
从配置中可知:有三种日志配置,分别是input,output,computer,而写日志到数据库则使用了自己编写的MysqlHandler.MysqlHandler
日志包装类 logwrapper.py
#!/usr/bin/env python
#coding=UTF-8
import logging
import logging.config
import os,sys
try:
import thread
import threading
except ImportError:
thread = None
if thread:
_lock = threading.RLock()
else:
_lock = None
def _acquireLock():
"""
Acquire the module-level lock for serializing access to shared data.
This should be released with _releaseLock().
"""
if _lock:
_lock.acquire()
def _releaseLock():
"""
Release the module-level lock acquired by calling _acquireLock().
"""
if _lock:
_lock.release()
class LoggerAdapter(logging.LoggerAdapter):
def process(self,msg,kwargs):
try:
kwargs["extra"] = dict(kwargs["extra"],**self.extra)
except:
kwargs["extra"] = self.extra
return msg,kwargs
def setExtra(self,extra):
self.extra = extra
class logwrapper:
def __init__(self):
cur_dir = os.path.dirname(os.path.abspath(__file__))
log_config_path = cur_dir + "/logging_data.conf"
sys.path.append(cur_dir)
logging.config.fileConfig(log_config_path)
self.logger_dict =
def getLogger(self,name,extra = ):
rv = None
_acquireLock()
try:
if name in self.logger_dict:
rv = self.logger_dict[name]
else:
logger = logging.getLogger(name)
rv = LoggerAdapter(logger, extra)
self.logger_dict[name] = rv
finally:
_releaseLock()
return rv
log_manager = logwrapper()
def getLogger(name=None):
return log_manager.getLogger(name)
写日志到数据库的类MysqlHander.py
import logging
import MySQLdb
import datetime
import sys
sys.path.append("..")
import data_service.mysql_context.mySQLWrap as mySQLWrap
_mysql_obj = None
class MysqlHandler(logging.Handler):
def __init__(self,host,user,passwd,db,charset="utf8",timeout=10,level=logging.NOTSET,reuse=True):
logging.Handler.__init__(self,level)
self.host = host
self.user = user
self.passwd = passwd
self.db = db
self.charset = charset
self.timeout = timeout
self.reuse = reuse
self.mysql_obj = None
self.connect_state = False
self._connect()
def _connect(self):
global _mysql_obj
if self.reuse and _mysql_obj:
self.mysql_obj = _mysql_obj
else:
self.mysql_obj = mySQLWrap.MySQLWrap()
ret = self.mysql_obj.connectDatabase(host=self.host,user=self.user,passwd=self.passwd,db=self.db,charset=self.charset,timeout=self.timeout)
if ret == False:
_mysql_obj = None
self.connect_state = False
self.printMysqlError(self.mysql_obj.getErrorStr(),"MysqlHandler mysql connect error")
else:
_mysql_obj = self.mysql_obj
self.connect_state = True
def emit(self,record):
if self.connect_state == False:
return
CRITICAL = 50
FATAL = CRITICAL
ERROR = 40
WARNING = 30
WARN = WARNING
INFO = 20
DEBUG = 10
NOTSET = 0
_level_names =
'CRITICAL': CRITICAL,
'ERROR': ERROR,
'WARN': WARNING,
'WARNING': WARNING,
'INFO': INFO,
'DEBUG': DEBUG,
'NOTSET': NOTSET,
level = _level_names[record.levelname]
log_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
log_desc = record.getMessage()
keys = set(record.__dict__)
project_id = "NULL"
running_id = "NULL"
algorithm_id = "NULL"
ext1 = ext2 = ext3 = ext4 = ext5 = ext6 = ext7 = ext8 = ""
if keys:
for key in keys:
if key == "project_id":
project_id = record.__dict__["project_id"]
elif key == "running_id":
running_id = record.__dict__["running_id"]
elif key == "Algorithm_id":
algorithm_id = record.__dict__["Algorithm_id"]
elif key == "ext1":
ext1 = record.__dict__["ext1"]
elif key == "ext2":
ext2 = record.__dict__["ext2"]
elif key == "ext3":
ext3 = record.__dict__["ext3"]
elif key == "ext4":
ext4 = record.__dict__["ext4"]
elif key == "ext5":
ext5 = record.__dict__["ext5"]
elif key == "ext6":
ext6 = record.__dict__["ext6"]
elif key == "ext7":
ext7 = record.__dict__["ext7"]
elif key == "ext8":
ext8 = record.__dict__["ext8"]
sql = '''INSERT INTO AI_ALGORITHM_LOGS(running_id,
project_id,Algorithm_id,log_time,level,log_desc,ext1,ext2,ext3,ext4,ext5,ext6,ext7,ext8)
VALUES('%s','%s','%s','%s','%d',"%s",'%s','%s','%s','%s','%s','%s','%s','%s')'''\\
%(str(running_id),str(project_id),str(algorithm_id),log_time,level,log_desc,ext1,ext2,ext3,ext4,ext5,ext6,ext7,ext8)
#print sql
ret = self.mysql_obj.exeSQLcmd(sql)
if ret == False:
self.printMysqlError(self.mysql_obj.getErrorStr(),"MysqlHandler insert mysql log error")
def printMysqlError(self,str,msg):
print "%s %s" %(msg,str)
def close(self):
self.mysql_obj.closeDatabase()
def __del__(self):
self.mysql_obj.closeDatabase()
def __exit__(self, exc_type, exc_val, exc_tb):
self.mysql_obj.closeDatabase()
其中projectId runningId algorithmId是每条日志都必须到数据库插入的数据,而ext1 -> ext8则是可能插入的一些额外信息。mySQLWrap.MySQLWrap是自己封装的使用mysqldb操作数据库的一个类。
测试使用logtest.py
#!/usr/bin/env python
#coding=UTF-8
import logwrapper
logwrapper = logwrapper.getLogger("input")
logwrapper.setExtra("project_id":2,"running_id":0,"Algorithm_id":1)
#handler = MysqlHandler.MysqlHandler(host="10.17.87.226",user="root",passwd="mysql123",db="aiadm")
#logger.addHandler(handler)
logger.info("mysql logger %d,%s",1,"hello",extra="ext3":"extrss","ext4":"extss444")
logger.info("mysql logger %d,%s",1,"hello",extra="ext1":"extrss","ext2":"extss444")
logger.info("test input info logger %d,%s",1,"hello")
logger.warning("test input warning logger %d,%s",1,"hello")
logger.error("test input logger %d,%s",1,"hello")
logger.critical("test input logger %d,%s",1,"hello")
try:
1/0
except:
logger.exception("except")
logger = logwrapper.getLogger("out")
logger.debug("test out debug logger %d,%s",1,"hello")
logger.info("test out info logger %d,%s",1,"hello")
以上是关于使用logging封装日志的主要内容,如果未能解决你的问题,请参考以下文章
API接口自动化测试框架搭建(十九)-日志模块封装(logging模块)
WindowsGUI自动化测试框架搭建-日志模块封装(logging模块)