有效地将函数并行应用于分组的 pandas DataFrame

Posted

技术标签:

【中文标题】有效地将函数并行应用于分组的 pandas DataFrame【英文标题】:Efficiently applying a function to a grouped pandas DataFrame in parallel 【发布时间】:2012-07-28 13:17:51 【问题描述】:

我经常需要对非常大的DataFrame(混合数据类型)的组应用一个函数,并希望利用多个内核。

我可以从组中创建一个迭代器并使用多处理模块,但它效率不高,因为每个组和函数的结果都必须为进程之间的消息传递进行腌制。

有什么方法可以避免酸洗甚至完全避免复制DataFrame?看起来多处理模块的共享内存功能仅限于numpy 数组。还有其他选择吗?

【问题讨论】:

据我所知,没有办法共享任意对象。我想知道,如果酸洗需要更多的时间,而不是通过多处理获得。也许您应该寻找一种可能性,为每个流程创建更大的工作包,以减少相对酸洗时间。另一种可能性是在创建组时使用多处理。 我做了类似的事情,但使用 UWSGI、Flask 和 preforking:我将 pandas 数据帧加载到一个进程中,将其分叉 x 次(使其成为共享内存对象),然后从另一个 python 调用这些进程我连接结果的过程。 atm 我使用 JSON 作为通信过程,但这即将到来(但仍然是高度实验性的):pandas.pydata.org/pandas-docs/dev/io.html#msgpack-experimental 顺便问一下,你有没有看过带有分块的 HDF5? (HDF5 不保存并发写入,但您也可以保存到单独的文件并最终连接内容) 这将针对 0.14,请参阅此问题:github.com/pydata/pandas/issues/5751 @Jeff 被推到 0.15 =( 【参考方案1】:

从上面的 cmets 看来,这似乎是为 pandas 计划的一段时间(我刚刚注意到还有一个看起来很有趣的 rosetta project)。

但是,在将每个并行功能都合并到 pandas 之前,我注意到直接使用 cython + OpenMP 和 C++ 向 pandas 编写高效且非内存复制的并行扩充非常容易。

这是一个编写并行 groupby-sum 的简短示例,其用法如下:

import pandas as pd
import para_group_demo

df = pd.DataFrame('a': [1, 2, 1, 2, 1, 1, 0], 'b': range(7))
print para_group_demo.sum(df.a, df.b)

输出是:

     sum
key     
0      6
1      11
2      4

注意 毫无疑问,这个简单示例的功能最终将成为pandas 的一部分。然而,有些事情在 C++ 中并行化一段时间会更自然,重要的是要知道将其组合到 pandas 中是多么容易。


为此,我编写了一个简单的单源文件扩展,其代码如下。

从一些导入和类型定义开始

from libc.stdint cimport int64_t, uint64_t
from libcpp.vector cimport vector
from libcpp.unordered_map cimport unordered_map

cimport cython
from cython.operator cimport dereference as deref, preincrement as inc
from cython.parallel import prange

import pandas as pd

ctypedef unordered_map[int64_t, uint64_t] counts_t
ctypedef unordered_map[int64_t, uint64_t].iterator counts_it_t
ctypedef vector[counts_t] counts_vec_t

C++ unordered_map 类型用于单线程求和,vector 用于所有线程求和。

现在到函数sum。它以typed memory views 开头以便快速访问:

def sum(crit, vals):
    cdef int64_t[:] crit_view = crit.values
    cdef int64_t[:] vals_view = vals.values

函数继续将半等分到线程(这里硬编码为 4),并让每个线程将其范围内的条目相加:

    cdef uint64_t num_threads = 4
    cdef uint64_t l = len(crit)
    cdef uint64_t s = l / num_threads + 1
    cdef uint64_t i, j, e
    cdef counts_vec_t counts
    counts = counts_vec_t(num_threads)
    counts.resize(num_threads)
    with cython.boundscheck(False):
        for i in prange(num_threads, nogil=True): 
            j = i * s
            e = j + s
            if e > l:
                e = l
            while j < e:
                counts[i][crit_view[j]] += vals_view[j]
                inc(j)

线程完成后,函数将所有结果(来自不同范围)合并到一个unordered_map

    cdef counts_t total
    cdef counts_it_t it, e_it
    for i in range(num_threads):
        it = counts[i].begin()
        e_it = counts[i].end()
        while it != e_it:
            total[deref(it).first] += deref(it).second
            inc(it)        

剩下的就是创建一个DataFrame 并返回结果:

    key, sum_ = [], []
    it = total.begin()
    e_it = total.end()
    while it != e_it:
        key.append(deref(it).first)
        sum_.append(deref(it).second)
        inc(it)

    df = pd.DataFrame('key': key, 'sum': sum_)
    df.set_index('key', inplace=True)
    return df

【讨论】:

以上是关于有效地将函数并行应用于分组的 pandas DataFrame的主要内容,如果未能解决你的问题,请参考以下文章

熊猫有效地将groupby函数应用于每一列[重复]

如何有效地迭代 Pandas 数据帧的连续块

从 Numpy 3d 数组有效地创建 Pandas DataFrame

将多个过滤器应用于 pandas DataFrame 或 Series 的有效方法

有效地将Pandas数据帧写入Google BigQuery

如何将 pandas get_dummies 函数应用于有效数据集?