Pandas 和多处理内存管理:将 DataFrame 拆分为多个块

Posted

技术标签:

【中文标题】Pandas 和多处理内存管理:将 DataFrame 拆分为多个块【英文标题】:Pandas and Multiprocessing Memory Management: Splitting a DataFrame into Multiple Chunks 【发布时间】:2017-05-05 12:52:30 【问题描述】:

我必须在一行一行的基础上处理一个巨大的pandas.DataFrame(几十 GB),其中每行操作都相当长(几十毫秒)。所以我有了将帧分割成块并使用multiprocessing 并行处理每个块的想法。这确实加速了任务,但内存消耗是一场噩梦。

虽然每个子进程原则上应该只消耗一小部分数据,但它需要(几乎)与包含原始DataFrame 的原始父进程一样多的内存。即使在父进程中删除使用的部分也无济于事。

我写了一个最小的例子来复制这种行为。它唯一做的就是用随机数创建一个大的DataFrame,将它分成最多100行的小块,并在多处理期间简单地打印一些关于DataFrame的信息(这里通过大小为4的mp.Pool) .

并行执行的main函数:

def just_wait_and_print_len_and_idx(df):
    """Waits for 5 seconds and prints df length and first and last index"""
    # Extract some info
    idx_values = df.index.values
    first_idx, last_idx = idx_values[0], idx_values[-1]
    length = len(df)
    pid = os.getpid()

    # Waste some CPU cycles
    time.sleep(1)

    # Print the info
    print('First idx , last idx  and len  '
          'from process '.format(first_idx, last_idx, length, pid))

DataFrame 分块的辅助生成器:

def df_chunking(df, chunksize):
    """Splits df into chunks, drops data of original df inplace"""
    count = 0 # Counter for chunks
    while len(df):
        count += 1
        print('Preparing chunk '.format(count))
        # Return df chunk
        yield df.iloc[:chunksize].copy()
        # Delete data in place because it is no longer needed
        df.drop(df.index[:chunksize], inplace=True)

还有主程序:

def main():
    # Job parameters
    n_jobs = 4  # Poolsize
    size = (10000, 1000)  # Size of DataFrame
    chunksize = 100  # Maximum size of Frame Chunk

    # Preparation
    df = pd.DataFrame(np.random.rand(*size))
    pool = mp.Pool(n_jobs)

    print('Starting MP')

    # Execute the wait and print function in parallel
    pool.imap(just_wait_and_print_len_and_idx, df_chunking(df, chunksize))

    pool.close()
    pool.join()

    print('DONE')

标准输出如下所示:

Starting MP
Preparing chunk 1
Preparing chunk 2
First idx 0, last idx 99 and len 100 from process 9913
First idx 100, last idx 199 and len 100 from process 9914
Preparing chunk 3
First idx 200, last idx 299 and len 100 from process 9915
Preparing chunk 4
...
DONE

问题:

主进程需要大约 120MB 内存。但是,池的子进程需要相同数量的内存,尽管它们只包含原始 DataFame 的 1%(大小为 100 的块与原始长度为 10000)。为什么?

我能做些什么呢?尽管我进行了分块,Python (3) 是否会将整个 DataFrame 发送到每个子进程?这是pandas 内存管理的问题还是multiprocessing 和数据酸洗的问题?谢谢!



如果您想自己尝试一下,可以简单地复制和粘贴整个脚本:

import multiprocessing as mp
import pandas as pd
import numpy as np
import time
import os


def just_wait_and_print_len_and_idx(df):
    """Waits for 5 seconds and prints df length and first and last index"""
    # Extract some info
    idx_values = df.index.values
    first_idx, last_idx = idx_values[0], idx_values[-1]
    length = len(df)
    pid = os.getpid()

    # Waste some CPU cycles
    time.sleep(1)

    # Print the info
    print('First idx , last idx  and len  '
          'from process '.format(first_idx, last_idx, length, pid))


def df_chunking(df, chunksize):
    """Splits df into chunks, drops data of original df inplace"""
    count = 0 # Counter for chunks
    while len(df):
        count += 1
        print('Preparing chunk '.format(count))
        # Return df chunk
        yield df.iloc[:chunksize].copy()
        # Delete data in place because it is no longer needed
        df.drop(df.index[:chunksize], inplace=True)


def main():
    # Job parameters
    n_jobs = 4  # Poolsize
    size = (10000, 1000)  # Size of DataFrame
    chunksize = 100  # Maximum size of Frame Chunk

    # Preparation
    df = pd.DataFrame(np.random.rand(*size))
    pool = mp.Pool(n_jobs)

    print('Starting MP')

    # Execute the wait and print function in parallel
    pool.imap(just_wait_and_print_len_and_idx, df_chunking(df, chunksize))

    pool.close()
    pool.join()

    print('DONE')


if __name__ == '__main__':
    main()

【问题讨论】:

有点旧 - 但仍然有效:***.com/questions/10369219/… 基本上 - 你看到的 :) 可能不是“真的”; 好的,谢谢,这可能解释了它^^ 我必须收回这一点,如果我使用所有 8 个内核(在我的实际问题中,几十 GB,父进程需要大约 22% 的 RAM,子进程也是如此)在某些情况下点所有的子进程吞下所有的内存,整个事情都爆炸了。如果我只使用 4 个核心,它需要两倍的时间,但会成功并且不会崩溃。所以虚拟内存确实会转化为物理内存:-( @SmCaterpillar 我一直在密切关注您的示例。甚至整个变薄了主要DF的使用部分。但在我的情况下,消除使用的行一次只会将 DF 减少 40 行。此外,我无法像您那样利用块,因为必须以自定义方式对 DF 进行分块。很高兴得到您的想法:***.com/questions/62545562/… 【参考方案1】:

好的,所以我在 cmets 中 Sebastian Opałczyński 的提示下想通了。

问题是子进程是从父进程派生的,所以它们都包含对原始DataFrame 的引用。但是,帧是在原始进程中被操纵的,所以 copy-on-write 行为会在达到物理内存的限制时缓慢地杀死整个事物。

有一个简单的解决方案:代替pool = mp.Pool(n_jobs),我使用multiprocessing 的新上下文功能:

ctx = mp.get_context('spawn')
pool = ctx.Pool(n_jobs)

这保证了Pool 进程只是派生出来的,而不是从父进程派生出来的。因此,它们都无法访问原始的DataFrame,它们都只需要父级内存的一小部分。

请注意,mp.get_context('spawn') 仅适用于 Python 3.4 及更高版本。

【讨论】:

非常有趣的问题!!这适用于 windows 或 linux 还是两者都适用?!我想到的另一种解决方案是将数据帧拆分为块,将它们放入列表中(以提供map)并在调用multiprocessing之前从内存中删除原始数据帧。在您看来,这可行吗?【参考方案2】:

更好的实现是使用分块数据帧的 pandas 实现作为生成器并将其输入“pool.imap”函数 pd.read_csv('<filepath>.csv', chucksize=<chunksize>) https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_csv.html

好处:它不会在您的主进程中读取整个 df(节省内存)。每个子进程都将指向它只需要的块。 --> 解决子记忆问题。

开销:它要求您先将 df 保存为 csv,然后使用 pd.read_csv 再次读取它 --> I/O 时间。

注意:chunksize 不适用于pd.read_pickle 或其他在存储上压缩的加载方法。

def main():
    # Job parameters
    n_jobs = 4  # Poolsize
    size = (10000, 1000)  # Size of DataFrame
    chunksize = 100  # Maximum size of Frame Chunk

    # Preparation
    df = pd.DataFrame(np.random.rand(*size))
    pool = mp.Pool(n_jobs)

    print('Starting MP')

    # Execute the wait and print function in parallel

    df_chunked = pd.read_csv('<filepath>.csv',chunksize = chunksize) # modified
    pool.imap(just_wait_and_print_len_and_idx, df_chunking(df, df_chunked) # modified

    pool.close()
    pool.join()

    print('DONE')

【讨论】:

以上是关于Pandas 和多处理内存管理:将 DataFrame 拆分为多个块的主要内容,如果未能解决你的问题,请参考以下文章

“未指定驱动程序名称”将 pandas 数据帧写入 SQL Server 表

使用 dask 和多处理优化内存使用

python大数据处理模块pandas

将函数应用于 Pandas.DataFrame 中两列的每个组合的更好方法

Pandas:将 Lambda 应用于多个数据帧

Python数据分析 Pandas模块 基础数据结构与简介