内存不够的情况下python处理大规模数据
Posted 甘木甘木
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了内存不够的情况下python处理大规模数据相关的知识,希望对你有一定的参考价值。
在机器内存不够的情况下如何处理大规模数据
背景介绍
笔者需要在一个内存为16G的远程服务器上处理几百G的数据,数据包含了几千个csv文件,所有文件存在了一个文件夹下。
目标:用python处理完所有数据,过滤出其中有用的部分。raw data中不同的文件之间有重复的数据(重复的行),最终得到的版本需要去重。(最终的去重后的部分是小于机器内存大小的)
解决方案
最直观的想法:将所有数据用pandas读取,然后通过pd.concat()合并成一个文件,再drop_duplicates(),然而数据量太大,无法读取所有数据。
优化想法:“分而治之”,将数据一部分一部分地处理,每一个部分先自己内部去重。全部数据这样一批一批的处理完后,再重复这个过程,直到只剩一个最终的文件。
具体代码
需要注意的细节:
- 一开始我10个文件为一批的处理,结果有的时候超过内存大小,有的时候远小于内存大小,所有后来决定根据内存使用情况动态的决定多大为一批。
- 注意python的内存处理机制,手动显式地delete 变量,然后回收内存。
import pandas as pd
import numpy as np
import os
path1 = '../csv_log_v4_07_13_1/'
files1 = [path1+item for item in os.listdir(path1)]
path2 = '../csv_log_v4_07_14/'
files2 = [path2+item for item in os.listdir(path2)]
files = files1 + files2
def parse_file(raw_data):
"""this function to parse every file"""
'''保护隐私 删除部分细节'''
omit details here for privacy concern
return cur_file
import os
import psutil
# 显示当前 python 程序占用的内存大小
def show_memory_info():
pid = os.getpid()
p = psutil.Process(pid)
info = p.memory_full_info()
memory = info.uss / 1024. / 1024 #MB
return memory > 8.0*1024 # whether larger than 8 GB
import subprocess
def wc_count(file_name):
'''count line number of a given file'''
out = subprocess.getoutput("wc -l %s" % file_name)
return int(out.split()[0])
all_interaction_data = pd.DataFrame(
columns=['user_id', 'photo_id', 'duration_ms', 'playing_time', 'request_time_ms', 'realshow']
)
from tqdm import tqdm
import gc
inter_ls = []
cnt = 1
for file in tqdm(files):
cnt += 1
if cnt < 151:
continue
if wc_count(file)>1: # whether has more than one line
raw_data = pd.read_csv(file, sep='#####', encoding='utf-8', engine='python')
inter_ls.append(parse_file(raw_data))
if show_memory_info():
'''当内存快满了,立刻存储当下的内容,然后回收内存'''
tmp_inter = pd.concat(inter_ls)
tmp_inter.drop_duplicates(inplace=True)
all_interaction_data = all_interaction_data.append(tmp_inter, ignore_index=True)
del tmp_inter
del inter_ls
inter_ls = []
all_interaction_data.drop_duplicates(inplace=True)
all_interaction_data.to_csv(f'../immediate_data/all_rec_interaction_cnt.tsv', sep='\\t', index=False)
del all_interaction_data
gc.collect()
# global all_interaction_data
all_interaction_data = pd.DataFrame(
columns=['user_id', 'photo_id', 'duration_ms', 'playing_time', 'request_time_ms', 'realshow']
)
all_interaction_data.to_csv(f'../immediate_data/all_rec_interaction_cnt.tsv', sep='\\t', index=False)
以上是关于内存不够的情况下python处理大规模数据的主要内容,如果未能解决你的问题,请参考以下文章
如何使用 Python Ray 在不耗尽内存的情况下并行处理大量数据?
『每周译Go』Golang 在大规模流处理场景下的最小化内存使用
当机器学习遇到数据量不够时,这几个Python技巧为你化解难题