正确使用多处理
Posted
技术标签:
【中文标题】正确使用多处理【英文标题】:Correctly use multiprocessing 【发布时间】:2021-10-23 00:06:18 【问题描述】:我第一次尝试使用多处理并且必须提取约 500,000 条记录(现在我将变量设置为 500)。原始循环会花费很长时间,所以我正在尝试多处理。现在我有 10 个进程正在运行,并且它可以工作,但仍然需要大约 4 个小时才能完成。我想运行 20 个左右的进程,但我担心我的计算机可能存在性能问题,我不想在早上醒来看到程序崩溃。我是正确使用它还是有更好的方法?
完整代码:
from pyETT import ett_parser
import pandas as pd
import time
from datetime import datetime
from multiprocessing import Process
import sys
c = 10
x1,y1 = 1,50
x2,y2 = 51,100
x3,y3 = 101,150
x4,y4 = 151,200
x5,y5 = 201,250
x6,y6 = 251,300
x7,y7 = 301,350
x8,y8 = 351,400
x9,y9 = 401,450
x10,y10 = 451,500
m_cols = ('user-name','elo','rank','wins','losses','last-online')
def run1():
print('Running query 1...')
df_master = pd.DataFrame(columns = m_cols)
for i in range(x1, y1):
try:
if int(i) % int(c) == 0:
print('Loop1 is at:', i)
user_id = i
line = ett.ett_parser.get_user(user_id)
temp_df = pd.DataFrame(line, index=[i])
df_master = df_master.append(temp_df, ignore_index = True)
except Exception:
print("Error_1:",i )
#Export to excel
file_name = 'export_file1_' + datetime.now().strftime("%H_%M_%S") + '.xlsx'
df_master.to_excel(file_name, index = False)
print('DataFrame(' + file_name + ') is written to Excel File successfully.')
def run2():
print('Running query2...')
m_cols = ('user-name','elo','rank','wins','losses','last-online')
df_master = pd.DataFrame(columns = m_cols)
for i in range(x2, y2):
try:
if int(i) % int(c) == 0:
print('Loop2 is at:', i)
user_id = i
line = ett.ett_parser.get_user(user_id)
temp_df = pd.DataFrame(line, index=[i])
df_master = df_master.append(temp_df, ignore_index = True)
except Exception:
print("Error_2:",i )
#Export to excel
file_name = 'export_file2_' + datetime.now().strftime("%H_%M_%S") + '.xlsx'
df_master.to_excel(file_name, index = False)
print('DataFrame(' + file_name + ') is written to Excel File successfully.')
def run3():
print('Running query3...')
df_master = pd.DataFrame(columns = m_cols)
for i in range(x3, y3):
try:
if int(i) % int(c) == 0:
print('Loop3 is at:', i)
user_id = i
line = ett.ett_parser.get_user(user_id)
temp_df = pd.DataFrame(line, index=[i])
df_master = df_master.append(temp_df, ignore_index = True)
except Exception:
print("Error_3:",i )
#Export to excel
file_name = 'export_file3_' + datetime.now().strftime("%H_%M_%S") + '.xlsx'
df_master.to_excel(file_name, index = False)
print('DataFrame(' + file_name + ') is written to Excel File successfully.')
def run4():
print('Running query4...')
df_master = pd.DataFrame(columns = m_cols)
for i in range(x4, y4):
try:
if int(i) % int(c) == 0:
print('Loop4 is at:', i)
user_id = i
line = ett.ett_parser.get_user(user_id)
temp_df = pd.DataFrame(line, index=[i])
df_master = df_master.append(temp_df, ignore_index = True)
except Exception:
print("Error_4:",i )
#Export to excel
file_name = 'export_file4_' + datetime.now().strftime("%H_%M_%S") + '.xlsx'
df_master.to_excel(file_name, index = False)
print('DataFrame(' + file_name + ') is written to Excel File successfully.')
def run5():
print('Running query5...')
df_master = pd.DataFrame(columns = m_cols)
for i in range(x5, y5):
try:
if int(i) % int(c) == 0:
print('Loop5 is at:', i)
user_id = i
line = ett.ett_parser.get_user(user_id)
temp_df = pd.DataFrame(line, index=[i])
df_master = df_master.append(temp_df, ignore_index = True)
except Exception:
print("Error_5:",i )
#Export to excel
file_name = 'export_file5_' + datetime.now().strftime("%H_%M_%S") + '.xlsx'
df_master.to_excel(file_name, index = False)
print('DataFrame(' + file_name + ') is written to Excel File successfully.')
def run6():
print('Running query6...')
df_master = pd.DataFrame(columns = m_cols)
for i in range(x6, y6):
try:
if int(i) % int(c) == 0:
print('Loop6 is at:', i)
user_id = i
line = ett.ett_parser.get_user(user_id)
temp_df = pd.DataFrame(line, index=[i])
df_master = df_master.append(temp_df, ignore_index = True)
except Exception:
print("Error_6:",i )
#Export to excel
file_name = 'export_file6_' + datetime.now().strftime("%H_%M_%S") + '.xlsx'
df_master.to_excel(file_name, index = False)
print('DataFrame(' + file_name + ') is written to Excel File successfully.')
def run7():
print('Running query7...')
df_master = pd.DataFrame(columns = m_cols)
for i in range(x7, y7):
try:
if int(i) % int(c) == 0:
print('Loop7 is at:', i)
user_id = i
line = ett.ett_parser.get_user(user_id)
temp_df = pd.DataFrame(line, index=[i])
df_master = df_master.append(temp_df, ignore_index = True)
except Exception:
print("Error_7:",i )
#Export to excel
file_name = 'export_file7_' + datetime.now().strftime("%H_%M_%S") + '.xlsx'
df_master.to_excel(file_name, index = False)
print('DataFrame(' + file_name + ') is written to Excel File successfully.')
def run8():
print('Running query8...')
df_master = pd.DataFrame(columns = m_cols)
for i in range(x8, y8):
try:
if int(i) % int(c) == 0:
print('Loop8 is at:', i)
user_id = i
line = ett.ett_parser.get_user(user_id)
temp_df = pd.DataFrame(line, index=[i])
df_master = df_master.append(temp_df, ignore_index = True)
except Exception:
print("Error_8:",i )
#Export to excel
file_name = 'export_file8_' + datetime.now().strftime("%H_%M_%S") + '.xlsx'
df_master.to_excel(file_name, index = False)
print('DataFrame(' + file_name + ') is written to Excel File successfully.')
def run9():
print('Running query9...')
df_master = pd.DataFrame(columns = m_cols)
for i in range(x9, y9):
try:
if int(i) % int(c) == 0:
print('Loop9 is at:', i)
user_id = i
line = ett.ett_parser.get_user(user_id)
temp_df = pd.DataFrame(line, index=[i])
df_master = df_master.append(temp_df, ignore_index = True)
except Exception:
print("Error_9:",i )
#Export to excel
file_name = 'export_file9_' + datetime.now().strftime("%H_%M_%S") + '.xlsx'
df_master.to_excel(file_name, index = False)
print('DataFrame(' + file_name + ') is written to Excel File successfully.')
def run10():
print('Running query10...')
df_master = pd.DataFrame(columns = m_cols)
for i in range(x10, y10):
try:
if int(i) % int(c) == 0:
print('Loop10 is at:', i)
user_id = i
line = ett.ett_parser.get_user(user_id)
temp_df = pd.DataFrame(line, index=[i])
df_master = df_master.append(temp_df, ignore_index = True)
except Exception:
print("Error_10:",i )
#Export to excel
file_name = 'export_file10_' + datetime.now().strftime("%H_%M_%S") + '.xlsx'
df_master.to_excel(file_name, index = False)
print('DataFrame(' + file_name + ') is written to Excel File successfully.')
def main():
p = Process(target=run1)
p.start()
#p.join()
p2 = Process(target=run2)
p2.start()
p3 = Process(target=run3)
p3.start()
p4 = Process(target=run4)
p4.start()
p5 = Process(target=run5)
p5.start()
p6 = Process(target=run6)
p6.start()
p7 = Process(target=run7)
p7.start()
p8 = Process(target=run8)
p8.start()
p9 = Process(target=run9)
p9.start()
p10 = Process(target=run10)
p10.start()
p10.join()
if __name__ == '__main__':
start = time.time()
print('starting main')
main()
print('finishing main',time.time()-start)
更新代码
使用 swaggg 的答案,这段代码可以满足我的要求,而且要短得多。
from concurrent.futures import ThreadPoolExecutor
from multiprocessing import cpu_count
from pyETT import ett_parser
import pandas as pd
import time
def main():
USER_ID_COUNT = 50
MAX_WORKERS = 2 * cpu_count() + 1
dataframe_list = []
#user_array = []
user_ids = list(range(1, USER_ID_COUNT))
def obtain_user_record(user_id):
return ett_parser.get_user(user_id)
with ThreadPoolExecutor(MAX_WORKERS) as executor:
for user_id, user_record in zip(user_ids, executor.map(obtain_user_record, user_ids)):
if user_record:
dataframe_list.append(user_record)
df_master = pd.DataFrame.from_dict(dataframe_list,orient='columns')
print(df_master)
if __name__ == '__main__':
start = time.time()
print('starting main')
main()
print('finishing main', time.time() - start)
【问题讨论】:
【参考方案1】:只需使用 multiprocessing.Pool,它默认使用您的所有内核,不需要任何复杂的管理即可使其以 100% 的效率运行。产生太多的进程不是要走的路。
它还可以让你的代码更加整洁,因为你只需要提供一个函数和一个数据映射。
还有:
您可以使用time.strftime("%H_%M_%S")
代替datetime.now().strftime("%H_%M_%S")
。日期时间模块真的很慢。
如果您使用原语而不是 pandas,尤其是仅使用基本的 Python open 结合 string.split 或不太理想的 Python xls.reader/writer,您将看到性能的巨大提升。熊猫真的很慢。在很多情况下,它的有用特性弥补了其缓慢的性能,但是如果您唯一需要做的就是从 csv/etc 文件中提取一些数据并且您正在处理一个庞大的数据集,那么您最好不要使用它。
编辑: 我查看了您使用的实际模块,它涉及发送请求,而我认为您正在从磁盘读取数据。
顺便说一句,如果我错了,请纠正我,但我认为 ett.ett_parser.get_user(user_id)
中的 ett.
是错误的,实际调用应该只是到 ett_parser
。
因为您的代码涉及通过网络发送多个请求,然后将数据写入磁盘,您可能应该使用多线程而不是多处理,或者您仍然可以使用多处理,但尝试不同数量的进程,因为在这种情况下该任务受 I/O 限制,您应该尝试发送比 CPU 中的逻辑内核更多的请求。同时,Pandas 就像我说的那样很慢,而您的另一个选择是使用 xlsxwriter 模块,它可以保证更快(因为您实际上只使用 pandas 将从您的 json 请求中获得的字符串写入 .xls文件。)如果您对 csv 格式没问题,您也可以尝试使用 .csv 编写器模块。但最重要的是,主要瓶颈将是请求,而不是将数据写入磁盘所需的处理能力。
其次,Booboo 的解决方案会调整您的代码以与池一起使用,这非常棒。但是不将数据分成块然后在进程之间划分这些块可能更方便。相反,由于请求是瓶颈,您可以使用多线程并将结果附加到全局数组。如果您只使用多线程映射,则只需将用户 ID 和函数传递给它。因此,您可以执行以下操作:
from concurrent.futures import ThreadPoolExecutor
from multiprocessing import cpu_count
from pyETT import ett_parser
USER_ID_COUNT = 50000
MAX_WORKERS = 2 * cpu_count() + 1
user_array = [] #Can be adapted to Pandas or the xlwriter module
user_ids = list(range(1, USER_ID_COUNT))
def obtain_user_record(user_id):
return ett_parser.get_user(user_id)
with ThreadPoolExecutor(MAX_WORKERS) as executor:
for user_id, user_record in zip(user_ids, executor.map(obtain_user_record, user_ids)):
user_array.append(user_record)
然后,您可以将数组写入磁盘,随意拆分,如果您觉得值得,您当然可以使用多处理。
EDIT2:
好的,在这种情况下使用 Pandas。但是,请注意 pandas.DataFrame.append 状态的文档页面:
迭代地将行附加到 DataFrame 比单个连接的计算量更大。更好的解决方案是将这些行附加到列表中,然后将列表与原始 DataFrame 一次性连接起来。
哦,顺便说一句,我认为 pyETT 函数返回的是数据帧而不是字符串,对吗?我不太确定返回值的索引,但你可以弄清楚。这样做:
dataframe_list = []
with ThreadPoolExecutor(MAX_WORKERS) as executor:
for user_id, user_record in zip(user_ids, executor.map(obtain_user_record, user_ids)):
if user_record:
dataframe_list.append(user_record)
#If the dataframes aren't indexed, you could create a pre-filled
#data_frame_list and overwrite specific indices instead of appending,
#then pass ignore_index=True to pd.concat
df_master = pd.concat(dataframe_list) #We're done, time to merge our dframes
#into one big dframe.
编辑3: 好的,那么我们需要将数据转换为 pandas 数据帧。
user_df = pd.DataFrame.from_dict(user_id:user_record, orient='index', columns=m_cols)
dataframe_list.append(user_df)
这样,帧将由我们之前传递给函数的用户 ID 索引。当然,您可以有多个列表并将它们合并到多个数据帧中,然后只需将数据帧写入单独的 .xls 文件,或者您可以采用其他方法。
编辑4: 有趣的是,它会让您在字典列表上使用 pd.DataFrame.from_dict。要调用的正确函数应该是 pd.DataFrame.from_records 。您实际上并不需要将 orient='index' 传递给它。将字典转换为数据框,然后合并它们有什么好处?好吧,你可以像我在 Edit3 中展示的那样控制索引。您也可以使用
pd.DataFrame.from_records(dataframe_list,index=index_list)
df_master.sort_index()
Index_list 是一个索引列表(即 user_ids),与 dataframe_list 的顺序相同。
你也可以用字典代替列表:
dataframe_dict =
with ThreadPoolExecutor(MAX_WORKERS) as executor:
for user_id, user_record in zip(user_ids, executor.map(obtain_user_record, user_ids)):
if user_record:
dataframe_dict[user_id] = user_record
然后调用的正确函数是 from_dict ,您可以再次索引记录,如下所示:
df_master = pd.DataFrame.from_dict(dataframe_dict, orient='index')
df_master.sort_index()
不必每次都将 dicts 转换为数据帧在性能方面更好,所以很好的调用。
【讨论】:
这对我有用,但我正在努力将数组转换为数据框(对 xlswriter 不太熟悉)。 这个特殊的函数返回一个字典对象。print(type(user_record),user_record)
返回:df_master = pd.concat(dataframe_list)
Message=cannot concatenate object of type 'df_master = pd.DataFrame.from_dict(dataframe_list,orient='columns')
的末尾来使其工作。使用这个和你发布的那个有什么好处吗?
你的意思是 pd.DataFrame.from_dict(user_record,orient='columns') ????
这行不通,根据您发布的字典。您不能从列表中创建字典。【参考方案2】:
更新
首先,当您处理range(1, 50)
、range(51, 100)
、range(101, 150)
等范围时,user_id
将采用的实际值为:
1, 2, ... 49, 51, 52, ... 99, 101, 102, ... 149
也就是说,您不会处理值 50、100、150 等,因为给定 range(start, stop, step)
然后对于正步长,范围 r 的内容由公式 r[i] = start + step 确定*i where i >= 0 and r[i]
我怀疑您是否有意忽略这些值。我会假设这不是你的意图。
处理似乎可以分为 I/O 绑定部分(ett.ett_parser.get_user(user_id)
,它正在检索一个 URL)和一个 更多 CPU 密集部分(创建 pandas
dataframe
并导出到 Excel)。前者应使用大型多线程池,后者应使用多处理池。
以下代码创建两个池并将多处理池传递给工作函数run
,该函数检索其传递范围内指定的所有 URL,然后将检索到的行列表传递给多处理池进行组装和转换Excel。
from pyETT import ett_parser
import pandas as pd
import time
from datetime import datetime
from multiprocessing import Pool
from multiprocessing.pool import ThreadPool
from functools import partial
import sys
def run(multiprocessing_pool, run_number, the_range):
c = 10
print(f'Running query run_number...', flush=True)
lines = []
for user_id in the_range:
try:
if user_id % c == 0:
print('Loop is at:', user_id, flush=True)
line = ett.ett_parser.get_user(user_id)
lines.append((user_id, line))
except Exception:
print("Error:", user_id, flush=True)
if lines:
multiprocessing_pool.apply(process_lines, args=(run_number, lines))
def process_lines(run_number, lines):
m_cols = ('user-name','elo','rank','wins','losses','last-online')
df_master = pd.DataFrame(columns = m_cols)
for user_id, line in lines:
temp_df = pd.DataFrame(line, index=[user_id])
df_master = df_master.append(temp_df, ignore_index=True)
#Export to excel
file_name = f'export_filerun_number_datetime.now().strftime("%H_%M_%S").xlsx'
df_master.to_excel(file_name, index=False)
print(f'DataFrame(file_name) is written to Excel File successfully.', flush=True)
def split(lst, n): # function to split list in n even parts
k, m = divmod(len(lst), n)
return list(lst[i * k + min(i, m):(i + 1) * k + min(i + 1, m)] for i in range(n))
def main():
MIN_USER_ID = 1
MAX_USER_ID = 500
NUM_SPLITS = 10 # 50 user ids per split
THREAD_POOL_SIZE = min(NUM_SPLITS, 100) # Or maybe even 500!
# split this range into thread_pool_size smaller ranges:
# Next line will process 500 user ids: 1, 2, ... 500
ranges = split(range(MIN_USER_ID, MAX_USER_ID+1), NUM_SPLITS)
multiprocessing_pool = Pool()
multithreading_pool = ThreadPool(THREAD_POOL_SIZE)
multithreading_pool.starmap(partial(run, multiprocessing_pool) , enumerate(ranges, start=1))
if __name__ == '__main__':
start = time.time()
print('starting main')
main()
print('finishing main', time.time() - start)
【讨论】:
将 cpu_count() 的输出传递给 Pool 是多余的,因为这是其参数采用的默认值。 @swaggg 感谢您的评论。这里强调指出进程数必须等于ranges
的长度。在这种情况下,num_processors
被设置为 cpu_count()
,这是我们拥有的 logical 核心的数量。但我们可能会确定将num_processors
设置为物理 内核的数量实际上会使程序运行得同样快甚至更快。在这种情况下,不将num_processors
显式传递给Pool
构造函数将是一个巨大的错误。换句话说,这是小心编码。变量的另一个名称可能是n_processes
。
你的方法很有趣。我查看了 pyETT 模块的功能,得出的结论是 OP 需要的进程比他拥有的逻辑核心要多得多。
@swaggg 你是对的。查看更新的答案。
很抱歉。我不得不修改一行代码并更新了答案。以上是关于正确使用多处理的主要内容,如果未能解决你的问题,请参考以下文章
Python 多处理正确完成工作,但进程仍然存在(Linux)
如何正确使用 Core Data 进行多线程处理? [关闭]
Spacy,在 python 中的大型数据集上使用 nlp.pipe,多处理导致进程进入睡眠状态。如何正确使用所有 CPU 内核?