Python:在熊猫数据帧上使用多处理

Posted

技术标签:

【中文标题】Python:在熊猫数据帧上使用多处理【英文标题】:Python: using multiprocessing on a pandas dataframe 【发布时间】:2016-08-16 02:53:36 【问题描述】:

我想在大型数据集上使用multiprocessing 来查找两个 gps 点之间的距离。我构建了一个测试集,但我无法让multiprocessing 处理这个集。

import pandas as pd
from geopy.distance import vincenty
from itertools import combinations
import multiprocessing as mp

df = pd.DataFrame('ser_no': [1, 2, 3, 4, 5, 6, 7, 8, 9, 0],
                'co_nm': ['aa', 'aa', 'aa', 'bb', 'bb', 'bb', 'bb', 'cc', 'cc', 'cc'],
                'lat': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
                'lon': [21, 22, 23, 24, 25, 26, 27, 28, 29, 30])



def calc_dist(x):
    return pd.DataFrame(
               [ [grp,
                  df.loc[c[0]].ser_no,
                  df.loc[c[1]].ser_no,
                  vincenty(df.loc[c[0], x], 
                           df.loc[c[1], x])
                 ]
                 for grp,lst in df.groupby('co_nm').groups.items()
                 for c in combinations(lst, 2)
               ],
               columns=['co_nm','machineA','machineB','distance'])

if __name__ == '__main__':
    pool = mp.Pool(processes = (mp.cpu_count() - 1))
    pool.map(calc_dist, ['lat','lon'])
    pool.close()
    pool.join()

发生此错误时,我在 Windows7 Professional 上使用 Python 2.7.11 和 Ipython 4.1.2 和 Anaconda 2.5.0 64 位。

runfile('C:/.../Desktop/multiprocessing test.py', wdir='C:/.../Desktop') Traceback(最近一次调用最后一次):

文件“”,第 1 行,在 runfile('C:/.../Desktop/multiprocessing test.py', wdir='C:/.../Desktop')

文件“C:...\Local\Continuum\Anaconda2\lib\site-packages\spyderlib\widgets\externalshell\sitecustomize.py”,第 699 行,在运行文件中 execfile(文件名,命名空间)

文件“C:...\Local\Continuum\Anaconda2\lib\site-packages\spyderlib\widgets\externalshell\sitecustomize.py”,第 74 行,在 execfile exec(compile(scripttext, filename, 'exec'), glob, loc)

文件“C:/..../multiprocessing test.py”,第 33 行,在 pool.map(calc_dist, ['lat','lon'])

文件“C:...\AppData\Local\Continuum\Anaconda2\lib\multiprocessing\pool.py”,第 251 行,在地图中 return self.map_async(func, iterable, chunksize).get()

文件“C:...\Local\Continuum\Anaconda2\lib\multiprocessing\pool.py”,第 567 行,在 get 提高self._value

TypeError: 无法从 1 创建 Point 实例。

def get(self, timeout=None):
    self.wait(timeout)
    if not self._ready:
        raise TimeoutError
    if self._success:
        return self._value
    else:
        raise self._value

【问题讨论】:

通过您的最后一次编辑,它在我的机器上运行良好... Ubuntu 14.04,python2.7 错误出现在哪一行? 您能否在您的问题中也发布完整的堆栈跟踪? 我用 python3 试过了,现在我可以重现你的错误 @salomonderossi 我正在使用 iPython 4.1.2 【参考方案1】:

怎么了

您的代码中的这一行:

pool.map(calc_dist, ['lat','lon'])

产生 2 个进程 - 一个运行 calc_dist('lat'),另一个运行 calc_dist('lon')。比较doc 中的第一个示例。 (基本上,pool.map(f, [1,2,3]) 调用 f 三次,参数在以下列表中:f(1)f(2)f(3)。)如果我没记错的话,你的函数 calc_dist 只能是叫calc_dist('lat', 'lon')。而且它不允许并行处理。

解决方案

我相信您想在进程之间拆分工作,可能会将每个元组 (grp, lst) 发送到单独的进程。下面的代码正是这样做的。

首先,让我们准备拆分:

grp_lst_args = list(df.groupby('co_nm').groups.items())

print(grp_lst_args)
[('aa', [0, 1, 2]), ('cc', [7, 8, 9]), ('bb', [3, 4, 5, 6])]

我们将把这些元组中的每一个(这里是三个)作为参数发送给一个单独进程中的函数。我们需要重写函数,我们称之为calc_dist2。为方便起见,它的参数是一个元组,如calc_dist2(('aa',[0,1,2]))

def calc_dist2(arg):
    grp, lst = arg
    return pd.DataFrame(
               [ [grp,
                  df.loc[c[0]].ser_no,
                  df.loc[c[1]].ser_no,
                  vincenty(df.loc[c[0], ['lat','lon']], 
                           df.loc[c[1], ['lat','lon']])
                 ]
                 for c in combinations(lst, 2)
               ],
               columns=['co_nm','machineA','machineB','distance'])

现在是多处理:

pool = mp.Pool(processes = (mp.cpu_count() - 1))
results = pool.map(calc_dist2, grp_lst_args)
pool.close()
pool.join()

results_df = pd.concat(results)

results 是在grp_lst_args 中为(grp,lst) 调用calc_dist2((grp,lst)) 的结果列表(此处为数据帧)。 results 的元素稍后会连接到一个数据帧。

print(results_df)
  co_nm  machineA  machineB          distance
0    aa         1         2  156.876149391 km
1    aa         1         3  313.705445447 km
2    aa         2         3  156.829329105 km
0    cc         8         9  156.060165391 km
1    cc         8         0  311.910998169 km
2    cc         9         0  155.851498134 km
0    bb         4         5  156.665641837 km
1    bb         4         6  313.214333025 km
2    bb         4         7  469.622535339 km
3    bb         5         6  156.548897414 km
4    bb         5         7  312.957597466 km
5    bb         6         7   156.40899677 km

顺便说一句,在 Python 3 中,我们可以使用 with 构造:

with mp.Pool() as pool:
    results = pool.map(calc_dist2, grp_lst_args)

更新

我只在 linux 上测试了这段代码。在 linux 上,只读数据帧df 可以被子进程访问,并且不会被复制到它们的内存空间,但我不确定它在 Windows 上是如何工作的。您可以考虑将df 拆分成块(按co_nm 分组)并将这些块作为参数发送给其他版本的calc_dist

【讨论】:

【参考方案2】:

我编写了一个包,用于在多个核心上的 Series、DataFrames 和 GroupByDataFrames 上使用 apply 方法。它使得在 Pandas 中进行多处理变得非常容易。

您可以在https://github.com/akhtarshahnawaz/multiprocesspandas查看文档

也可以直接使用pip安装包

pip install multiprocesspandas

那么做多处理就跟导入包一样简单

from multiprocesspandas import applyparallel

然后使用 applyparallel 而不是 apply like

def func(x):
    import pandas as pd
    return pd.Series([x['C'].mean()])

df.groupby(["A","B"]).apply_parallel(func, num_processes=30)

【讨论】:

这个包在我的 MacBook 上就像一个魅力。谢谢。【参考方案3】:

奇怪。它似乎在 python2 下工作,但在 python3 下不工作。

这是打印输出的最小修改版本:

import pandas as pd
from geopy.distance import vincenty
from itertools import combinations
import multiprocessing as mp

df = pd.DataFrame('ser_no': [1, 2, 3, 4, 5, 6, 7, 8, 9, 0],
                'co_nm': ['aa', 'aa', 'aa', 'bb', 'bb', 'bb', 'bb', 'cc', 'cc', 'cc'],
                'lat': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
                'lon': [21, 22, 23, 24, 25, 26, 27, 28, 29, 30])



def calc_dist(x):
    ret =  pd.DataFrame(
               [ [grp,
                  df.loc[c[0]].ser_no,
                  df.loc[c[1]].ser_no,
                  vincenty(df.loc[c[0], x],
                           df.loc[c[1], x])
                 ]
                 for grp,lst in df.groupby('co_nm').groups.items()
                 for c in combinations(lst, 2)
               ],
               columns=['co_nm','machineA','machineB','distance'])
    print(ret)
    return ret

if __name__ == '__main__':
    pool = mp.Pool(processes = (mp.cpu_count() - 1))
    pool.map(calc_dist, ['lat','lon'])
    pool.close()
    pool.join()

这是python2的输出

0     aa         1         2  110.723608682 km
1     aa         1         3  221.460709525 km
2     aa         2         3  110.737100843 km
3     cc         8         9  110.827576495 km
4     cc         8         0  221.671650552 km
   co_nm  machineA  machineB          distance
5     cc         9         0  110.844074057 km
0     aa         1         2  110.575064814 km
1     aa         1         3  221.151481337 km
6     bb         4         5  110.765515243 km
2     aa         2         3  110.576416524 km
7     bb         4         6    221.5459187 km
3     cc         8         9  110.598565514 km
4     cc         8         0  221.203121352 km
8     bb         4         7  332.341640771 km
5     cc         9         0  110.604555838 km
6     bb         4         5   110.58113908 km
9     bb         5         6  110.780403457 km
7     bb         4         6  221.165643396 km
10    bb         5         7  221.576125528 km
8     bb         4         7  331.754177186 km
9     bb         5         6  110.584504316 km
10    bb         5         7  221.173038106 km
11    bb         6         7  110.795722071 km
11    bb         6         7   110.58853379 km

这是来自 python3 的堆栈跟踪

"""
Traceback (most recent call last):
  File "/usr/local/lib/python3.4/dist-packages/geopy/point.py", line 123, in __new__
    seq = iter(arg)
TypeError: 'numpy.int64' object is not iterable

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/lib/python3.4/multiprocessing/pool.py", line 119, in worker
    result = (True, func(*args, **kwds))
  File "/usr/lib/python3.4/multiprocessing/pool.py", line 44, in mapstar
    return list(map(*args))
  File "gps.py", line 29, in calc_dist
    for grp, lst in df.groupby('co_nm').groups.items()
  File "gps.py", line 30, in <listcomp>
    for c in combinations(lst, 2)
  File "/usr/local/lib/python3.4/dist-packages/geopy/distance.py", line 322, in __init__
    super(vincenty, self).__init__(*args, **kwargs)
  File "/usr/local/lib/python3.4/dist-packages/geopy/distance.py", line 115, in __init__
    kilometers += self.measure(a, b)
  File "/usr/local/lib/python3.4/dist-packages/geopy/distance.py", line 342, in measure
    a, b = Point(a), Point(b)
  File "/usr/local/lib/python3.4/dist-packages/geopy/point.py", line 126, in __new__
    "Failed to create Point instance from %r." % (arg,)
TypeError: Failed to create Point instance from 8.
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "gps.py", line 38, in <module>
    pool.map(calc_dist, ['lat', 'lon'])
  File "/usr/lib/python3.4/multiprocessing/pool.py", line 260, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "/usr/lib/python3.4/multiprocessing/pool.py", line 599, in get
    raise self._value
TypeError: Failed to create Point instance from 8.

我知道这不是答案,但也许它有帮助......

【讨论】:

它在 Python2、Python3、Ipython 4.1.2 中对我不起作用 pool.py 中的第 599 行是与第 567 行不同的问题,我在 Python 2.7.11 和 IPython 4.1.2 中得到了

以上是关于Python:在熊猫数据帧上使用多处理的主要内容,如果未能解决你的问题,请参考以下文章

为啥在数据帧上具有中位数的 fillna 仍然在熊猫中留下 Na/NaN?

熊猫逐渐减去日期,直到满足数据帧上的条件

在多个熊猫数据帧上执行相同操作的正确方法是什么?

在熊猫数据框中按行应用时如何保留数据类型?

通过获取特定列在数据帧上使用循环

Python 多处理管理器在烧瓶 API 中使用时显示错误