进程池中的 itertuples 不起作用
Posted
技术标签:
【中文标题】进程池中的 itertuples 不起作用【英文标题】:itertuples in process pool doesn't work 【发布时间】:2021-10-25 14:26:28 【问题描述】:我无法在进程池中使用 `pandas.DataFrame.itertuples`。
看看这段代码:
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from functools import partial
from typing import Any, Callable, Iterable, Iterator

import pandas as pd
class Data:
    """Demo record whose constructor prints the arguments it receives.

    The ``from_*`` alternate constructors take one DataFrame row as their last
    positional argument and pass it on as the ``params`` keyword argument.
    """

    def __init__(self, *args, **kwargs) -> None:
        print(f'args: {args}\nkwargs: {kwargs}\n', end='')

    @classmethod
    def from_iterrows(cls, *args, **kwargs) -> 'Data':
        """Build an instance whose last positional argument is an ``(index, Series)`` pair.

        That pair is what ``DataFrame.iterrows()`` yields. The index is
        ignored, and the row is passed on as ``params=series.to_dict()``.
        The remaining positional and keyword arguments go to ``__init__``
        unchanged.
        """
        *args, (_index, series) = args
        return cls(*args, **kwargs, params=series.to_dict())

    @classmethod
    def from_itertuples(cls, *args, **kwargs) -> 'Data':
        """Build an instance whose last positional argument is one row.

        The row is normally a namedtuple from ``DataFrame.itertuples()`` and
        is converted with its ``_asdict()`` method. A mapping row is copied
        with ``dict()`` instead. For example, a dict built from the namedtuple
        in the parent process can be pickled and sent to a process pool,
        whereas the namedtuple itself cannot.
        """
        *args, row = args
        as_dict = getattr(row, '_asdict', None)
        params = as_dict() if as_dict is not None else dict(row)
        return cls(*args, **kwargs, params=params)
def process_pool(function: Callable, iterable: Iterable, workers: int, /, *args, **kwargs) -> Iterator[Any]:
    """Apply ``function`` to every item of ``iterable`` in a pool of ``workers`` processes.

    ``args`` and ``kwargs`` are bound with :func:`functools.partial`, so
    item ``x`` is processed as ``function(*args, x, **kwargs)``. Everything
    sent to the workers must be picklable.

    Returns:
        An iterator over the results, in input order.

    Raises:
        The first exception raised for any item, in input order. This
        includes a pickling error for an argument that cannot be sent to
        a worker.
    """
    task = partial(function, *args, **kwargs)
    with ProcessPoolExecutor(workers) as executor:
        # executor.map() is lazy: a task's exception stays in its future until
        # the result iterator is advanced. Collecting the results here makes
        # failures raise from this call. Otherwise they vanish whenever the
        # caller ignores the return value.
        results = list(executor.map(task, iterable))
    return iter(results)
def thread_pool(function: Callable, iterable: Iterable, workers: int, /, *args, **kwargs) -> Iterator[Any]:
    """Apply ``function`` to every item of ``iterable`` in a pool of ``workers`` threads.

    ``args`` and ``kwargs`` are bound with :func:`functools.partial`, so
    item ``x`` is processed as ``function(*args, x, **kwargs)``.

    Returns:
        An iterator over the results, in input order.

    Raises:
        The first exception raised by ``function`` for any item, in input
        order.
    """
    task = partial(function, *args, **kwargs)
    with ThreadPoolExecutor(workers) as executor:
        # executor.map() is lazy: exceptions stay inside the futures until the
        # iterator is advanced. Collect here so they raise from this call.
        results = list(executor.map(task, iterable))
    return iter(results)
# Small two-row frame used by all the examples below.
df = pd.DataFrame(data={'id': [1, 2],
                        'param1': [11, 111],
                        'param2': [22, 222],
                        'param3': [33, 333]})
print(df)
#    id  param1  param2  param3
# 0   1      11      22      33
# 1   2     111     222     333
使用 `thread_pool` 调用 `from_iterrows`:
# list() consumes the returned iterator, so an exception raised for any row
# is re-raised here instead of being silently discarded.
list(thread_pool(Data.from_iterrows, df.iterrows(), 2, 100, 200, 300, a=10, b=20, c=30))
# args: (100, 200, 300)
# kwargs: {'a': 10, 'b': 20, 'c': 30, 'params': {'id': 1, 'param1': 11, 'param2': 22, 'param3': 33}}
# args: (100, 200, 300)
# kwargs: {'a': 10, 'b': 20, 'c': 30, 'params': {'id': 2, 'param1': 111, 'param2': 222, 'param3': 333}}
使用 `process_pool` 调用 `from_iterrows`:
# The (index, Series) pairs from iterrows() reach the worker processes fine.
# list() consumes the returned iterator so any error would be raised here.
list(process_pool(Data.from_iterrows, df.iterrows(), 2, 100, 200, 300, a=10, b=20, c=30))
# args: (100, 200, 300)
# kwargs: {'a': 10, 'b': 20, 'c': 30, 'params': {'id': 1, 'param1': 11, 'param2': 22, 'param3': 33}}
# args: (100, 200, 300)
# kwargs: {'a': 10, 'b': 20, 'c': 30, 'params': {'id': 2, 'param1': 111, 'param2': 222, 'param3': 333}}
使用 `thread_pool` 调用 `from_itertuples`:
# Threads receive the namedtuple rows directly, so this works.
# list() consumes the returned iterator so any error would be raised here.
list(thread_pool(Data.from_itertuples, df.itertuples(), 2, 100, 200, 300, a=10, b=20, c=30))
# args: (100, 200, 300)
# kwargs: {'a': 10, 'b': 20, 'c': 30, 'params': {'Index': 0, 'id': 1, 'param1': 11, 'param2': 22, 'param3': 33}}
# args: (100, 200, 300)
# kwargs: {'a': 10, 'b': 20, 'c': 30, 'params': {'Index': 1, 'id': 2, 'param1': 111, 'param2': 222, 'param3': 333}}
使用 `process_pool` 调用 `from_itertuples`:
# executor.map() returns a lazy iterator. A task's exception is re-raised only
# when that iterator is advanced, so discarding the return value hides the
# failure and nothing is printed. Consuming it exposes the real error: the
# namedtuple rows from df.itertuples() (class pandas.core.frame.Pandas by
# default) cannot be pickled, so they never reach the worker processes.
list(process_pool(Data.from_itertuples, df.itertuples(), 2, 100, 200, 300, a=10, b=20, c=30))
# raises e.g.
# _pickle.PicklingError: Can't pickle <class 'pandas.core.frame.Pandas'>:
#     attribute lookup Pandas on pandas.core.frame failed
谁能解释一下为什么会这样?
【问题讨论】:
【参考方案1】:这个问题可以用下面的程序来演示:
import pandas as pd
from multiprocessing import Process
def worker(p):
    """Ignore ``p``; this target exists only so ``p`` is sent to a child process.

    Getting the argument into the child is what exercises pickling when the
    process is started with the ``spawn`` method.
    """
# Required for Windows: with the 'spawn' start method the child process
# re-imports this module, so the demo must not run at import time.
if __name__ == '__main__':
    df = pd.DataFrame(data={'id': [1, 2],
                            'param1': [11, 111],
                            'param2': [22, 222],
                            'param3': [33, 333]})
    for name in [None, 'Pandas']:
        print('Using name:', 'None' if name is None else name)
        it = df.itertuples(name=name)
        print(type(it))
        # convert to a list so a single row can be handed to the child process:
        rows = list(it)
        print(type(rows[0]), rows[0])
        # Under 'spawn' the Process object (including args) is pickled, which
        # fails for the namedtuple row produced when name='Pandas'.
        p = Process(target=worker, args=(rows[0],))
        p.start()
        p.join()
        print('Process successfully ended.')
        print()
打印:
Using name: None
<class 'zip'>
<class 'tuple'> (0, 1, 11, 22, 33)
Process successfully ended.
Using name: Pandas
<class 'map'>
<class 'pandas.core.frame.Pandas'> Pandas(Index=0, id=1, param1=11, param2=22, param3=33)
Traceback (most recent call last):
File "C:\Ron\test\test.py", line 22, in <module>
p.start()
File "C:\Program Files\Python38\lib\multiprocessing\process.py", line 121, in start
self._popen = self._Popen(self)
File "C:\Program Files\Python38\lib\multiprocessing\context.py", line 224, in _Popen
return _default_context.get_context().Process._Popen(process_obj)
File "C:\Program Files\Python38\lib\multiprocessing\context.py", line 327, in _Popen
return Popen(process_obj)
File "C:\Program Files\Python38\lib\multiprocessing\popen_spawn_win32.py", line 93, in __init__
reduction.dump(process_obj, to_child)
File "C:\Program Files\Python38\lib\multiprocessing\reduction.py", line 60, in dump
ForkingPickler(file, protocol).dump(obj)
_pickle.PicklingError: Can't pickle <class 'pandas.core.frame.Pandas'>: attribute lookup Pandas on pandas.core.frame failed
C:\Ron\test>Traceback (most recent call last):
File "<string>", line 1, in <module>
File "C:\Program Files\Python38\lib\multiprocessing\spawn.py", line 102, in spawn_main
source_process = _winapi.OpenProcess(
OSError: [WinError 87] The parameter is incorrect
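因此,`df.itertuples` 生成的行类型取决于是否指定 `name=None`(默认为 `name='Pandas'`)。指定 `name=None` 时,每一行是普通的 `tuple`。否则,每一行是具名元组类 `pandas.core.frame.Pandas` 的实例,而这个类并不是 `pandas.core.frame` 模块的属性,pickle 无法按名称找到它。上述异常中的关键行是:
`_pickle.PicklingError: Can't pickle <class 'pandas.core.frame.Pandas'>: attribute lookup Pandas on pandas.core.frame failed`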
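至于为什么使用 `concurrent.futures.ProcessPoolExecutor` 时没有打印任何异常:`executor.map` 返回的是一个惰性迭代器。任务失败时(包括参数无法被 pickle 的情况),异常只会保存在对应的 `Future` 中,直到遍历该迭代器时才会重新抛出。问题中的代码从未遍历 `process_pool` 的返回值,所以异常被悄悄丢弃了;写成 `list(process_pool(...))` 就能看到它。如果改用 `multiprocessing.pool.Pool`(它的 `map` 会直接返回结果列表),同样会得到 pickle 错误信息。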
通过为 `df.itertuples` 指定 `name=None` 参数,可以大致让它工作。但这样就不能再对得到的 `named_tuple` 变量调用 `_asdict` 方法了,因为它现在只是普通的 `tuple` 类型:
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from functools import partial
from typing import Any, Callable, Iterable, Iterator
import pandas as pd
class Data:
    """Demo record whose constructor prints the arguments it receives."""

    def __init__(self, *args, **kwargs) -> None:
        print(f'args: {args}\nkwargs: {kwargs}\n', end='')

    @classmethod
    def from_iterrows(cls, *args, **kwargs) -> 'Data':
        """Build an instance whose last positional argument is an ``(index, Series)`` pair.

        The index is ignored, and the row is passed on as
        ``params=series.to_dict()``.
        """
        *args, (_index, series) = args
        return cls(*args, **kwargs, params=series.to_dict())

    @classmethod
    def from_itertuples(cls, *args, **kwargs) -> 'Data':
        """Build an instance, dropping the last positional argument (one row).

        With ``itertuples(name=None)`` the row is a plain ``tuple`` with no
        field names, so no ``params`` can be built from it via ``_asdict``.
        The row is therefore discarded.
        """
        *args, _row = args
        return cls(*args, **kwargs)
def process_pool(function: Callable, iterable: Iterable, workers: int, *args, **kwargs) -> Iterator[Any]:
    """Apply ``function`` to every item of ``iterable`` in a pool of ``workers`` processes.

    ``args`` and ``kwargs`` are bound with :func:`functools.partial`, so
    item ``x`` is processed as ``function(*args, x, **kwargs)``. Everything
    sent to the workers must be picklable.

    Returns:
        An iterator over the results, in input order.

    Raises:
        The first exception raised for any item, in input order. This
        includes a pickling error for an argument that cannot be sent to
        a worker.
    """
    task = partial(function, *args, **kwargs)
    with ProcessPoolExecutor(workers) as executor:
        # executor.map() is lazy; collect here so task exceptions are raised
        # from this call instead of staying hidden in unconsumed futures.
        results = list(executor.map(task, iterable))
    return iter(results)
def thread_pool(function: Callable, iterable: Iterable, workers: int, *args, **kwargs) -> Iterator[Any]:
    """Apply ``function`` to every item of ``iterable`` in a pool of ``workers`` threads.

    ``args`` and ``kwargs`` are bound with :func:`functools.partial`, so
    item ``x`` is processed as ``function(*args, x, **kwargs)``.

    Returns:
        An iterator over the results, in input order.

    Raises:
        The first exception raised by ``function`` for any item, in input
        order.
    """
    task = partial(function, *args, **kwargs)
    with ThreadPoolExecutor(workers) as executor:
        # executor.map() is lazy; collect here so task exceptions are raised
        # from this call instead of staying hidden in unconsumed futures.
        results = list(executor.map(task, iterable))
    return iter(results)
# Required for Windows: with the 'spawn' start method the worker processes
# re-import this module, so the demo must not run at import time.
if __name__ == '__main__':
    df = pd.DataFrame(data={'id': [1, 2],
                            'param1': [11, 111],
                            'param2': [22, 222],
                            'param3': [33, 333]})
    print(df)
    # Threads receive the namedtuple rows directly, so the default name works.
    print(list(thread_pool(Data.from_itertuples, df.itertuples(), 2, 100, 200, 300, a=10, b=20, c=30)))
    # Processes need picklable rows: name=None yields plain tuples.
    print(list(process_pool(Data.from_itertuples, df.itertuples(name=None), 2, 100, 200, 300, a=10, b=20, c=30)))
打印:
id param1 param2 param3
0 1 11 22 33
1 2 111 222 333
args: (100, 200, 300)
kwargs: 'a': 10, 'b': 20, 'c': 30
args: (100, 200, 300)
kwargs: 'a': 10, 'b': 20, 'c': 30
[<__main__.Data object at 0x0000026CFC34A6D0>, <__main__.Data object at 0x0000026CFC34A0A0>]
args: (100, 200, 300)
kwargs: 'a': 10, 'b': 20, 'c': 30
args: (100, 200, 300)
kwargs: 'a': 10, 'b': 20, 'c': 30
[<__main__.Data object at 0x0000026CFC3746D0>, <__main__.Data object at 0x0000026CFC374550>]
这个post 可能很有趣,但不是很有帮助。
【讨论】:
以上是关于进程池中的 itertuples 不起作用的主要内容,如果未能解决你的问题,请参考以下文章