在Python中使用多进程快速处理数据
Posted change_world
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了在Python中使用多进程快速处理数据相关的知识,希望对你有一定的参考价值。
转自:https://blog.csdn.net/bryan__/article/details/78786648
数据分片:可以将数据分片处理的任务适合用多进程代码处理,核心思路是将data分片,对每一片数据处理返回结果(可能是无序的),然后合并。应用场景:多进程爬虫,类mapreduce任务。缺点是子进程会拷贝父进程所有状态,内存浪费严重。
import math from multiprocessing import Pool def run(data, index, size): # data 传入数据,index 数据分片索引,size进程数 size = math.ceil(len(data) / size) start = size * index end = (index + 1) * size if (index + 1) * size < len(data) else len(data) temp_data = data[start:end] # do something return data # 可以返回数据,在后面收集起来 processor = 40 res = [] p = Pool(processor) for i in range(processor): res.append(p.apply_async(run, args=(data, i, processor,))) print(str(i) + ‘ processor started !‘) p.close() p.join() for i in res: print(i.get()) # 使用get获得多进程处理的结果
分文件处理:当内存受限时,不能再继续使用数据分片,因为子进程会拷贝父进程的所有状态,导致内存的浪费。这时候可以考虑先把大文件分片保存到磁盘,然后del 释放掉数据,接着在多进程处理的函数里面分别读取,这样子进程就会分别读取需要处理的数据,而不会占用大量内存。
from multiprocessing import Pool import pandas as pd import math data=pd.DataFrame({‘user_id‘:[1,2,3,4],‘item_id‘:[6,7,8,9]}) users=pd.DataFrame(data[‘user_id‘].unique(),columns=[‘user_id‘]) processor=4 p=Pool(processor) l_data = len(users) size = math.ceil(l_data / processor) res = [] def run(i): data=pd.read_csv(‘../data/user_‘+str(i)+‘.csv‘) #todo return data for i in range(processor): start = size * i end = (i + 1) * size if (i + 1) * size < l_data else l_data user = users[start:end] t_data = pd.merge(data, user, on=‘user_id‘).reset_index(drop=True) t_data.to_csv(‘../data/user_‘+str(i)+‘.csv‘,index=False) print(len(t_data)) del data,l_data,users for i in range(processor): res.append(p.apply_async(run, args=(i,))) print(str(i) + ‘ processor started !‘) p.close() p.join() data = pd.concat([i.get() for i in res])
多进程数据共享:当需要修改共享的数据时,那么这个时候可以使用数据共享:
from multiprocessing import Process, Manager # 每个子进程执行的函数 # 参数中,传递了一个用于多进程之间数据共享的特殊字典 def func(i, d): d[i] = i + 100 print(d.values()) # 在主进程中创建特殊字典 m = Manager() d = m.dict() for i in range(5): # 让子进程去修改主进程的特殊字典 p = Process(target=func, args=(i, d)) p.start() p.join() ------------ [100] [100, 101] [100, 101, 102, 103] [100, 101, 102, 103] [100, 101, 102, 103, 104]
以上是关于在Python中使用多进程快速处理数据的主要内容,如果未能解决你的问题,请参考以下文章