Python import memory-consumption analysis script
Posted
tags:
Editor's note: this article presents a script that analyzes the memory consumed by Python imports; we hope you find it a useful reference.
import argparse
import logging
import os
import re
import subprocess
import sys
from common.utils import imputil
from stdlib_list import stdlib_list
# Command-line interface: two optional boolean flags controlling output.
parser = argparse.ArgumentParser()
parser.add_argument("--increment", dest="inconly", action='store_true',
                    help="Increment-only log will be shown.")
parser.add_argument("--sort", dest="sort", action='store_true',
                    # fixed typo: "memmory" -> "memory"
                    help="Sort output by memory consumption increment.")
args = parser.parse_args()
# Module-level flags read by write_profiler_script() and the main block.
INCONLY = args.inconly
SORT_BY_CONSUMPTION = args.sort
# Names treated as "standard library" when grouping imports: start from the
# Python 2.7 stdlib listing, then add packages we do not want profiled as
# external dependencies.
stdlibs = set(stdlib_list('2.7'))
stdlibs.update(['requests', 'resource', 'httplib2', 'posixpath', 'yaml', 'pyexpat', 'simplejson', '_weakrefset',
'xml.etree', 'yaml.reader', 'yaml.serializer', 'pkg_resources',
'yaml.nodes', 'docopt', 'requests.exceptions'])
# WSP = '/Users/viktorognev/work/Workspace/'
# Workspace root; the generated report script is written here.
WSP = '/home/scalr/'
# TODO: accept paths as CLA
# Code trees scanned for import statements (installed eggs on the target box).
SOURCE_PATHS = [
'/opt/scalarizr/embedded/lib/python2.7/site-packages/scalarizr-4.9.b561.f30ac44-py2.7.egg/',
'/opt/scalarizr/embedded/lib/python2.7/site-packages/fatmouse_agent-0.1.ecc9efd43-py2.7.egg/'
# os.path.join(WSP, 'fatmouse/agent/'),
# os.path.join(WSP, 'fatmouse/common/'),
# os.path.join(WSP, 'scalarizr/src/'),
]
# Output path template; '{}' is filled with the parse-function name.
REPORT_PATH = os.path.join(WSP, 'report_{}.py')
# Top-level package names considered project-internal.
TOP_MODULES = ['scalarizr', 'agent', 'common']
# TODO: log sys.modules.keys() this into a file during scalarizr
# operation and then read MODULES from file and do MODULES = set(MODULES)
# Dotted module names for the from_module_list() strategy (currently empty).
MODULES = []
log = logging.getLogger('ImpProf')
logging.basicConfig(level=logging.DEBUG)
# Preamble prepended to the generated script when --increment is given:
# memory_profiler's LogFile filters stdout so only lines with a memory
# increment end up in 'increment_log'.
inconly_flag = """
import sys
import logging
from memory_profiler import LogFile
sys.stdout = LogFile('increment_log', reportIncrementFlag=True)
logging.basicConfig(filename='increment_log', filemode='w+', level=logging.DEBUG)
"""
# Skeleton of the generated profiling script; placeholders are filled by
# write_profiler_script() with sorted import statements grouped by origin.
# NOTE(review): there is no {win} placeholder, so the 'win' section collected
# during analysis never appears in the output -- confirm whether intended.
wrapper = """
from memory_profiler import profile
{inconly_flag}
@profile(precision=4)
def main():
{standard}
{external}
{internal}
if __name__ == '__main__':
main()
"""
def analyze(parse_fn):
    """Collect import statements with *parse_fn* and emit a profiler script.

    *parse_fn* receives a dict of empty sets keyed by section name, fills
    them with import lines, and returns the dict.  The 'failed' section is
    logged and dropped; the remaining sections are rendered to disk by
    write_profiler_script().
    """
    imports = dict(
        standard=set(),
        external=set(),
        internal=set(),
        win=set(),
        failed=set(),
    )
    imports = parse_fn(imports)
    # Counted before popping 'failed': lines that could not be imported were
    # still collected.  values() -- the keys were never used here.
    numres = sum(len(section) for section in imports.values())
    failed = imports.pop('failed')
    log.info('*' * 80)
    log.info('Analysis done. {} unique import lines collected'.format(numres))
    log.info('*' * 80)
    log.info('Failed to import from {} lines:'.format(len(failed)))
    for line in failed:
        log.debug(' {}'.format(line))
    log.info('*' * 80)
    log.info('*' * 80)
    write_profiler_script(imports, parse_fn.__name__)
def from_code(imports):
    """Populate *imports* by scanning every .py file under SOURCE_PATHS."""
    for src_root in SOURCE_PATHS:
        log.info('Starting analysis code repository {}'.format(src_root))
        for dirpath, _, names in os.walk(src_root):
            for name in names:
                candidate = os.path.join(dirpath, name)
                # Only regular files with a literal '.py' extension.
                is_py = os.path.splitext(candidate)[1] == '.py'
                if os.path.isfile(candidate) and is_py:
                    analyze_file(candidate, imports)
    return imports
def from_module_list(imports):
    """Populate *imports* from the dotted names listed in MODULES.

    Each importable name is turned back into an import statement
    ('from pkg import leaf' for dotted names, 'import name' otherwise)
    and filed into the appropriate section.
    """
    for module in MODULES:
        try:
            import_object(module)
        except ImportError:
            log.error('unable to import {}'.format(module))
            continue
        if '.' in module:
            # rsplit on the last dot: the original os.path.splitext applied
            # filesystem-path semantics to module names (e.g. mishandles
            # leading-dot names).
            package, leaf = module.rsplit('.', 1)
            s = ' '.join(['from', package, 'import', leaf])
        else:
            s = ' '.join(['import', module])
        choose_section_and_put(s, imports)
    return imports
def analyze_file(path, result):
    """Scan one source file for import statements and sort them into *result*.

    Import lines that cannot actually be executed are collected under
    result['failed'].
    """
    log.info('Analysing file {} '.format(path))
    # 'r' instead of 'r+': we only read, and 'r+' fails on read-only files.
    with open(path, 'r') as fp:
        for line in fp:
            # Strip quoting/continuation noise so embedded import strings
            # are also recognized.
            line = line.strip('",\\,\n,\t, ')
            if line.startswith('#'):
                continue
            if not (line.startswith('import ') or line.startswith('from ')):
                continue
            # Skip dynamic, future, known-problematic and relative imports.
            if any(['import ' not in line,
                    '__import__' in line,
                    '__future__' in line,
                    'gevent' in line,
                    'ElementC14N' in line,
                    'cloudpvdutil' in line,  # WTF??, where does it come from?
                    re.search(r'from \.+ import ', line) is not None,
                    ]):
                continue
            if is_importable(line):
                choose_section_and_put(line, result)
            else:
                result['failed'].add(line)
    return result
def write_profiler_script(datadict, repottype):
    """Render the collected imports into a runnable memory-profiler script.

    Each non-empty section becomes a sorted, comment-headed group of import
    statements; the result is written to REPORT_PATH formatted with
    *repottype* (the name of the parse function used).

    NOTE(review): the 'win' section gets double indentation here, but the
    `wrapper` template has no {win} placeholder, so those imports never
    reach the generated script -- confirm whether that is intended.
    """
    # cmp_to_key exists since 2.7 and keeps us off the 2.x-only sorted(cmp=...).
    from functools import cmp_to_key
    log.info('Writing profiler script.')
    report = ''
    for sectname, lines in datadict.items():
        if len(lines) == 0:
            datadict[sectname] = ''
            continue
        indent = ' '
        if sectname == 'win':
            indent = indent * 2
        linest = '\n{}'.format(indent)
        datadict[sectname] = ''.join([
            linest,
            '#' * 40,
            '{0}# {1}{0}'.format(linest, sectname),
            linest.join(sorted(lines, key=cmp_to_key(compare))),
        ])
    datadict['inconly_flag'] = inconly_flag if INCONLY else ''
    with open(REPORT_PATH.format(repottype), 'w+') as fp:
        report = wrapper.format(**datadict)
        fp.write(report)
def choose_section_and_put(s, result):
    """File import statement *s* into the matching section of *result*.

    Precedence: windows-only > project-internal (TOP_MODULES) >
    standard library > external third-party.
    """
    if any(['win' in s,
            'noerr' in s,
            'wmi' in s]):
        result['win'].add(s)
        return
    if any(name in s for name in TOP_MODULES):
        result['internal'].add(s)
        return
    # Reduce 'from x.y import z as w' / 'import x' to the bare module path.
    for_std_cmp = s.split(' as ')[0].split(' import ')[0].replace(
        'import ', '').replace('from ', '').strip('.').strip()
    # Set membership instead of the original O(n) equality scan over the set.
    if for_std_cmp in stdlibs:
        result['standard'].add(s)
        return
    result['external'].add(s)
def compare(i1, i2):
    """Three-way sort: 'import ...' lines before 'from ...' lines,
    alphabetical within each group."""
    first_is_plain = i1.startswith('import')
    second_is_plain = i2.startswith('import')
    if first_is_plain and i2.startswith('from'):
        return -1
    if second_is_plain and i1.startswith('from'):
        return 1
    return simple_compare(i1, i2)
def simple_compare(i1, i2):
    """Classic three-way comparison: -1, 0 or 1."""
    # bool arithmetic: equal -> 0-0, less -> 0-1, greater -> 1-0.
    return (i1 > i2) - (i1 < i2)
def import_object(import_str, *args, **kwds):
    """Import *import_str* and return the module, falling back to the
    project helper for dotted attribute paths."""
    try:
        __import__(import_str)
    except ImportError:
        # Not importable as a plain module: resolve it as module.attribute.
        return imputil.import_attr(import_str)
    return sys.modules[import_str]
def is_importable(s):
    """Return True when the import statement *s* can actually be executed."""
    # Collapse the statement into a single dotted name.
    if 'from' in s:
        s = s.replace(' import ', '.')
    else:
        s = s.replace('import ', '').replace('from ', '')
    s = s.split(' ')[0]
    try:
        import_object(s.strip('.'))
    except (ImportError, ValueError) as e:
        log.debug('Unable to import from string {} . Reason: {}'.format(s, e))
        return False
    return True
def sortkey(value):
    """Sort key for memory_profiler output lines: the MiB increment figure,
    or 0.0 when the line cannot be parsed."""
    try:
        # The increment is the number just before the last 'MiB' marker.
        figure = value.split('MiB')[-2]
        return float(figure.strip())
    except Exception:
        return 0.0
if __name__ == '__main__':
    # Generate the profiling script from the source trees, then run it under
    # memory_profiler using the embedded interpreter.
    analyze(from_code)
    log.info('Executing memory profiler on generated script...\n\n\n')
    # NOTE(review): the report is written to REPORT_PATH under WSP but is
    # executed here by relative name -- works only when CWD is WSP; confirm.
    # Argument-list form: no shell is needed for this static command.
    cmd = ['/opt/scalarizr/embedded/bin/python', '-m', 'memory_profiler',
           'report_from_code.py']
    subprocess.check_call(cmd)
    if SORT_BY_CONSUMPTION:
        # Show import lines ordered by descending MiB increment; the first
        # line of the log is a header and is skipped.
        with open('increment_log', 'r') as fp:
            lines = fp.readlines()
        for line in sorted(lines[1:], key=sortkey, reverse=True):
            if 'import' not in line:
                continue
            print(line)
    # analyze(from_module_list)
The above covers the Python import memory-consumption analysis script. If it did not answer your question, see also the following articles:
A simple analysis of Redis cache memory consumption in a Python project (with detailed steps)
A simple analysis of Redis cache memory consumption in a Python project (with detailed steps)