python中的多处理-在多个进程之间共享大对象(例如pandas数据框)

Posted

技术标签:

【中文标题】python中的多处理-在多个进程之间共享大对象(例如pandas数据框)【英文标题】:multiprocessing in python - sharing large object (e.g. pandas dataframe) between multiple processes 【发布时间】:2014-04-24 14:26:57 【问题描述】:

我正在使用 Python 多处理,更准确地说

from multiprocessing import Pool
p = Pool(15)

args = [(df, config1), (df, config2), ...] #list of args - df is the same object in each tuple
res = p.map_async(func, args) #func is some arbitrary function
p.close()
p.join()

这种方式内存消耗很大;几乎耗尽了我所有的 RAM(此时它变得非常慢,因此使多处理变得毫无用处)。我认为问题在于df 是一个巨大的对象(一个大熊猫数据框),它会为每个进程复制。我尝试使用multiprocessing.Value 共享数据框而不复制

shared_df = multiprocessing.Value(pandas.DataFrame, df)
args = [(shared_df, config1), (shared_df, config2), ...] 

(如Python multiprocessing shared memory 中的建议),但这给了我TypeError: this type has no size(与Sharing a complex object between Python processes? 相同,很遗憾我不明白答案)。

我是第一次使用多处理,也许我的理解还不够好。 multiprocessing.Value 在这种情况下实际上是正确的吗?我看到了其他建议(例如队列),但现在有点困惑。有哪些选项可以共享内存,在这种情况下哪一个最好?

【问题讨论】:

查看最近的相关问题:***.com/questions/22468279/…。 有最近的方法可以做到这一点,还是使用Namespace 仍然是最好的方法?你是怎么解决的@Anne 【参考方案1】:

Value 的第一个参数是 typecode_or_type。即定义为:

typecode_or_type 确定返回对象的类型:是 ctypes 类型或单字符类型代码 数组模块。 *args 被传递给类型的构造函数。

强调我的。因此,您根本不能将 pandas 数据框放在 Value 中,它必须是 a ctypes type。

您可以改为使用multiprocessing.Manager 将您的单例数据框实例提供给您的所有进程。有几种不同的方法可以在同一个地方结束 - 可能最简单的方法是将您的数据框放入经理的Namespace

from multiprocessing import Manager

mgr = Manager()
ns = mgr.Namespace()
ns.df = my_dataframe

# now just give your processes access to ns, i.e. most simply
# p = Process(target=worker, args=(ns, work_unit))

现在您的数据框实例可以被传递给管理器引用的任何进程访问。或者只是传递对Namespace 的引用,这样更简洁。

我没有/不会介绍的一件事是事件和信号 - 如果您的流程需要等待其他人完成执行,您需要添加它。Here is a page 和一些 Event 示例还详细介绍了如何使用经理的Namespace

(请注意,这些都没有说明multiprocessing 是否会带来切实的性能优势,这只是为您提供探索该问题的工具)

【讨论】:

谢谢,这大大减少了内存消耗。它仍然比我想象的要高得多 - 我如何找出内存消耗的来源? @Anne,这种Namespace 方法也会对我造成大量内存消耗。我已经使用具有数百万行和 6 列(占用 2 GB RAM)的 DF 进行了尝试,工作人员最终也使用了这么多。更重要的是,在非多处理环境中对工作人员进行分析时,在非多处理环境中访问快速(ns.df.loc[ix] 调用也可能需要几秒钟。 @roippi 和 @Jeff,你们对此有什么想法吗? 尝试这种方法我可以从共享内存中读取 df,但我无法更改它的值。 我用大 df(从 ~9Gb csv 加载)尝试了这种方法,我得到了一个格式错误,我猜这是因为 df 的大小,“struct.error: 'i' format requires -2147483648 <= number <= 2147483647” , 有什么建议?有其他可用的技术吗? 我发现对于大数据结构、大字典或 pandas 数据帧,最好实现这种方法***.com/questions/48464565/…【参考方案2】:

您可以通过创建 data_handler 子进程在进程之间共享 pandas 数据帧,而无需任何内存开销。此过程从您的非常大的数据框对象接收来自具有特定数据请求(即行、特定单元格、切片等)的其他子级的调用。只有 data_handler 进程将您的数据帧保存在内存中,这与像 Namespace 这样的管理器不同,它会导致数据帧被复制到所有子进程。请参阅下面的工作示例。这可以转换为池。

需要一个进度条吗?在这里查看我的答案:https://***.com/a/55305714/11186769

import time
import Queue
import numpy as np
import pandas as pd
import multiprocessing
from random import randint

#==========================================================
# DATA HANDLER
#==========================================================

def data_handler( queue_c, queue_r, queue_d, n_processes ):

    # Create a big dataframe
    big_df = pd.DataFrame(np.random.randint(
        0,100,size=(100, 4)), columns=list('ABCD'))

    # Handle data requests
    finished = 0
    while finished < n_processes:

        try:
            # Get the index we sent in
            idx = queue_c.get(False)

        except Queue.Empty:
            continue
        else:
            if idx == 'finished':
                finished += 1
            else:
                try:
                    # Use the big_df here!
                    B_data = big_df.loc[ idx, 'B' ]

                    # Send back some data
                    queue_r.put(B_data)
                except:
                    pass    

# big_df may need to be deleted at the end. 
#import gc; del big_df; gc.collect()

#==========================================================
# PROCESS DATA
#==========================================================

def process_data( queue_c, queue_r, queue_d):

    data = []

    # Save computer memory with a generator
    generator = ( randint(0,x) for x in range(100) )

    for g in generator:

        """
        Lets make a request by sending
        in the index of the data we want. 
        Keep in mind you may receive another 
        child processes return call, which is
        fine if order isnt important.
        """

        #print(g)

        # Send an index value
        queue_c.put(g)

        # Handle the return call
        while True:
            try:
                return_call = queue_r.get(False)
            except Queue.Empty:
                continue
            else:
                data.append(return_call)
                break

    queue_c.put('finished')
    queue_d.put(data)   

#==========================================================
# START MULTIPROCESSING
#==========================================================

def multiprocess( n_processes ):

    combined  = []
    processes = []

    # Create queues
    queue_data = multiprocessing.Queue()
    queue_call = multiprocessing.Queue()
    queue_receive = multiprocessing.Queue()

    for process in range(n_processes): 

        if process == 0:

                # Load your data_handler once here
                p = multiprocessing.Process(target = data_handler,
                args=(queue_call, queue_receive, queue_data, n_processes))
                processes.append(p)
                p.start()

        p = multiprocessing.Process(target = process_data,
        args=(queue_call, queue_receive, queue_data))
        processes.append(p)
        p.start()

    for i in range(n_processes):
        data_list = queue_data.get()    
        combined += data_list

    for p in processes:
        p.join()    

    # Your B values
    print(combined)


if __name__ == "__main__":

    multiprocess( n_processes = 4 )

【讨论】:

【参考方案3】:

您可以使用Array 而不是Value 来存储您的数据框。

以下解决方案将pandas 数据帧转换为将其数据存储在共享内存中的对象:

import numpy as np
import pandas as pd
import multiprocessing as mp
import ctypes

# the origingal dataframe is df, store the columns/dtypes pairs
df_dtypes_dict = dict(list(zip(df.columns, df.dtypes)))

# declare a shared Array with data from df
mparr = mp.Array(ctypes.c_double, df.values.reshape(-1))

# create a new df based on the shared array
df_shared = pd.DataFrame(np.frombuffer(mparr.get_obj()).reshape(df.shape),
                         columns=df.columns).astype(df_dtypes_dict)

如果现在您在进程间共享df_shared,则不会制作额外的副本。对于你的情况:

pool = mp.Pool(15)

def fun(config):
    # df_shared is global to the script
    df_shared.apply(config)  # whatever compute you do with df/config

config_list = [config1, config2]
res = p.map_async(fun, config_list)
p.close()
p.join()

如果您使用pandarallel,这也特别有用,例如:

# this will not explode in memory
from pandarallel import pandarallel
pandarallel.initialize()
df_shared.parallel_apply(your_fun, axis=1)

注意:使用此解决方案,您最终会得到两个数据帧(df 和 df_shared),它们消耗两倍的内存并且初始化时间很长。可以直接在共享内存中读取数据。

【讨论】:

这似乎是共享 pandas 数据帧而不复制到每个子进程的唯一可行方法,同时能够使用多核计算。 MP 中的命名空间和管理器等其他内容仍会创建副本。只有使用 Array,主进程内存使用量从原始 df 的 7 倍(共享时)变为 3 倍(共享后,运行时)(至少在 Win 上),并且重新创建 df 需要大量时间。有什么办法可以进一步优化/加快速度?【参考方案4】:

至少 Python 3.6 支持将 pandas DataFrame 存储为 multiprocessing.Value。请参阅下面的工作示例:

import ctypes
import pandas as pd
from multiprocessing import Value

df = pd.DataFrame('a': range(0,9),
                   'b': range(10,19),
                   'c': range(100,109))

k = Value(ctypes.py_object)
k.value = df

print(k.value)

【讨论】:

即使成功了,我的 RAM 消耗也确实增加了【参考方案5】:

我很惊讶 joblib's Parallel(至少从 1.0.1 开始)已经支持与开箱即用的多进程工作人员共享 pandas 数据帧。至少使用“loky”后端。 我通过实验发现了一件事:传递给函数的参数不应包含任何大字典。如果他们这样做,请将 dict 转换为 Series 或 Dataframe。 每个工作人员肯定会使用一些额外的内存,但远小于驻留在主进程中的所谓“大”数据帧的大小。并且计算立即在所有工人中开始。否则,joblib 会启动您请求的所有工作人员,但是当对象按顺序复制到每个工作人员时,它们处于空闲状态,这需要很长时间。如果有人需要,我可以提供代码示例。我只在只读模式下测试了数据帧处理。文档中没有提到该功能,但它适用于 Pandas。

【讨论】:

一个代码示例会很棒!我从来没有乐于解决我的问题。 代码示例在这里:github.com/joblib/joblib/issues/1244

以上是关于python中的多处理-在多个进程之间共享大对象(例如pandas数据框)的主要内容,如果未能解决你的问题,请参考以下文章

httpd服务学习笔记

在 Ruby 中处理多个进程

在 Python 进程之间共享一个大的(只读)二进制字符串? [复制]

Python的多线程threading和多进程multiprocessing

多处理 - 共享一个复杂的对象

python中的多处理模块和修改共享的全局变量