使用pyinotify实现加强版的linux tail -f 命令,并且对日志类型的文本进行单独优化着色显示。
Posted 北风之神0509
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了使用pyinotify实现加强版的linux tail -f 命令,并且对日志类型的文本进行单独优化着色显示。相关的知识,希望对你有一定的参考价值。
tail -f命令不能自动切换切片文件,例如日志是每100M生成一个新文件,tail -f不能自动的切换文件,必须关闭然后重新运行tail -f
此篇使用pyinotify,检测文件更新,并实现tail -f以外,还能自动识别切换切片文件。而且针对日志类型的文件做了单独样式优化。
运行 ./tailf.py + 文件路径。
此文件够自动从普通文本中,对日志就行着色处理,如果不是日志类型的文件,将直接输出,不进行着色处理。
tailf.py文件的实现代码如下:
import os import sys import re import pyinotify DIRMASK = pyinotify.IN_MODIFY \\ | pyinotify.IN_ATTRIB \\ | pyinotify.IN_MOVE_SELF \\ | pyinotify.IN_CREATE class Handler(pyinotify.ProcessEvent): def __init__(self, filename): self._fh = None self._path = filename super(Handler, self).__init__() def my_init(self): try: self._fh = open(self._path, \'r\') except IOError as e: sys.stderr.write(\'open file failed, %s\' % e) else: self._fh.seek(0, 2) def process_IN_CREATE(self, event): path = self._path if path in os.path.join(event.path, event.name): if hasattr(self, \'fh\'): self._fh.close() try: self._fh = open(self._path, \'r\') except IOError as e: sys.stderr.write(\'open file failed, %s\' % e) else: self._fh.seek(0, 2) for r in self._fh.readlines(): # sys.stdout.write(r) process_line(r) def process_IN_MODIFY(self, event): path = self._path if path not in os.path.join(event.path, event.name): return if not self._fh.closed: for r in self._fh.readlines(): # sys.stdout.write(r) process_line(r) def process_IN_MOVE_SELF(self, event): path = self._path if path in os.path.join(event.path, event.name): sys.stderr.write(\'monitor file move\') def process_IN_ATTRIB(self, event): pass class Tailer(object): def __init__(self, filename): super(Tailer, self).__init__() self._path = filename self._notifier = None self._init() def __del__(self): self._notifier.stop() def _init(self): path = self._path index = path.rfind(\'/\') wm = pyinotify.WatchManager() wm.add_watch(path[:index], DIRMASK) handler = Handler(path) self._notifier = pyinotify.Notifier(wm, handler) def run(self): self.read_last_10240_word() while True: self._notifier.process_events() if self._notifier.check_events(): self._notifier.read_events() def read_last_10240_word(self): with open(self._path,\'rb\') as f: f.seek(-10240,2) for l in f.readlines(): # print(l.decode()) process_line(l.decode()) def process_line(line_str): pass #matcher = re.search(r\'\\d{4}-\\d{2}-\\d{2}.*?- (DEBUG|INFO|WARNING|ERROR|CRITICAL) -[\\s\\S]*?(File ".*?.py", line \\d*)\', line_str) matcher = re.search(r\'^\\d{4}-\\d{2}-\\d{2}.*?(DEBUG|INFO|WARNING|ERROR|CRITICAL)\', line_str) if not matcher: print(line_str) else: log_level_str = matcher.group(1) if log_level_str == \'DEBUG\': print(\'\\033[0;32m%s\\033[0m\' % line_str) elif log_level_str == \'INFO\': print(\'\\033[0;96m%s\\033[0m\' % line_str) elif log_level_str == \'WARNING\': print(\'\\033[0;33m%s\\033[0m\' % line_str) elif log_level_str == \'ERROR\': print(\'\\033[0;35m%s\\033[0m\' % line_str) elif log_level_str == \'CRITICAL\': print(\'\\033[0;31m%s\\033[0m\' % line_str) if __name__ == \'__main__\': if len(sys.argv[1:]) != 1: print(\'Usage: python tail.py </path/to/filename>\') sys.exit(0) path = sys.argv[1] Tailer(path).run()
以上是关于使用pyinotify实现加强版的linux tail -f 命令,并且对日志类型的文本进行单独优化着色显示。的主要内容,如果未能解决你的问题,请参考以下文章
pyinotify - Monitoring filesystems events with inotify on Linux.