如何在 Pandas 中使用 apply 并行化许多(模糊)字符串比较?

Posted

技术标签:

【中文标题】如何在 Pandas 中使用 apply 并行化许多(模糊)字符串比较?【英文标题】:how to parallelize many (fuzzy) string comparisons using apply in Pandas? 【发布时间】:2016-10-25 01:16:00 【问题描述】:

我有以下问题

我有一个包含句子的数据框 ma​​ster,例如

master
Out[8]: 
                  original
0  this is a nice sentence
1      this is another one
2    *** is nice

对于 Master 中的每一行,我使用 fuzzywuzzy 查找另一个 Dataframe slave 以获得最佳匹配。我使用了fuzzywuzzy,因为两个数据帧之间的匹配句子可能会有所不同(额外的字符等)。

例如,从属可以是

slave
Out[10]: 
   my_value                      name
0         2               hello world
1         1           congratulations
2         2  this is a nice sentence 
3         3       this is another one
4         1     *** is nice

这是一个功能齐全、精彩、紧凑的工作示例:)

from fuzzywuzzy import fuzz
import pandas as pd
import numpy as np
import difflib


master= pd.DataFrame('original':['this is a nice sentence',
'this is another one',
'*** is nice'])


slave= pd.DataFrame('name':['hello world',
'congratulations',
'this is a nice sentence ',
'this is another one',
'*** is nice'],'my_value': [2,1,2,3,1])

def fuzzy_score(str1, str2):
    return fuzz.token_set_ratio(str1, str2)

def helper(orig_string, slave_df):
    #use fuzzywuzzy to see how close original and name are
    slave_df['score'] = slave_df.name.apply(lambda x: fuzzy_score(x,orig_string))
    #return my_value corresponding to the highest score
    return slave_df.ix[slave_df.score.idxmax(),'my_value']

master['my_value'] = master.original.apply(lambda x: helper(x,slave))

100 万美元的问题是:我可以并行化上面的应用代码吗?

毕竟,master 中的每一行都与slave 中的所有行进行比较(从属数据集是一个小数据集,我可以将许多数据副本保存到 RAM 中)。

我不明白为什么我不能运行多重比较(即同时处理多行)。

问题:我不知道该怎么做,或者这是否可能。

非常感谢任何帮助!

【问题讨论】:

我注意到您在此处添加了 dask 标签。您是否尝试过使用 dask 并遇到问题? 感谢您的帮助!看来 dask 只接受常规功能 Dask 使用 cloudpickle 序列化函数,因此可以轻松处理 lambdas 和对其他数据集的闭包。 差不多,但我会使用assign 而不是列分配,我会向apply 提供有关您期望的列的元数据。如果您创建一个最小的可重现示例,那么提供明确的解决方案会更容易。例如,我可以复制粘贴到本地机器上。 让我们continue this discussion in chat。 【参考方案1】:

这些答案基于较旧的 API。一些较新的代码:

dmaster = dd.from_pandas(master, npartitions=4)
dmaster['my_value'] = dmaster.original.apply(lambda x: helper(x, slave),meta=('x','f8'))
dmaster.compute(scheduler='processes') 

我个人会放弃在辅助函数中应用对fuzzy_score 的调用,而只是在那里执行操作。

您可以使用these tips 更改调度程序。

【讨论】:

【参考方案2】:

我正在研究类似的东西,我想为您可能偶然发现这个问题的其他人提供更完整的工作解决方案。不幸的是,@MRocklin 在提供的代码 sn-ps 中有一些语法错误。我不是 Dask 的专家,所以我不能评论一些性能考虑,但这应该可以完成你的任务,就像@MRocklin 所建议的那样。这是使用 Dask 版本 0.17.2Pandas 版本 0.22.0

import dask.dataframe as dd
import dask.multiprocessing
import dask.threaded
from fuzzywuzzy import fuzz
import pandas as pd

master= pd.DataFrame('original':['this is a nice sentence',
'this is another one',
'*** is nice'])

slave= pd.DataFrame('name':['hello world',
'congratulations',
'this is a nice sentence ',
'this is another one',
'*** is nice'],'my_value': [1,2,3,4,5])

def fuzzy_score(str1, str2):
    return fuzz.token_set_ratio(str1, str2)

def helper(orig_string, slave_df):
    slave_df['score'] = slave_df.name.apply(lambda x: fuzzy_score(x,orig_string))
    #return my_value corresponding to the highest score
    return slave_df.loc[slave_df.score.idxmax(),'my_value']

dmaster = dd.from_pandas(master, npartitions=4)
dmaster['my_value'] = dmaster.original.apply(lambda x: helper(x, slave),meta=('x','f8'))

然后,获取你的结果(就像在这个解释器会话中一样):

In [6]: dmaster.compute(get=dask.multiprocessing.get)                                             
Out[6]:                                          
                  original  my_value             
0  this is a nice sentence         3             
1      this is another one         4             
2    *** is nice         5    

【讨论】:

【参考方案3】:

您可以将其与 Dask.dataframe 并行化。

>>> dmaster = dd.from_pandas(master, npartitions=4)
>>> dmaster['my_value'] = dmaster.original.apply(lambda x: helper(x, slave), name='my_value'))
>>> dmaster.compute()
                  original  my_value
0  this is a nice sentence         2
1      this is another one         3
2    *** is nice         1

此外,您应该考虑在此处使用线程与进程之间的权衡。您的模糊字符串匹配几乎肯定不会释放 GIL,因此您不会从使用多个线程中获得任何好处。但是,使用进程会导致数据序列化并在您的机器上移动,这可能会减慢速度。

您可以通过管理compute() 方法的get= 关键字参数来试验使用线程和进程或分布式系统。

import dask.multiprocessing
import dask.threaded

>>> dmaster.compute(get=dask.threaded.get)  # this is default for dask.dataframe
>>> dmaster.compute(get=dask.multiprocessing.get)  # try processes instead

【讨论】:

天才!只是一个简单的问题:我有一台 8 核至强机器,它可以工作吗?我不能按照你的建议使用分布式系统 多处理将加速您的计算,但会因进程间数据传输而减慢。我无法知道事情是否会加快速度,而我对您的问题的了解比我真正想了解的更多。我建议尝试一下并进行分析。 感谢@MRocklin!我相信很多人会发现这篇文章很有用。在浏览dask.pydata.org/en/latest/install.html 之后,我自己仍然对dask 一无所知 如果还有 20 秒,请跟进。我也应该和npartitions一起玩吗? 我有很多 RAM (128GB),所以我应该使用很多 npartitions 吗?

以上是关于如何在 Pandas 中使用 apply 并行化许多(模糊)字符串比较?的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Pandas 中使用 apply 并行化许多(模糊)字符串比较?

如何在 Pandas 中使用 apply 并行化许多(模糊)字符串比较?

如何在 Pandas 中使用 apply 并行化许多(模糊)字符串比较?

让 Pandas DataFrame apply() 使用所有内核?

在 pandas groupby 之后并行化应用

如何在 pandas 中使用 apply 函数来实现这个 iterrow 案例?