在python中,对df进行并行应用函数。
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了在python中,对df进行并行应用函数。相关的知识,希望对你有一定的参考价值。
我有一个函数,用来处理两个列表:项目和日期,该函数返回一个更新的项目列表,目前它是用apply来运行的,这在百万行的情况下不是那么有效,我想通过并行化来提高它的效率。
项目列表中的项目是按时间顺序排列的,相应的日期列表也是如此(item_list和date_list大小相同)。
这就是df。
Date item_list date_list
12/05/20 [I1,I3,I4] [10/05/20, 11/05/20, 12/05/20 ]
11/05/20 [I1,I3] [11/05/20 , 14/05/20]
这就是我想要的df
Date item_list date_list items_list_per_date
12/05/20 [I1,I3,I4] [10/05/20, 11/05/20, 12/05/20] [I1,I3]
11/05/20 [I1,I3] [11/05/20 , 14/05/20] nan
这是我的代码
def get_item_list_per_date(date, items_list, date_list):
if str(items_list)=="nan" or str(date_list)=="nan":
return np.nan
new_date_list = []
for d in list(date_list):
new_date_list.append(pd.to_datetime(d))
if (date in new_date_list) and (len(new_date_list)>1):
loc = new_date_list.index(date)
else:
return np.nan
updated_items_list = items_list[:loc]
if len(updated_items_list )==0:
return np.nan
return updated_items_list
df['items_list_per_date'] = df.progress_apply(lambda x: get_item_list_per_date(date=x['date'], items_list=x['items_list'], date_list=x['date_list']),axis=1)
我很想把它并行化,你能帮忙吗?
答案
使用:
import multiprocessing as mp
def fx(df):
def __fx(s):
date = s['Date']
date_list = s['date_list']
if date in date_list:
loc = date_list.index(date)
return s['item_list'][:loc]
else:
return np.nan
return df.apply(__fx, axis=1)
def parallel_apply(df):
dfs = filter(lambda d: not d.empty, np.array_split(df, mp.cpu_count()))
pool = mp.Pool()
per_date = pd.concat(pool.map(fx, dfs))
pool.close()
pool.join()
return per_date
df['items_list_per_date'] = parallel_apply(df)
结果:
#print(df)
Date item_list date_list items_list_per_date
12/05/20 [I1,I3,I4] [10/05/20, 11/05/20, 12/05/20] [I1,I3]
11/05/20 [I1,I3] [11/05/20 , 14/05/20] nan
以上是关于在python中,对df进行并行应用函数。的主要内容,如果未能解决你的问题,请参考以下文章