正确使用多处理

Posted

技术标签:

【中文标题】正确使用多处理【英文标题】:Correctly use multiprocessing 【发布时间】:2021-10-23 00:06:18 【问题描述】:

我第一次尝试使用多处理并且必须提取约 500,000 条记录(现在我将变量设置为 500)。原始循环会花费很长时间,所以我正在尝试多处理。现在我有 10 个进程正在运行,并且它可以工作,但仍然需要大约 4 个小时才能完成。我想运行 20 个左右的进程,但我担心我的计算机可能存在性能问题,我不想在早上醒来看到程序崩溃。我是正确使用它还是有更好的方法?

完整代码:

from pyETT import ett_parser
import pandas as pd
import time
from datetime import datetime
from multiprocessing import Process
import sys

c = 10
x1,y1 = 1,50
x2,y2 = 51,100
x3,y3 = 101,150
x4,y4 = 151,200
x5,y5 = 201,250
x6,y6 = 251,300
x7,y7 = 301,350
x8,y8 = 351,400
x9,y9 = 401,450
x10,y10 = 451,500
m_cols = ('user-name','elo','rank','wins','losses','last-online')

def run1():
    print('Running query 1...')
    df_master = pd.DataFrame(columns = m_cols)

    for i in range(x1, y1):
        try:
            if int(i) % int(c) == 0:
                print('Loop1 is at:', i)
            
            user_id = i
            line = ett.ett_parser.get_user(user_id)
            temp_df = pd.DataFrame(line, index=[i])
            df_master = df_master.append(temp_df, ignore_index = True)
        except Exception:
            print("Error_1:",i )

    #Export to excel
    file_name = 'export_file1_' + datetime.now().strftime("%H_%M_%S") + '.xlsx'
    df_master.to_excel(file_name, index = False)
    print('DataFrame(' + file_name + ') is written to Excel File successfully.')

def run2():
    print('Running query2...')
    m_cols = ('user-name','elo','rank','wins','losses','last-online')
    df_master = pd.DataFrame(columns = m_cols)

    for i in range(x2, y2):
        try:
            if int(i) % int(c) == 0:
                print('Loop2 is at:', i)

            user_id = i
            line = ett.ett_parser.get_user(user_id)
            temp_df = pd.DataFrame(line, index=[i])
            df_master = df_master.append(temp_df, ignore_index = True)
        except Exception:
            print("Error_2:",i )

    #Export to excel
    file_name = 'export_file2_' + datetime.now().strftime("%H_%M_%S") + '.xlsx'
    df_master.to_excel(file_name, index = False)
    print('DataFrame(' + file_name + ') is written to Excel File successfully.')


def run3():
    print('Running query3...')
    
    df_master = pd.DataFrame(columns = m_cols)

    for i in range(x3, y3):
        try:
            if int(i) % int(c) == 0:
                print('Loop3 is at:', i)

            user_id = i
            line = ett.ett_parser.get_user(user_id)
            temp_df = pd.DataFrame(line, index=[i])
            df_master = df_master.append(temp_df, ignore_index = True)
        except Exception:
            print("Error_3:",i )

    #Export to excel
    file_name = 'export_file3_' + datetime.now().strftime("%H_%M_%S") + '.xlsx'
    df_master.to_excel(file_name, index = False)
    print('DataFrame(' + file_name + ') is written to Excel File successfully.')

def run4():
    print('Running query4...')
    
    df_master = pd.DataFrame(columns = m_cols)

    for i in range(x4, y4):
        try:
            if int(i) % int(c) == 0:
                print('Loop4 is at:', i)

            user_id = i
            line = ett.ett_parser.get_user(user_id)
            temp_df = pd.DataFrame(line, index=[i])
            df_master = df_master.append(temp_df, ignore_index = True)
        except Exception:
            print("Error_4:",i )

    #Export to excel
    file_name = 'export_file4_' + datetime.now().strftime("%H_%M_%S") + '.xlsx'
    df_master.to_excel(file_name, index = False)
    print('DataFrame(' + file_name + ') is written to Excel File successfully.')

def run5():
    print('Running query5...')
    
    df_master = pd.DataFrame(columns = m_cols)

    for i in range(x5, y5):
        try:
            if int(i) % int(c) == 0:
                print('Loop5 is at:', i)
            
            user_id = i
            line = ett.ett_parser.get_user(user_id)
            temp_df = pd.DataFrame(line, index=[i])
            df_master = df_master.append(temp_df, ignore_index = True)
        except Exception:
            print("Error_5:",i )

    #Export to excel
    file_name = 'export_file5_' + datetime.now().strftime("%H_%M_%S") +  '.xlsx'
    df_master.to_excel(file_name, index = False)
    print('DataFrame(' + file_name + ') is written to Excel File successfully.')

def run6():
    print('Running query6...')
    
    df_master = pd.DataFrame(columns = m_cols)

    for i in range(x6, y6):
        try:
            if int(i) % int(c) == 0:
                print('Loop6 is at:', i)
            
            user_id = i
            line = ett.ett_parser.get_user(user_id)
            temp_df = pd.DataFrame(line, index=[i])
            df_master = df_master.append(temp_df, ignore_index = True)
        except Exception:
            print("Error_6:",i )

    #Export to excel
    file_name = 'export_file6_' + datetime.now().strftime("%H_%M_%S") + '.xlsx'
    df_master.to_excel(file_name, index = False)
    print('DataFrame(' + file_name + ') is written to Excel File successfully.')

def run7():
    print('Running query7...')
    
    df_master = pd.DataFrame(columns = m_cols)

    for i in range(x7, y7):
        try:
            if int(i) % int(c) == 0:
                print('Loop7 is at:', i)

            user_id = i
            line = ett.ett_parser.get_user(user_id)
            temp_df = pd.DataFrame(line, index=[i])
            df_master = df_master.append(temp_df, ignore_index = True)
        except Exception:
            print("Error_7:",i )

    #Export to excel
    file_name = 'export_file7_' + datetime.now().strftime("%H_%M_%S") + '.xlsx'
    df_master.to_excel(file_name, index = False)
    print('DataFrame(' + file_name + ') is written to Excel File successfully.')

def run8():
    print('Running query8...')
    
    df_master = pd.DataFrame(columns = m_cols)

    for i in range(x8, y8):
        try:
            if int(i) % int(c) == 0:
                print('Loop8 is at:', i)
            
            user_id = i
            line = ett.ett_parser.get_user(user_id)
            temp_df = pd.DataFrame(line, index=[i])
            df_master = df_master.append(temp_df, ignore_index = True)
        except Exception:
            print("Error_8:",i )

    #Export to excel
    file_name = 'export_file8_' + datetime.now().strftime("%H_%M_%S") + '.xlsx'
    df_master.to_excel(file_name, index = False)
    print('DataFrame(' + file_name + ') is written to Excel File successfully.')

def run9():
    print('Running query9...')
    
    df_master = pd.DataFrame(columns = m_cols)

    for i in range(x9, y9):
        try:
            if int(i) % int(c) == 0:
                print('Loop9 is at:', i)
            
            user_id = i
            line = ett.ett_parser.get_user(user_id)
            temp_df = pd.DataFrame(line, index=[i])
            df_master = df_master.append(temp_df, ignore_index = True)
        except Exception:
            print("Error_9:",i )

    #Export to excel
    file_name = 'export_file9_' + datetime.now().strftime("%H_%M_%S") + '.xlsx'
    df_master.to_excel(file_name, index = False)
    print('DataFrame(' + file_name + ') is written to Excel File successfully.')


def run10():
    print('Running query10...')
    
    df_master = pd.DataFrame(columns = m_cols)

    for i in range(x10, y10):
        try:
            if int(i) % int(c) == 0:
                print('Loop10 is at:', i)
            user_id = i
            line = ett.ett_parser.get_user(user_id)
            temp_df = pd.DataFrame(line, index=[i])
            df_master = df_master.append(temp_df, ignore_index = True)
        except Exception:
            print("Error_10:",i )

    #Export to excel
    file_name = 'export_file10_' + datetime.now().strftime("%H_%M_%S") + '.xlsx'
    df_master.to_excel(file_name, index = False)
    print('DataFrame(' + file_name + ') is written to Excel File successfully.')
    
def main():

    
    p = Process(target=run1)
    p.start()
    #p.join()

    p2 = Process(target=run2)
    p2.start()

    p3 = Process(target=run3)
    p3.start()
    
    p4 = Process(target=run4)
    p4.start()

    p5 = Process(target=run5)
    p5.start()
    
    p6 = Process(target=run6)
    p6.start()

    p7 = Process(target=run7)
    p7.start()

    p8 = Process(target=run8)
    p8.start()

    p9 = Process(target=run9)
    p9.start()

    p10 = Process(target=run10)
    p10.start()
    p10.join()
    
if __name__ == '__main__':
    start = time.time()
    print('starting main')
    main()
    print('finishing main',time.time()-start)

更新代码

使用 swaggg 的答案,这段代码可以满足我的要求,而且要短得多。

from concurrent.futures import ThreadPoolExecutor
from multiprocessing import cpu_count
from pyETT import ett_parser
import pandas as pd
import time

def main():
    USER_ID_COUNT = 50
    MAX_WORKERS = 2 * cpu_count() + 1
    dataframe_list = []

    #user_array = [] 
    user_ids = list(range(1, USER_ID_COUNT))
 
    def obtain_user_record(user_id):
        return ett_parser.get_user(user_id)

    with ThreadPoolExecutor(MAX_WORKERS) as executor:
       for user_id, user_record in zip(user_ids, executor.map(obtain_user_record, user_ids)):
          if user_record:
             dataframe_list.append(user_record)

    df_master = pd.DataFrame.from_dict(dataframe_list,orient='columns')
    print(df_master)
    
if __name__ == '__main__':
    start = time.time()
    print('starting main')
    main()
    print('finishing main', time.time() - start)

【问题讨论】:

【参考方案1】:

只需使用 multiprocessing.Pool,它默认使用您的所有内核,不需要任何复杂的管理即可使其以 100% 的效率运行。产生太多的进程不是要走的路。

它还可以让你的代码更加整洁,因为你只需要提供一个函数和一个数据映射。

还有:

您可以使用time.strftime("%H_%M_%S") 代替datetime.now().strftime("%H_%M_%S")。日期时间模块真的很慢。

如果您使用原语而不是 pandas,尤其是仅使用基本的 Python open 结合 string.split 或不太理想的 Python xls.reader/writer,您将看到性能的巨大提升。熊猫真的很慢。在很多情况下,它的有用特性弥补了其缓慢的性能,但是如果您唯一需要做的就是从 csv/etc 文件中提取一些数据并且您正在处理一个庞大的数据集,那么您最好不要使用它。

编辑: 我查看了您使用的实际模块,它涉及发送请求,而我认为您正在从磁盘读取数据。

顺便说一句,如果我错了,请纠正我,但我认为 ett.ett_parser.get_user(user_id) 中的 ett. 是错误的,实际调用应该只是到 ett_parser

因为您的代码涉及通过网络发送多个请求,然后将数据写入磁盘,您可能应该使用多线程而不是多处理,或者您仍然可以使用多处理,但尝试不同数量的进程,因为在这种情况下该任务受 I/O 限制,您应该尝试发送比 CPU 中的逻辑内核更多的请求。同时,Pandas 就像我说的那样很慢,而您的另一个选择是使用 xlsxwriter 模块,它可以保证更快(因为您实际上只使用 pandas 将从您的 json 请求中获得的字符串写入 .xls文件。)如果您对 csv 格式没问题,您也可以尝试使用 .csv 编写器模块。但最重要的是,主要瓶颈将是请求,而不是将数据写入磁盘所需的处理能力。

其次,Booboo 的解决方案会调整您的代码以与池一起使用,这非常棒。但是不将数据分成块然后在进程之间划分这些块可能更方便。相反,由于请求是瓶颈,您可以使用多线程并将结果附加到全局数组。如果您只使用多线程映射,则只需将用户 ID 和函数传递给它。因此,您可以执行以下操作:

    from concurrent.futures import ThreadPoolExecutor
    from multiprocessing import cpu_count
    from pyETT import ett_parser

    USER_ID_COUNT = 50000
    MAX_WORKERS = 2 * cpu_count() + 1

    user_array = [] #Can be adapted to Pandas or the xlwriter module        
    user_ids = list(range(1, USER_ID_COUNT))

    def obtain_user_record(user_id):
        return ett_parser.get_user(user_id)

    with ThreadPoolExecutor(MAX_WORKERS) as executor:
       for user_id, user_record in zip(user_ids, executor.map(obtain_user_record, user_ids)):
          user_array.append(user_record)

然后,您可以将数组写入磁盘,随意拆分,如果您觉得值得,您当然可以使用多处理。

EDIT2:

好的,在这种情况下使用 Pandas。但是,请注意 pandas.DataFrame.append 状态的文档页面:

迭代地将行附加到 DataFrame 比单个连接的计算量更大。更好的解决方案是将这些行附加到列表中,然后将列表与原始 DataFrame 一次性连接起来。

哦,顺便说一句,我认为 pyETT 函数返回的是数据帧而不是字符串,对吗?我不太确定返回值的索引,但你可以弄清楚。这样做:

dataframe_list = []
with ThreadPoolExecutor(MAX_WORKERS) as executor:
       for user_id, user_record in zip(user_ids, executor.map(obtain_user_record, user_ids)):
          if user_record:
             dataframe_list.append(user_record)
          #If the dataframes aren't indexed, you could create a pre-filled
          #data_frame_list and overwrite specific indices instead of appending, 
          #then pass ignore_index=True to pd.concat

 df_master = pd.concat(dataframe_list) #We're done, time to merge our dframes
                                       #into one big dframe.

编辑3: 好的,那么我们需要将数据转换为 pandas 数据帧。

user_df = pd.DataFrame.from_dict(user_id:user_record, orient='index', columns=m_cols)
dataframe_list.append(user_df)

这样,帧将由我们之前传递给函数的用户 ID 索引。当然,您可以有多个列表并将它们合并到多个数据帧中,然后只需将数据帧写入单独的 .xls 文件,或者您可以采用其他方法。

编辑4: 有趣的是,它会让您在字典列表上使用 pd.DataFrame.from_dict。要调用的正确函数应该是 pd.DataFrame.from_records 。您实际上并不需要将 orient='index' 传递给它。将字典转换为数据框,然后合并它们有​​什么好处?好吧,你可以像我在 Edit3 中展示的那样控制索引。您也可以使用

pd.DataFrame.from_records(dataframe_list,index=index_list)
df_master.sort_index()

Index_list 是一个索引列表(即 user_ids),与 dataframe_list 的顺序相同。

你也可以用字典代替列表:

dataframe_dict = 
with ThreadPoolExecutor(MAX_WORKERS) as executor:
       for user_id, user_record in zip(user_ids, executor.map(obtain_user_record, user_ids)):
          if user_record:
             dataframe_dict[user_id] = user_record

然后调用的正确函数是 from_dict ,您可以再次索引记录,如下所示:

df_master = pd.DataFrame.from_dict(dataframe_dict, orient='index')   
df_master.sort_index()

不必每次都将 dicts 转换为数据帧在性能方面更好,所以很好的调用。

【讨论】:

这对我有用,但我正在努力将数组转换为数据框(对 xlswriter 不太熟悉)。 这个特殊的函数返回一个字典对象。 print(type(user_record),user_record) 返回: 'user-name': 'oscar-CLIENTELEVEN', 'elo': 1500.0, 'rank': 87588, 'wins': 0, 'losses': 0, 'last -online': '2020-03-29T01:44:14.193Z' 我在df_master = pd.concat(dataframe_list) Message=cannot concatenate object of type '' 上遇到错误;只有 Series 和 DataFrame obj 是有效的 我可以通过将其放在函数 df_master = pd.DataFrame.from_dict(dataframe_list,orient='columns') 的末尾来使其工作。使用这个和你发布的那个有什么好处吗? 你的意思是 pd.DataFrame.from_dict(user_record,orient='columns') ???? 这行不通,根据您发布的字典。您不能从列表中创建字典。【参考方案2】:

更新

首先,当您处理range(1, 50)range(51, 100)range(101, 150) 等范围时,user_id 将采用的实际值为:

1, 2, ... 49, 51, 52, ... 99, 101, 102, ... 149

也就是说,您不会处理值 50、100、150 等,因为给定 range(start, stop, step) 然后对于正步长,范围 r 的内容由公式 r[i] = start + step 确定*i where i >= 0 and r[i]

我怀疑您是否有意忽略这些值。我会假设这不是你的意图。

处理似乎可以分为 I/O 绑定部分(ett.ett_parser.get_user(user_id),它正在检索一个 URL)和一个 更多 CPU 密集部分(创建 pandas dataframe 并导出到 Excel)。前者应使用大型多线程池,后者应使用多处理池。

以下代码创建两个池并将多处理池传递给工作函数run,该函数检索其传递范围内指定的所有 URL,然后将检索到的行列表传递给多处理池进行组装和转换Excel。

from pyETT import ett_parser
import pandas as pd
import time
from datetime import datetime
from multiprocessing import Pool
from multiprocessing.pool import ThreadPool
from functools import partial
import sys

def run(multiprocessing_pool, run_number, the_range):
    c = 10

    print(f'Running query run_number...', flush=True)
    lines = []
    for user_id in the_range:
        try:
            if user_id % c == 0:
                print('Loop is at:', user_id, flush=True)
            line = ett.ett_parser.get_user(user_id)
            lines.append((user_id, line))
        except Exception:
            print("Error:", user_id, flush=True)
    if lines:
        multiprocessing_pool.apply(process_lines, args=(run_number, lines))

def process_lines(run_number, lines):
    m_cols = ('user-name','elo','rank','wins','losses','last-online')

    df_master = pd.DataFrame(columns = m_cols)
    for user_id, line in lines:
        temp_df = pd.DataFrame(line, index=[user_id])
        df_master = df_master.append(temp_df, ignore_index=True)
    #Export to excel
    file_name = f'export_filerun_number_datetime.now().strftime("%H_%M_%S").xlsx'
    df_master.to_excel(file_name, index=False)
    print(f'DataFrame(file_name) is written to Excel File successfully.', flush=True)

def split(lst, n):  # function to split list in n even parts
    k, m = divmod(len(lst), n)
    return list(lst[i * k + min(i, m):(i + 1) * k + min(i + 1, m)] for i in range(n))

def main():
    MIN_USER_ID = 1
    MAX_USER_ID = 500
    NUM_SPLITS = 10 # 50 user ids per split
    THREAD_POOL_SIZE = min(NUM_SPLITS, 100) # Or maybe even 500!
    # split this range into thread_pool_size smaller ranges:
    # Next line will process 500 user ids: 1, 2, ... 500
    ranges = split(range(MIN_USER_ID, MAX_USER_ID+1), NUM_SPLITS)
    multiprocessing_pool = Pool()
    multithreading_pool = ThreadPool(THREAD_POOL_SIZE)
    multithreading_pool.starmap(partial(run, multiprocessing_pool) , enumerate(ranges, start=1))

if __name__ == '__main__':
    start = time.time()
    print('starting main')
    main()
    print('finishing main', time.time() - start)

【讨论】:

将 cpu_count() 的输出传递给 Pool 是多余的,因为这是其参数采用的默认值。 @swaggg 感谢您的评论。这里强调指出进程数必须等于ranges 的长度。在这种情况下,num_processors 被设置为 cpu_count(),这是我们拥有的 logical 核心的数量。但我们可能会确定将num_processors 设置为物理 内核的数量实际上会使程序运行得同样快甚至更快。在这种情况下,不将num_processors 显式传递给Pool 构造函数将是一个巨大的错误。换句话说,这是小心编码。变量的另一个名称可能是n_processes 你的方法很有趣。我查看了 pyETT 模块的功能,得出的结论是 OP 需要的进程比他拥有的逻辑核心要多得多。 @swaggg 你是对的。查看更新的答案。 很抱歉。我不得不修改一行代码并更新了答案。

以上是关于正确使用多处理的主要内容,如果未能解决你的问题,请参考以下文章

使用多线程时如何正确处理此错误

Python 多处理正确完成工作,但进程仍然存在(Linux)

如何正确使用 Core Data 进行多线程处理? [关闭]

Spacy,在 python 中的大型数据集上使用 nlp.pipe,多处理导致进程进入睡眠状态。如何正确使用所有 CPU 内核?

jQuery 多事件处理程序 - 如何取消?

使用多处理队列、池和锁定的简单示例