Python 导入内存消耗分析脚本

Posted

tags:

篇首语：本文由小常识网（cha138.com）小编为大家整理，主要介绍了 Python 导入内存消耗分析脚本相关的知识，希望对你有一定的参考价值。

import argparse
import logging
import os
import re
import subprocess
import sys

from common.utils import imputil

from stdlib_list import stdlib_list


# Command-line options. They are parsed at import time, so the resulting
# flags act as module-level constants for the rest of the script.
parser = argparse.ArgumentParser()
parser.add_argument("--increment", dest="inconly", action='store_true',
                    help="Increment-only log will be shown.")
parser.add_argument("--sort", dest="sort", action='store_true',
                    help="Sort output by memory consumption increment "
                         "(implies --increment).")
args = parser.parse_args()
SORT_BY_CONSUMPTION = args.sort
# Sorting reads the 'increment_log' file, and the generated script only
# produces that file in increment-only mode, so --sort must switch it on.
INCONLY = args.inconly or args.sort


# Module names classified as "standard" when sorting import lines into
# sections: the Python 2.7 standard library list plus extra names that
# should be grouped together with it.
stdlibs = set(stdlib_list('2.7')).union([
    'requests', 'resource', 'httplib2', 'posixpath', 'yaml', 'pyexpat',
    'simplejson', '_weakrefset',
    'xml.etree', 'yaml.reader', 'yaml.serializer', 'pkg_resources',
    'yaml.nodes', 'docopt', 'requests.exceptions',
])

# Workspace root; REPORT_PATH below places the generated profiler script here.
# WSP = '/Users/viktorognev/work/Workspace/'
WSP = '/home/scalr/'
# TODO: accept paths as CLA
# Directories that from_code() walks recursively looking for .py files.
SOURCE_PATHS = [
    '/opt/scalarizr/embedded/lib/python2.7/site-packages/scalarizr-4.9.b561.f30ac44-py2.7.egg/',
    '/opt/scalarizr/embedded/lib/python2.7/site-packages/fatmouse_agent-0.1.ecc9efd43-py2.7.egg/'

    # os.path.join(WSP, 'fatmouse/agent/'),
    # os.path.join(WSP, 'fatmouse/common/'),
    # os.path.join(WSP, 'scalarizr/src/'),

]
# Path template of the generated script; '{}' is filled with the name of the
# parse function that produced the imports (e.g. 'from_code').
REPORT_PATH = os.path.join(WSP, 'report_{}.py')
# An import line containing any of these substrings is classified as 'internal'.
TOP_MODULES = ['scalarizr', 'agent', 'common']

# TODO: log sys.modules.keys() this into a file during scalarizr
# operation and then read MODULES from file and do MODULES = set(MODULES)
# Module names consumed by from_module_list(); currently empty.
MODULES = []


# Script-wide logger; the root logger is configured at DEBUG level.
log = logging.getLogger('ImpProf')
logging.basicConfig(level=logging.DEBUG)


# Source snippet injected into the generated script when INCONLY is set.
# It replaces sys.stdout with a memory_profiler LogFile named 'increment_log'
# (created with reportIncrementFlag=True) and points logging at a file of the
# same name.
inconly_flag = """
import sys
import logging
from memory_profiler import LogFile
sys.stdout = LogFile('increment_log', reportIncrementFlag=True)
logging.basicConfig(filename='increment_log', filemode='w+', level=logging.DEBUG)
"""
# Template of the generated profiler script. write_profiler_script() fills the
# placeholders with str.format(): {inconly_flag} with the snippet above (or an
# empty string), and {standard}/{external}/{internal} with the collected
# import lines. There is no {win} placeholder, so lines collected in the
# 'win' section do not appear in the generated script.
wrapper = """
from memory_profiler import profile
{inconly_flag}

@profile(precision=4)
def main():
    {standard}
    {external}
    {internal}

if __name__ == '__main__':
    main()
"""


def analyze(parse_fn):
    """Collect import lines via ``parse_fn``, log a summary, write the script.

    ``parse_fn`` is called with a dict of empty sets keyed by section name
    ('standard', 'external', 'internal', 'win', 'failed') and must return
    that mapping populated with import lines. The 'failed' section is logged
    and removed; the remaining sections are handed to
    ``write_profiler_script`` together with ``parse_fn.__name__``.
    """
    collected = parse_fn({
        'standard': set(),
        'external': set(),
        'internal': set(),
        'win': set(),
        'failed': set(),
    })
    # The total is taken before 'failed' is popped, so it counts those too.
    total = sum(len(section) for section in collected.values())
    unimportable = collected.pop('failed')

    separator = '*' * 80
    log.info(separator)
    log.info('Analysis done. {} unique import lines collected'.format(total))
    log.info(separator)
    log.info('Failed to import from {} lines:'.format(len(unimportable)))
    for entry in unimportable:
        log.debug('        {}'.format(entry))
    log.info(separator)
    log.info(separator)
    write_profiler_script(collected, parse_fn.__name__)


def from_code(imports):
    """Scan every ``.py`` file under ``SOURCE_PATHS`` for import lines.

    Each matching file is passed to ``analyze_file`` together with
    ``imports``; the same ``imports`` mapping is returned.
    """
    for source_root in SOURCE_PATHS:
        log.info('Starting analysis code repository {}'.format(source_root))
        for directory, _, names in os.walk(source_root):
            for name in names:
                candidate = os.path.join(directory, name)
                is_python_source = (
                    os.path.isfile(candidate)
                    and os.path.splitext(candidate)[1] == '.py'
                )
                if is_python_source:
                    analyze_file(candidate, imports)
    return imports


def from_module_list(imports):
    """Turn every importable name in ``MODULES`` into an import line.

    A dotted name ``pkg.mod.attr`` becomes ``from pkg.mod import attr``; a
    plain name becomes ``import name``. Each line is classified with
    ``choose_section_and_put``. Names that cannot be imported are logged and
    skipped.

    :param imports: mapping of section name to a set of import lines.
    :return: the same ``imports`` mapping.
    """
    for module in MODULES:
        try:
            import_object(module)
        except (ImportError, ValueError):
            # ValueError is what __import__ raises for an empty name;
            # is_importable() treats it as an import failure as well.
            log.error('unable to import {}'.format(module))
            continue
        if '.' in module:
            # Split on the last dot with a string method instead of the
            # filesystem-oriented os.path.splitext().
            package, _, attribute = module.rpartition('.')
            s = ' '.join(['from', package, 'import', attribute])
        else:
            s = ' '.join(['import', module])
        choose_section_and_put(s, imports)
    return imports


def analyze_file(path, result):
    """Collect the import lines of one source file into ``result``.

    Every line is stripped of surrounding quotes, commas, backslashes and
    whitespace. Lines that then start with ``import `` or ``from `` are
    checked with ``is_importable``: importable lines are classified with
    ``choose_section_and_put``, the others are added to ``result['failed']``.

    :param path: path of the file to scan.
    :param result: mapping of section name to a set of import lines.
    :return: the same ``result`` mapping.
    """
    log.info('Analysing file {} '.format(path))
    # Plain read-only mode: 'r+' also demands write permission, which is
    # not needed here and may be missing for installed packages.
    with open(path, 'r') as fp:
        lines = fp.readlines()
    for raw in lines:
        line = raw.strip('",\\,\n,\t, ')
        if line.startswith('#'):
            continue
        if not line.startswith(('import ', 'from ')):
            continue
        if any(['import ' not in line,
                '__import__' in line,
                '__future__' in line,
                'gevent' in line,
                'ElementC14N' in line,
                'cloudpvdutil' in line,  # WTF??, where does it come from?
                # Bare relative imports such as "from . import x".
                re.search(r'from \.+ import ', line) is not None,
                ]):
            continue
        if is_importable(line):
            choose_section_and_put(line, result)
        else:
            result['failed'].add(line)

    return result


def write_profiler_script(datadict, repottype):
    """Render the collected import lines into the profiler script file.

    Each non-empty section becomes an indented text block: a ``#`` ruler, a
    ``# <section>`` header and the import lines ordered by ``compare``.
    The blocks are substituted into ``wrapper`` and the result is written to
    ``REPORT_PATH.format(repottype)``.

    :param datadict: mapping of section name to a collection of import
        lines. It is left unmodified.
    :param repottype: name inserted into the report file name.
    """
    # Local import keeps this block self-contained. cmp_to_key exists in
    # Python 2.7 and 3, unlike the Python-2-only ``sorted(cmp=...)``.
    import functools

    log.info('Writing profiler script.')
    rendered = {}
    for sectname, lines in datadict.items():
        if not lines:
            rendered[sectname] = ''
            continue
        # The 'win' section is indented one level deeper than the others.
        indent = '    ' * (2 if sectname == 'win' else 1)
        linest = '\n{}'.format(indent)
        ordered = sorted(lines, key=functools.cmp_to_key(compare))
        rendered[sectname] = ''.join([
            linest,
            '#' * 40,
            '{0}# {1}{0}'.format(linest, sectname),
            linest.join(ordered),
        ])

    rendered['inconly_flag'] = inconly_flag if INCONLY else ''
    # Keys without a matching placeholder in ``wrapper`` are ignored by
    # str.format().
    report = wrapper.format(**rendered)
    with open(REPORT_PATH.format(repottype), 'w+') as fp:
        fp.write(report)

def choose_section_and_put(s, result):
    """Add import line ``s`` to the matching section of ``result``.

    Sections are tried in this order, and the first match wins:

    * ``win``: the line contains 'win', 'noerr' or 'wmi';
    * ``internal``: the line contains any name from ``TOP_MODULES``;
    * ``standard``: the imported module name is listed in ``stdlibs``;
    * ``external``: everything else.

    The first two checks are plain substring tests on the whole line.

    :param s: a single ``import ...`` / ``from ... import ...`` line.
    :param result: mapping of section name to a set of import lines.
    """
    if any(marker in s for marker in ('win', 'noerr', 'wmi')):
        result['win'].add(s)
        return

    if any(name in s for name in TOP_MODULES):
        result['internal'].add(s)
        return

    # Reduce the line to the bare module name: drop any alias, the imported
    # names and the import/from keywords.
    module_name = s.split(' as ')[0].split(' import ')[0].replace(
        'import ', '').replace('from ', '').strip('.').strip()
    # Direct membership test instead of comparing against every entry.
    if module_name in stdlibs:
        result['standard'].add(s)
        return
    result['external'].add(s)


def compare(i1, i2):
    """Old-style comparison function for ordering import lines.

    Lines starting with ``import`` sort before lines starting with ``from``;
    any other pair is ordered by ``simple_compare``.

    :return: -1, 0 or 1.
    """
    first_is_plain = i1.startswith('import')
    second_is_plain = i2.startswith('import')
    if first_is_plain and i2.startswith('from'):
        return -1
    if second_is_plain and i1.startswith('from'):
        return 1
    return simple_compare(i1, i2)


def simple_compare(i1, i2):
    """Three-way comparison: 0 if equal, -1 if ``i1 < i2``, otherwise 1."""
    if i1 == i2:
        outcome = 0
    elif i1 < i2:
        outcome = -1
    else:
        outcome = 1
    return outcome


def import_object(import_str, *args, **kwds):
    """Import ``import_str`` and return the resulting object.

    The string is first imported as a module and looked up in
    ``sys.modules``. If that raises ``ImportError``, the lookup is delegated
    to ``imputil.import_attr`` (project helper; presumably resolves a dotted
    path ending in an attribute or class name, to be confirmed).

    ``*args`` and ``**kwds`` are accepted but not used.
    """
    try:
        __import__(import_str)
    except ImportError:
        return imputil.import_attr(import_str)
    return sys.modules[import_str]


def is_importable(s):
    """Return True if the target of import line ``s`` can be imported.

    ``from a.b import c`` is checked as ``a.b.c`` and ``import a.b as c`` as
    ``a.b``. Only the first whitespace-delimited token after that rewrite is
    tried, so a line importing several names (``import a, b``) is reported
    as not importable.

    :param s: a stripped ``import ...`` / ``from ... import ...`` line.
    :return: True when ``import_object`` succeeds, False when it raises
        ``ImportError`` or ``ValueError``.
    """
    # Decide on the statement prefix, not on the substring 'from': a line
    # such as "import numpy.core.fromnumeric" is a plain import.
    if s.startswith('from '):
        s = s[len('from '):].replace(' import ', '.')
    else:
        s = s.replace('import ', '')
    s = s.split(' ')[0]
    try:
        # Leading dots of relative imports are dropped before importing.
        import_object(s.strip('.'))
        return True
    except (ImportError, ValueError) as e:
        log.debug('Unable to import from string {} . Reason: {}'.format(s, e))
        return False


def sortkey(value):
    """Sort key for profiler log lines: the number before the last 'MiB'.

    The text between the last two 'MiB' markers (or before the only one) is
    parsed as a float. Anything that cannot be parsed yields 0.0.
    """
    try:
        increment_field = value.split('MiB')[-2]
        increment = float(increment_field.strip())
    except Exception:
        increment = 0.0
    return increment


if __name__ == '__main__':
    # Generate the profiler script from the source trees, run it under
    # memory_profiler, then optionally print the log sorted by increment.
    analyze(from_code)
    log.info('Executing memory profiler on generated script...\n\n\n')
    # Run the script from the location it was written to (REPORT_PATH filled
    # with the parse function name) instead of a path relative to the
    # current directory.
    report_script = REPORT_PATH.format(from_code.__name__)
    # Argument list without a shell: no quoting problems with the path.
    subprocess.check_call([
        '/opt/scalarizr/embedded/bin/python', '-m', 'memory_profiler',
        report_script,
    ])
    if SORT_BY_CONSUMPTION:
        # The log is only read, so plain 'r' is enough.
        with open('increment_log', 'r') as fp:
            lines = fp.readlines()
        # The first line of the log is skipped; only lines that mention an
        # import are printed, largest increment first.
        for line in sorted(lines[1:], key=sortkey, reverse=True):
            if 'import' not in line:
                continue
            print(line)
    # analyze(from_module_list)

以上是关于 Python 导入内存消耗分析脚本的主要内容，如果未能解决你的问题，请参考以下文章：

基于Python项目的Redis缓存消耗内存数据简单分析(附详细操作步骤)

基于Python项目的Redis缓存消耗内存数据简单分析(附详细操作步骤)

基于Python项目的Redis缓存消耗内存数据简单分析(附详细操作步骤)

用于内存消耗测量的python睡眠不准确

如何减少python内存的消耗?

Python内存泄漏[关闭]