Python 多处理和组合 DF
Posted
技术标签:
【中文标题】Python 多处理和组合 DF【英文标题】:Python Multi-Processing and Combing DFs 【发布时间】:2019-11-18 06:54:05 【问题描述】:我正在将一个大型数据源读入 pandas 并将其分成 3 个块。我想使用多处理,以便我可以同时为每个块完成一个分析功能。每个函数之后的输出是一个数据帧。然后我需要将这三个小数据框组合起来。
#This part creates an empty dataframe with the correct column names
d = 'ID': [''], 'Title': [''],'Organization': [''], 'PI': [''],'PI_Phone': [''], 'PI_Email': [''],
'Start_Date': [''], 'End_Date': [''],'FY': [''], 'Funding': [''], 'Abstract': [''],
'URL': [''],'Street': [''], 'City': [''],'State': [''], 'Zip': [''],'Country': ['']
data = pd.DataFrame(data=d)
def algorithm(df):
print('Alg Running')
df['Abstract'] = df['Abstract'].fillna(value='Abstract')
df['Abstract'] = df['Title'] + ' : ' + df['Abstract']
wide_net = df[df['Abstract'].str.lower().str.contains('|'.join(tissue+te_abstract+temp_abstract+tx_abstract+armi_abstract+['cell ','tissue','organ ']),na=False)]
return wide_net
def chunk1():
print('chunk1')
therange = 0
df1 = pd.read_sql(('SELECT * FROM Clean_SBIR LIMIT ,1000;').format(therange), con=conn)
return algorithm(df1)
def chunk2():
print('chunk2')
therange = 1000
df2 = pd.read_sql(('SELECT * FROM Clean_SBIR LIMIT ,1000;').format(therange), con=conn)
algorithm(df2)
def chunk3():
print('chunk3')
therange = 2000
df3 = pd.read_sql(('SELECT * FROM Clean_SBIR LIMIT ,1000;').format(therange), con=conn)
algorithm(df3)
# creating processes
p1 = multiprocessing.Process(target=chunk1())
p2 = multiprocessing.Process(target=chunk2())
p3 = multiprocessing.Process(target=chunk3())
# starting process 1
p1.start()
# starting process 2
p2.start()
# starting process 3
p3.start()
#This is where I am struggling
results = pd.concat([chunk1(),chunk2(),chunk3()])
# wait until process 1 is finished
p1.join()
# wait until process 2 is finished
p2.join()
# wait until process 3 is finished
p3.join()
print('done')
我的算法函数返回正确的数据,然后 chunk1 也返回正确的数据,但我不知道如何将它们组合起来,因为多处理正在阻碍。
【问题讨论】:
在完成所有 3 个过程后尝试连接。 即使你设法让它工作,我敢打赌,它需要更多的时间来实现单个进程......你当前的代码只是在构建它们之后忘记了数据帧,但即使它它没有,您将在每个子进程中以一个部分数据帧结束,并且仍然必须使所有子进程都可以在父进程中访问。这里多处理的基本原理是什么? 我们正在处理海量数据集(大约 6-8gb),这个是最小的。我们构建了一个遍历这些数据集的算法,但对于某些算法,它需要 24-48 小时才能运行。所以我只是想弄清楚我们如何才能减少那个时间。我认为多处理块将是一个解决方案。 【参考方案1】:上面看起来有些奇怪,可能重构如下:
from multiprocessing import Pool
SQL = 'SELECT * FROM Clean_SBIR LIMIT %s, %s'
def process_data(offset, limit):
df = pd.read_sql(SQL, conn, params=(offset, limit))
return algorithm(df)
with Pool(3) as pool:
jobs = []
limit = 1000
for offset in range(0, 3000, limit):
jobs.append((offset, limit))
final_df = pd.concat(pool.starmap(process_data, jobs))
基本上你是在不必要地复制代码,而不是从你的块处理算法返回结果。
也就是说,您可能不想做这样的事情。进程之间的所有数据都是picked
,并且是@Serge 提出的观点的一部分。
【讨论】:
如果 RAM 有限,假设不只使用 3 个(或者这 3 个会很大),多处理中的酸洗实际上是否有利于在任务之间释放内存? 我计划在有 23 GB 内存的服务器上运行它。这会是一种聪明的使用方式吗? 您将通过这种方式临时获得四个数据副本,两个在子进程中(真正的 pandas 数据帧和腌制副本),两个在父进程中(一个当它接收到数据时选择数据,当它变成数据框时选择另一个数据)。另请注意,Python 对象比磁盘上的数据大得多... Pool(3) 是做什么的? docs are here,哪一部分不清楚?以上是关于Python 多处理和组合 DF的主要内容,如果未能解决你的问题,请参考以下文章
python中的多处理-在多个进程之间共享大对象(例如pandas数据框)
Python 队列queue与多线程组合(生产者+消费者模式)