"TypeError: 'type' object is not subscriptable" when doing multiprocessing. What am I doing wrong?

Posted: 2021-01-01 18:04:01

I am trying to "multi"-process the function func, but I keep getting this error:

File "c:\...programs\python\python37\lib\multiprocessing\pool.py", line 268, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()

  File "c:\...\programs\python\python37\lib\multiprocessing\pool.py", line 657, in get
    raise self._value

TypeError: 'type' object is not subscriptable

What am I doing wrong? Every job is a dictionary containing all the parameters func needs.
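(Editorial aside: this particular TypeError is raised whenever a class itself, rather than an instance, is subscripted. A minimal illustration, assuming a pandas/Python version where DataFrame does not support class subscription, as in the Python 3.7 traceback above:

import pandas as pd

raw_df = pd.DataFrame    # the class itself, not a DataFrame instance
raw_df[["A"]]            # TypeError: 'type' object is not subscriptable
)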

Minimal reproducible sample:

import multiprocessing as mp, pandas as pd

def func(name, raw_df=pd.DataFrame, df={}, width=0):
    # 3. do some column operations. (actually there's more than just this operation)
    seriesF = raw_df[[name]].dropna()
    afterDropping_indices = seriesF.index.copy(deep=True)
    list_ = list(raw_df[name])[width:]
    df[name] = pd.Series(list_.copy(), index=afterDropping_indices[width:])

def preprocess_columns(raw_df):

    # get all inputs.
    df, width = {}, 137
    args = {"raw_df": raw_df, "df": df, 'width': width}
    column_names = raw_df.columns

    # get input-dict for every single job.
    jobs = []
    for i in range(len(column_names)):
        job = {"name": column_names[i]}
        job.update(args)
        jobs.append(job)

    # multiprocessing
    pool = mp.Pool(len(column_names))
    pool.map(func, jobs)

    # create df from dict and reindex
    df = pd.concat(df, axis=1)
    df = df.reindex(df.index[::-1])
    return df

if __name__ == '__main__':
    raw_df = pd.DataFrame({"A": [1.1]*100000, "B": [2.2]*100000, "C": [3.3]*100000})
    raw_df = preprocess_columns(raw_df)
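(Editorial note on where the traceback comes from: pool.map passes each element of jobs as a single positional argument. The whole dict is therefore bound to name, raw_df silently keeps its default value pd.DataFrame, i.e. the class itself, and raw_df[[name]] then subscripts the class. A sketch of what each worker effectively executes:

job = jobs[0]   # one whole job dict
func(job)       # job is bound to `name`; raw_df stays pd.DataFrame (the class)
                # -> raw_df[[name]] subscripts the class and raises the TypeError
)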

Edit: a version that passes only the column instead of raw_df

import multiprocessing as mp, pandas as pd

def func(name, series, df, width):
    # 3. do some column operations. (actually there's more than just this operation)
    seriesF = series.dropna()
    afterDropping_indices = seriesF.index.copy(deep=True)
    list_ = list(series)[width:]
    df[name] = pd.Series(list_.copy(), index=afterDropping_indices[width:])

def preprocess_columns(raw_df):

    df, width = {}, 137
    args = {"df": df, 'width': width}

    column_names = raw_df.columns
    jobs = []
    for i in range(len(column_names)):
        job = {"name": column_names[i], "series": raw_df[column_names[i]]}
        job.update(args)
        jobs.append(job)

    pool = mp.Pool(len(column_names))
    pool.map(func, jobs)

    # create df from dict and reindex
    df = pd.concat(df, axis=1)
    df = df.reindex(df.index[::-1])
    return df

if __name__ == '__main__':
    raw_df = pd.DataFrame({"A": [1.1]*100000, "B": [2.2]*100000, "C": [3.3]*100000})
    raw_df = preprocess_columns(raw_df)

Result:

TypeError: func() missing 3 required positional arguments: 'series', 'df', and 'width'
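(This error has the same root cause: Pool.map calls the worker with exactly one positional argument per element of the iterable, so the whole job dict lands in name and nothing is left for series, df and width. One fix for the missing-arguments error, an editorial sketch rather than the poster's code, is a module-level wrapper that unpacks the dict:

def call_func(job):        # hypothetical helper; must be module-level so it can be pickled
    return func(**job)     # expand the job dict into keyword arguments

pool.map(call_func, jobs)  # each worker now receives name, series, df and width

Note that df is still a per-process copy, so the parent's dict would remain empty; the accepted answer below addresses that by returning df from func and concatenating the results.)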

Comments:

raw_df=pd.DataFrame makes no sense. Your workers need the actual dataframe, not pd.DataFrame. (Actually, they really only need the column they are going to work on, and you should change the code to pass only that column, to reduce inter-process communication overhead.)

@user2357112supportsMonica Forgive me, I forgot that I had put those keywords in there before posting the question. So unfortunately the keywords are not the cause of the error. Your suggestion of passing only the column sounds good, but is there no way to pass just the name as the element over which the parallelization happens?

The edited code produces a completely different error.

@user2357112supportsMonica Would you be so kind as to tell me what I am doing wrong? (Edited again.) Regarding the previous comment: raw_df is in the args dictionary.

Answer 1:

I found the solution. Summary:

Added an expandCall() function (see below). Iterated over the outputs and appended the elements to a plain list.

Note: this only handles multiple threads.


import multiprocessing as mp, pandas as pd

def func(name, raw_df, df, width):
    # 3. do some column operations. (actually there's more than just this operation)
    seriesF = raw_df[name].dropna()
    afterDropping_indices = seriesF.index.copy(deep=True)
    list_ = list(raw_df[name])[width:]
    df[name] = pd.Series(list_.copy(), index=afterDropping_indices[width:])
    df[name].name = name
    return df

def expandCall(kargs):
    # Expand the arguments of a callback function, kargs['func']
    func = kargs['func']
    del kargs['func']
    out = func(**kargs)
    return out

def preprocess_columns(raw_df):
    df, width = pd.DataFrame(), 137
    args = {"df": df, "raw_df": raw_df, 'width': width}

    column_names = raw_df.columns
    jobs = []
    for i in range(len(column_names)):
        job = {"func": func, "name": column_names[i]}
        job.update(args)
        jobs.append(job)

    pool = mp.Pool(len(column_names))
    task = jobs[0]['func'].__name__  # name of the task (unused here)
    outputs = pool.imap_unordered(expandCall, jobs)

    out = []
    for i, out_ in enumerate(outputs, 1):
        out.append(out_)
    pool.close(); pool.join()  # this is needed to prevent memory leaks

    # create df from the collected outputs and reindex
    df = pd.concat(out, axis=1)
    df = df.reindex(df.index[::-1])
    print(df)
    return df

if __name__ == '__main__':
    raw_df = pd.DataFrame({"A": [1.1]*100000, "B": [2.2]*100000, "C": [3.3]*100000})
    raw_df = preprocess_columns(raw_df)
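(Editorial note on the design: each worker process receives its own pickled copy of the job dict, so mutating df inside func would never be visible in the parent; that is why func returns df and the parent concatenates the collected outputs. expandCall is the usual trick for pushing keyword arguments through Pool.map/imap_unordered, which only supply a single positional argument. In isolation it behaves like this, a sketch with hypothetical values:

job = {'func': func, 'name': 'A', 'raw_df': raw_df, 'df': df, 'width': 137}
expandCall(job)   # equivalent to func(name='A', raw_df=raw_df, df=df, width=137)
)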
