内存不够的情况下python处理大规模数据

Posted 甘木甘木

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了内存不够的情况下python处理大规模数据相关的知识,希望对你有一定的参考价值。

在机器内存不够的情况下如何处理大规模数据

背景介绍

笔者需要在一个内存为16G的远程服务器上处理几百G的数据,数据包含了几千个csv文件,所有文件存在了一个文件夹下。
目标:用python处理完所有数据,过滤出其中有用的部分。raw data中不同的文件之间有重复的数据(重复的行),最终得到的版本需要去重。(最终的去重后的部分是小于机器内存大小的)

解决方案

最直观的想法:将所有数据用pandas读取,然后通过pd.concat()合并成一个文件,再drop_duplicates(),然而数据量太大,无法读取所有数据。

优化想法:“分而治之”,将数据一部分一部分地处理,每一个部分先自己内部去重。全部数据这样一批一批的处理完后,再重复这个过程,直到只剩一个最终的文件。

具体代码

需要注意的细节:

  • 一开始我10个文件为一批的处理,结果有的时候超过内存大小,有的时候远小于内存大小,所有后来决定根据内存使用情况动态的决定多大为一批。
  • 注意python的内存处理机制,手动显式地delete 变量,然后回收内存。
import pandas as pd
import numpy as np
import os
path1 = '../csv_log_v4_07_13_1/'
files1 = [path1+item for item in os.listdir(path1)]
path2 = '../csv_log_v4_07_14/'
files2 = [path2+item for item in os.listdir(path2)]
files = files1 + files2


def parse_file(raw_data):
	"""this function to parse every file"""
	'''保护隐私 删除部分细节'''
 	omit details here for privacy concern 
	
    return cur_file
    

import os
import psutil
# 显示当前 python 程序占用的内存大小
def show_memory_info():
    pid = os.getpid()
    p = psutil.Process(pid)

    info = p.memory_full_info()
    memory = info.uss / 1024. / 1024 #MB
    return memory > 8.0*1024 # whether larger than 8 GB


import subprocess

def wc_count(file_name):
	'''count line number of a given file'''
    out = subprocess.getoutput("wc -l %s" % file_name)
    return int(out.split()[0])

all_interaction_data = pd.DataFrame(
    columns=['user_id', 'photo_id', 'duration_ms', 'playing_time', 'request_time_ms', 'realshow']
    )

from tqdm import tqdm
import gc

inter_ls = []

cnt = 1
for file in tqdm(files):
    cnt += 1
    if cnt < 151:
        continue
    if wc_count(file)>1: # whether has more than one line
        raw_data = pd.read_csv(file, sep='#####', encoding='utf-8', engine='python')
        inter_ls.append(parse_file(raw_data))
    if show_memory_info():
    	'''当内存快满了,立刻存储当下的内容,然后回收内存'''
        tmp_inter = pd.concat(inter_ls)
        tmp_inter.drop_duplicates(inplace=True)

        all_interaction_data = all_interaction_data.append(tmp_inter, ignore_index=True)
        del tmp_inter
        del inter_ls

        inter_ls = []

        all_interaction_data.drop_duplicates(inplace=True)
        all_interaction_data.to_csv(f'../immediate_data/all_rec_interaction_cnt.tsv', sep='\\t', index=False)
        del all_interaction_data

        gc.collect()

        # global all_interaction_data
        all_interaction_data = pd.DataFrame(
            columns=['user_id', 'photo_id', 'duration_ms', 'playing_time', 'request_time_ms', 'realshow']
        )

all_interaction_data.to_csv(f'../immediate_data/all_rec_interaction_cnt.tsv', sep='\\t', index=False)

以上是关于内存不够的情况下python处理大规模数据的主要内容,如果未能解决你的问题,请参考以下文章

内存不够的情况下python处理大规模数据

如何使用 Python Ray 在不耗尽内存的情况下并行处理大量数据?

『每周译Go』Golang 在大规模流处理场景下的最小化内存使用

当机器学习遇到数据量不够时,这几个Python技巧为你化解难题

当机器学习遇到数据量不够时,这几个 Python 技巧为你化解难题

大数据计算能力 CPUGPU 和 DPU 有何不同