python学习之模块:xlsxwriter

Posted 阿布先生

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python学习之模块:xlsxwriter 相关的知识,希望对你有一定的参考价值。

实战训练: 读取日志文件中的时间,CPU利用率,生成直线图

输入: ********.log

代码如下:

import gzip
import os
import re
import sys
import time
import xlsxwriter


# 定义数据字典容器
data_set = {
    \'time\': [],                # 横轴时间坐标
    \'cpu_usage\': [],           # CPU使用率
    "memory_usage": [],        # 物理内存使用率
    "virtual_memory": []       # 虚拟内存使用率
}

items_set = {
    \'1\': "CPU Usage",
    \'2\': "Memory Usage",
    \'3\': \'Virtual Memory\',
}

def parse_vmstat(content):
    \'\'\'
    解析 服务器资源使用率 vmstat
    :param content:
    :return:
    \'\'\'

    # vmstat[主机名, CPU占用率, 物理内存占用率, 物理内存容量(MB), 可用物理内存(MB), {虚拟内存占有率},虚拟内存容量(MB), 可用虚拟内存(MB)]
    # I060:9-10 01:49:48.149(32499|32991)vmstat[SZX1000331725, 3, 76, 31587, 7714, 70, 35681, 10791]

    try:
        # 此处for循环的作用就是循环解析,把每一行的 时间、CPU使用率、物理内存使用率、虚拟内存使用率 都写入 全局的 date_set 数据容器
        for line in content:
            if -1 != line.find("vmstat"):
                # 此行是服务器内存使用信息
                result = re.findall(r\'\\[.*\\]\', line)
                a = ""
                values = []
                cpu = 0
                memory = 0
                virtual_memory = 0
                if len(result) > 0:
                    temp = result[0]
                    a = temp[1:-1]
                    values = a.split(\',\')
                    cpu = int(values[1].strip())
                    memory = int(values[2].strip())
                    if len(values) > 7:
                        virtual_memory = int(values[5].strip())

                # 解析时间
                pos_start = line.find(\':\')
                pos_end = line.find(".")
                time = line[pos_start + 1:pos_end]

                data_set[\'time\'].append(time)
                data_set[\'cpu_usage\'].append(cpu)
                data_set[\'memory_usage\'].append(memory)
                data_set[\'virtual_memory\'].append(virtual_memory)
    except Exception as ex:
        print("parse_vmstat report error : %s" % ex)


def write_excel_grpah(workbook, filename, item):
    sheet_name = \'\'
    if filename.find(\'.trace.\'):
        sheet_name = "sysmonitor_perf_" + filename[-14:]
    else:
        sheet_name = filename

    try:
        worksheet = workbook.add_worksheet(sheet_name)
    except Exception as ex:
        print("add worksheet failed. file name is %s, error is : %s" % (sheet_name, ex))

    # 写列名
    worksheet.write("A1", "Time")
    worksheet.write("B1", "CPU usage")
    worksheet.write("C1", "Memory usage")
    worksheet.write("D1", "Virtual Memory")

    time_range = "Time range: " + data_set[\'time\'][0] + " to " + data_set[\'time\'][len(data_set[\'time\'])-1]
    worksheet.write("G1", time_range)

    worksheet.write_column(\'A2\', data_set[\'time\'])
    worksheet.write_column(\'B2\', data_set[\'cpu_usage\'])
    worksheet.write_column(\'C2\', data_set[\'memory_usage\'])
    worksheet.write_column(\'D2\', data_set[\'virtual_memory\'])

    try:
        if \'1\' == item:
            # Create a new Chart object.
            chart_cpu = workbook.add_chart({\'type\': \'line\'})
            chart_cpu.add_series(
                {"name": "",
                 \'categories\': \'=\' + sheet_name + \'!$A$2:$A$\' + str(len(data_set[\'time\'])),
                 \'values\': \'=\' + sheet_name + \'!$B$2:$B$\' + str(len(data_set[\'cpu_usage\'])),
                 \'line\': {\'color\': "#1874CD", \'width\': 1.5}}
            )
            chart_cpu.set_x_axis({\'name\': "Time"})
            chart_cpu.set_y_axis({\'name\': "Value"})
            chart_cpu.set_title({\'name\': items_set[item]})
            worksheet.insert_chart("G3", chart_cpu, {\'x_offset\': 0, \'y_offset\': 0, \'x_scale\': 5, \'y_scale\': 2})
        elif \'2\' == item:
            # Create a new Chart object.
            chart_memory = workbook.add_chart({\'type\': \'line\'})
            chart_memory.add_series(
                {"name": "",
                 \'categories\': \'=\' + sheet_name + \'!$A$2:$A$\' + str(len(data_set[\'time\'])),
                 \'values\': \'=\' + sheet_name + \'!$C$2:$C$\' + str(len(data_set[\'memory_usage\'])),
                 \'line\': {\'color\': "#1874CD", \'width\': 1.5}}
            )
            chart_memory.set_x_axis({\'name\': "Time"})
            chart_memory.set_y_axis({\'name\': "value"})
            chart_memory.set_title({\'name\': items_set[item]})
            worksheet.insert_chart("G3", chart_memory, {\'x_offset\': 0, \'y_offset\': 0, \'x_scale\': 2, \'y_scale\': 2})
        elif \'3\' == item:
            # Create a new Chart object.
            chart_vmemory = workbook.add_chart({\'type\': \'line\'})
            chart_vmemory.add_series(
                {"name": "",
                 \'categories\': \'=\' + sheet_name + \'!$A$2:$A$\' + str(len(data_set[\'time\'])),
                 \'values\': \'=\' + sheet_name + \'!$D$2:$D$\' + str(len(data_set[\'virtual_memory\'])),
                 \'line\': {\'color\': "#1874CD", \'width\': 1.5}}
            )
            chart_vmemory.set_x_axis({\'name\': "Time"})
            chart_vmemory.set_y_axis({\'name\': "value"})
            chart_vmemory.set_title({\'name\': items_set[item]})
            worksheet.insert_chart("G3", chart_vmemory, {\'x_offset\': 0, \'y_offset\': 0, \'x_scale\': 3, \'y_scale\': 2})
    except Exception as ex:
        print("hand chart report error :" % ex)

def compress_gz(file_name):
    \'\'\'
    解压缩 gz 文件
    :param file_name:
    :return:
    \'\'\'
    # 获取文件的名称,去掉
    log_file_name = file_name.replace(".gz", "")
    # 创建gzip对象
    g_file = gzip.open(file_name, "rb")
    # gzip对象用read()打开后,写入open()建立的文件里。
    open(log_file_name, "wb+").write(g_file.read())
    # 关闭gzip对象
    g_file.close()


def get_all_logfile_list(path):
    \'\'\'
    根据传入的日志文件路径,获取该目录下所有的文件到内存(包括 .log / .txt/  .trace 等等文件)
    :param path: 传入待解析日志的路径
    :return: 改路径下所有log文件列表
    \'\'\'

    # 将输入路径中的所有 \\ 替换成 /, 这样就不会有转义字符的问题
    new_path = ""
    if -1 != path.find(\'\\\\\'):
        new_path = path.replace(\'\\\\\', \'/\')
    file_list = os.listdir(new_path)

    log_list = []

    for file in file_list:
        if file.endswith(".gz"):
            compress_gz(file)
            log_list.append(file.replace(\'.gz\', \'\'))
            continue
        if os.path.isfile(new_path + \'/\' + file):
            log_list.append(file)

    return log_list

def parse_log_sysmonitor_perf(path, item):
    \'\'\'
    根据传入的日志文件路径,查找并读取所有的 iMAP.imapsysmonitor_perf 日志文件
    :param path:
    :return:
    \'\'\'

    # 设置传入的路径为当前的路径
    os.chdir(path)

    log_list = get_all_logfile_list(path)

    sysmonitor_perf_list = []

    for log_file in log_list:
        if log_file.startswith("iMAP.imapsysmonitor_perf"):
            sysmonitor_perf_list.append(log_file)

    if os.path.isfile(\'sys_perf_graph.xlsx\'):
        os.remove(\'sys_perf_graph.xlsx\')

    workbook = xlsxwriter.Workbook(\'sys_perf_graph.xlsx\')

    output = "%4d-%02d-%02d %02d:%02d:%02d: " % time.localtime()[0:6] + "************begin :"
    print(output)

    # 循环打开文件
    for file in sysmonitor_perf_list:
        try:
            f_log = open(file, "r", encoding=\'utf-8\')
        except Exception as result:
            print(result)

        output = "%4d-%02d-%02d %02d:%02d:%02d: " % time.localtime()[0:6] + "begin to parse log file : %s" % file
        print(output)

        # 读取文件内容
        content = f_log.readlines()

        # 关闭文件
        f_log.close()

        # 解析 iMAP.imapsysmonitor_perf.trace 文件
        parse_vmstat(content)

        output = "%4d-%02d-%02d %02d:%02d:%02d: " % time.localtime()[0:6] + "begin to write excel file..."
        print(output)

        # 写数据到Excel文件并生成图表
        write_excel_grpah(workbook, file, item)

        # 清空全局变量
        data_set[\'time\'] = []
        data_set[\'cpu_usage\'] = []
        data_set[\'memory_usage\'] = []
        data_set[\'virtual_memory\'] = []

        output = "%4d-%02d-%02d %02d:%02d:%02d: " % time.localtime()[0:6] + "write data to excel file finish."
        print(output)

    workbook.close()

    output = "%4d-%02d-%02d %02d:%02d:%02d: " % time.localtime()[0:6] + "************ finish"
    print(output)


if __name__ == "__main__":
    path = input("please input log path: ")

    if os.path.exists(path):
        # 设置传入的路径为当前的路径
        os.chdir(path)
        print("the directory path is normal")
    else:
        print("the directory path does not exist")
        sys.exit()

    print("1.cpu_usage")
    print("2.memory_usage")
    print("3.virtual_memory")
    print("4.exit")
    number = input("please select : ")

    if number not in (\'1\', \'2\', \'3\', \'4\'):
        print("input wrong, exit")
        sys.exit()

    if \'4\' == number:
        sys.exit()
    else:
        parse_log_sysmonitor_perf(path, number)

 程序运行效果:

 

 

 

以上是关于python学习之模块:xlsxwriter 的主要内容,如果未能解决你的问题,请参考以下文章

Python学习之模块

Python模块学习之特殊函数 __call__ 的使用

python学习之sys模块

python学习之argparse模块

python学习之argparse模块

Python模块学习之hashlib中MD5在接口测试中的应用