进程池中的 itertuples 不起作用

Posted

技术标签:

【中文标题】进程池中的 itertuples 不起作用【英文标题】:itertuples in process pool doesn't work 【发布时间】:2021-10-25 14:26:28 【问题描述】:

我不能在进程池中使用pandas.DataFrame.itertuples

看看这段代码:

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from functools import partial
from typing import Any, Callable, Iterable, Iterator

import pandas as pd

class Data:
    """Echo-only record used to show which arguments reach the constructor.

    Instances keep no state; constructing one prints the positional and
    keyword arguments it received, which makes it easy to see what a worker
    thread or process actually called it with.
    """

    def __init__(self, *args, **kwargs) -> None:
        # A single print call with embedded newlines and end='' keeps each
        # object's two lines together (presumably to limit interleaving when
        # several workers print at once).
        print(f'args: {args}\nkwargs: {kwargs}\n', end='')

    @classmethod
    def from_iterrows(cls, *args, **kwargs) -> 'Data':
        """Build a Data from one ``DataFrame.iterrows()`` item.

        The LAST positional argument must be the ``(index, row)`` pair that
        ``iterrows()`` yields. The row Series is converted with ``to_dict()``
        and passed as the ``params`` keyword. All earlier positional arguments
        and all keyword arguments are forwarded unchanged.
        """
        *args, (_index, series) = args
        return cls(*args, **kwargs, params=series.to_dict())

    @classmethod
    def from_itertuples(cls, *args, **kwargs) -> 'Data':
        """Build a Data from one ``DataFrame.itertuples()`` row.

        The LAST positional argument must be a namedtuple row (the default
        ``itertuples()`` output). ``_asdict()`` turns it into the ``params``
        keyword. Plain tuples (``itertuples(name=None)``) have no ``_asdict``
        and are rejected with AttributeError.
        """
        *args, named_tuple = args
        return cls(*args, **kwargs, params=named_tuple._asdict())

def process_pool(function: Callable, iterable: Iterable, workers: int, /, *args, **kwargs) -> Iterator[Any]:
    """Map ``function`` over ``iterable`` with a pool of worker processes.

    ``args`` and ``kwargs`` are bound with ``functools.partial``, so every call
    is ``function(*args, item, **kwargs)``. Leaving the ``with`` block shuts
    the pool down and waits for the submitted work before returning.

    The return value is the lazy iterator from ``Executor.map``. An exception
    raised for an item, including a failure to pickle that item for a worker,
    is only re-raised when its result is read from this iterator. A caller
    that discards the return value never sees such errors.
    """
    with ProcessPoolExecutor(workers) as executor:
        task = partial(function, *args, **kwargs)
        results = executor.map(task, iterable)
    return results

def thread_pool(function: Callable, iterable: Iterable, workers: int, /, *args, **kwargs) -> Iterator[Any]:
    """Map ``function`` over ``iterable`` with a pool of worker threads.

    Every call is ``function(*args, item, **kwargs)``. The pool is shut down,
    waiting for all submitted work, before this returns. The lazy
    ``Executor.map`` iterator is returned as-is, so an exception raised for an
    item only resurfaces when that item's result is read from it.
    """
    with ThreadPoolExecutor(workers) as executor:
        mapped = executor.map(partial(function, *args, **kwargs), iterable)
    return mapped

# Sample frame: two rows with an 'id' column and three parameter columns.
df = pd.DataFrame(data={'id': [1, 2],
                        'param1': [11, 111],
                        'param2': [22, 222],
                        'param3': [33, 333]})
print(df)

#    id  param1  param2  param3
# 0   1      11      22      33
# 1   2     111     222     333

from_iterrows 使用thread_pool

# Thread pool + iterrows(): each (index, Series) pair is handed to
# Data.from_iterrows inside the same process, so nothing is pickled.
thread_pool(Data.from_iterrows, df.iterrows(), 2, 100, 200, 300, a=10, b=20, c=30)

# args: (100, 200, 300)
# kwargs: {'a': 10, 'b': 20, 'c': 30, 'params': {'id': 1, 'param1': 11, 'param2': 22, 'param3': 33}}
# args: (100, 200, 300)
# kwargs: {'a': 10, 'b': 20, 'c': 30, 'params': {'id': 2, 'param1': 111, 'param2': 222, 'param3': 333}}

from_iterrows 使用process_pool

# Process pool + iterrows(): each (index, Series) pair must be pickled to be
# sent to a worker process; a Series pickles fine, so this works.
process_pool(Data.from_iterrows, df.iterrows(), 2, 100, 200, 300, a=10, b=20, c=30)

# args: (100, 200, 300)
# kwargs: {'a': 10, 'b': 20, 'c': 30, 'params': {'id': 1, 'param1': 11, 'param2': 22, 'param3': 33}}
# args: (100, 200, 300)
# kwargs: {'a': 10, 'b': 20, 'c': 30, 'params': {'id': 2, 'param1': 111, 'param2': 222, 'param3': 333}}

from_itertuples 使用thread_pool

# Thread pool + itertuples(): the rows are 'Pandas' namedtuples; that is fine
# here because threads receive them without pickling.
thread_pool(Data.from_itertuples, df.itertuples(), 2, 100, 200, 300, a=10, b=20, c=30)

# args: (100, 200, 300)
# kwargs: {'a': 10, 'b': 20, 'c': 30, 'params': {'Index': 0, 'id': 1, 'param1': 11, 'param2': 22, 'param3': 33}}
# args: (100, 200, 300)
# kwargs: {'a': 10, 'b': 20, 'c': 30, 'params': {'Index': 1, 'id': 2, 'param1': 111, 'param2': 222, 'param3': 333}}

from_itertuples 使用process_pool

# Process pool + itertuples(): the default 'Pandas' namedtuple rows cannot be
# pickled (PicklingError: attribute lookup Pandas on pandas.core.frame failed).
# Executor.map keeps that error on the futures and re-raises it only when a
# result is read from the iterator it returns. This call discards that
# iterator, so nothing is printed and no traceback appears. Wrapping the call
# in list(...) makes the error visible.
process_pool(Data.from_itertuples, df.itertuples(), 2, 100, 200, 300, a=10, b=20, c=30)

# print nothing...

谁能解释一下为什么会这样?

【问题讨论】:

【参考方案1】:

这个问题可以用下面的程序来演示:

import pandas as pd
from multiprocessing import Process

def worker(p):
    """Ignore *p*: the demo only checks that *p* can be shipped to a child process."""

# Required for Windows: with the "spawn" start method the child process
# re-imports this module, so processes must only be started under this guard.
if __name__ == '__main__':
    df = pd.DataFrame(data={'id': [1, 2],
                            'param1': [11, 111],
                            'param2': [22, 222],
                            'param3': [33, 333]})

    for name in [None, 'Pandas']:
        print('Using name:', 'None' if name is None else name)
        it = df.itertuples(name=name)
        print(type(it))
        # Materialize the rows so that a single row can be passed to a process.
        rows = list(it)
        print(type(rows[0]), rows[0])
        # Under "spawn" the Process arguments are pickled on start(); a plain
        # tuple survives that, the 'Pandas' namedtuple class does not.
        p = Process(target=worker, args=(rows[0],))
        p.start()
        p.join()
        print('Process successfully ended.')
        print()
        print('Process successfully ended.')
        print()

打印:

Using name: None
<class 'zip'>
<class 'tuple'> (0, 1, 11, 22, 33)
Process successfully ended.

Using name: Pandas
<class 'map'>
<class 'pandas.core.frame.Pandas'> Pandas(Index=0, id=1, param1=11, param2=22, param3=33)
Traceback (most recent call last):
  File "C:\Ron\test\test.py", line 22, in <module>
    p.start()
  File "C:\Program Files\Python38\lib\multiprocessing\process.py", line 121, in start
    self._popen = self._Popen(self)
  File "C:\Program Files\Python38\lib\multiprocessing\context.py", line 224, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "C:\Program Files\Python38\lib\multiprocessing\context.py", line 327, in _Popen
    return Popen(process_obj)
  File "C:\Program Files\Python38\lib\multiprocessing\popen_spawn_win32.py", line 93, in __init__
    reduction.dump(process_obj, to_child)
  File "C:\Program Files\Python38\lib\multiprocessing\reduction.py", line 60, in dump
    ForkingPickler(file, protocol).dump(obj)
_pickle.PicklingError: Can't pickle <class 'pandas.core.frame.Pandas'>: attribute lookup Pandas on pandas.core.frame failed

C:\Ron\test>Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "C:\Program Files\Python38\lib\multiprocessing\spawn.py", line 102, in spawn_main
    source_process = _winapi.OpenProcess(
OSError: [WinError 87] The parameter is incorrect

因此,df.itertuples 生成的行对象类型取决于是否指定 name=None(默认为 name='Pandas'):指定 name=None 时每行是普通的 tuple,否则是一个名为 Pandas 的动态生成的 namedtuple 类。上述异常中的关键行是:

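_pickle.PicklingError: Can't pickle <class 'pandas.core.frame.Pandas'>: attribute lookup Pandas on pandas.core.frame failed(即:无法 pickle <class 'pandas.core.frame.Pandas'>,因为在 pandas.core.frame 模块上查找属性 Pandas 失败)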

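至于为什么使用 concurrent.futures.ProcessPoolExecutor 时没有打印异常,我不能说。但是如果你改用 multiprocessing.pool.Pool,你肯定会得到 pickle 错误信息。(补充说明:Executor.map 会把调用中产生的异常保存在对应的 future 中,包括把参数 pickle 后发送给子进程时的失败。只有在从 map 返回的迭代器中读取结果时,这些异常才会被重新抛出。问题中的代码从未迭代 process_pool 的返回值,所以错误被悄悄丢弃了。用 list(...) 包裹该调用即可看到 PicklingError。)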

您可以通过为df.itertuples 指定name=None 参数来或多或少地使其工作。但是您将无法对生成的named_tuple 变量使用_asdict 方法,因为它现在只是tuple 类型:

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from functools import partial
from typing import Any, Callable, Iterable, Iterator
import pandas as pd

class Data:
    """Echo-only record used to show which arguments reach the constructor.

    Instances keep no state; constructing one prints the positional and
    keyword arguments it received.
    """

    def __init__(self, *args, **kwargs) -> None:
        # One print call (newlines embedded, end='') per constructed object.
        print(f'args: {args}\nkwargs: {kwargs}\n', end='')

    @classmethod
    def from_iterrows(cls, *args, **kwargs) -> 'Data':
        """Build a Data from one ``DataFrame.iterrows()`` item.

        The LAST positional argument must be the ``(index, row)`` pair from
        ``iterrows()``. The row Series is passed on as ``params`` via
        ``to_dict()``. Everything else is forwarded unchanged.
        """
        *args, (_index, series) = args
        return cls(*args, **kwargs, params=series.to_dict())

    @classmethod
    def from_itertuples(cls, *args, **kwargs) -> 'Data':
        """Build a Data from one ``DataFrame.itertuples()`` row.

        The LAST positional argument is the row, and it is deliberately
        dropped. With ``itertuples(name=None)``, which is needed so the rows
        can be pickled for a process pool, it is a plain tuple without
        ``_asdict()``, so no ``params`` are derived from it. The remaining
        positional and keyword arguments are forwarded unchanged.
        """
        *args, _row = args
        return cls(*args, **kwargs)

def process_pool(function: Callable, iterable: Iterable, workers: int, *args, **kwargs) -> Iterator[Any]:
    """Apply ``function`` to every item of ``iterable`` in a process pool.

    Each call is ``function(*args, item, **kwargs)``, using ``functools.partial``
    to bind the extra arguments. All results are collected before the pool is
    shut down. An exception from any call therefore propagates out of this
    function, including a failure to pickle an item for a worker. It is not
    silently lost when the caller never iterates the result.

    Returns:
        An iterator over the results, in the order of ``iterable``.
    """
    with ProcessPoolExecutor(workers) as executor:
        # Executor.map re-raises worker exceptions only when results are
        # retrieved, so drain it here instead of handing back a lazy iterator.
        results = list(executor.map(partial(function, *args, **kwargs), iterable))
    return iter(results)

def thread_pool(function: Callable, iterable: Iterable, workers: int, *args, **kwargs) -> Iterator[Any]:
    """Apply ``function`` to every item of ``iterable`` in a thread pool.

    Each call is ``function(*args, item, **kwargs)``. All results are
    collected before the pool is shut down, so an exception from any call
    propagates out of this function instead of being silently lost when the
    caller never iterates the result.

    Returns:
        An iterator over the results, in the order of ``iterable``.
    """
    with ThreadPoolExecutor(workers) as executor:
        # Drain Executor.map eagerly so worker exceptions are raised here.
        results = list(executor.map(partial(function, *args, **kwargs), iterable))
    return iter(results)

# Required for Windows: with the "spawn" start method each worker process
# re-imports this module, so the pools must only be started under this guard.
if __name__ == '__main__':

    df = pd.DataFrame(data={'id': [1, 2],
                            'param1': [11, 111],
                            'param2': [22, 222],
                            'param3': [33, 333]})
    print(df)

    # Default itertuples() yields 'Pandas' namedtuples; threads do not pickle
    # their arguments, so that is fine here.
    print(list(thread_pool(Data.from_itertuples, df.itertuples(), 2, 100, 200, 300, a=10, b=20, c=30)))
    # name=None yields plain tuples, which can be pickled for worker processes.
    print(list(process_pool(Data.from_itertuples, df.itertuples(name=None), 2, 100, 200, 300, a=10, b=20, c=30)))

打印:

   id  param1  param2  param3
0   1      11      22      33
1   2     111     222     333
args: (100, 200, 300)
kwargs: {'a': 10, 'b': 20, 'c': 30}
args: (100, 200, 300)
kwargs: {'a': 10, 'b': 20, 'c': 30}
[<__main__.Data object at 0x0000026CFC34A6D0>, <__main__.Data object at 0x0000026CFC34A0A0>]
args: (100, 200, 300)
kwargs: {'a': 10, 'b': 20, 'c': 30}
args: (100, 200, 300)
kwargs: {'a': 10, 'b': 20, 'c': 30}
[<__main__.Data object at 0x0000026CFC3746D0>, <__main__.Data object at 0x0000026CFC374550>]

这个post 可能很有趣,但不是很有帮助。

【讨论】:

以上是关于进程池中的 itertuples 不起作用的主要内容,如果未能解决你的问题,请参考以下文章

脚本中的子进程不起作用,手动启动时会起作用

views.py 中的子进程不起作用

更改 Tomcat 数据源池中的目录

RedirectToAction 在特定操作(进程)中不起作用

PHP启动的Bash后台进程不起作用

子进程似乎在 pyinstaller exe 文件中不起作用