使用 Python 对大数据集进行模糊逻辑

Posted

技术标签:

【中文标题】使用 Python 对大数据集进行模糊逻辑【英文标题】:Fuzzy logic on big datasets using Python 【发布时间】:2015-06-19 05:05:25 【问题描述】:

我的团队一直坚持在两个大型数据集上运行模糊逻辑算法。 第一个(子集)大约有 180K 行,其中包含我们需要在第二个(超集)中匹配的人员的姓名、地址和电子邮件。超集包含 250 万条记录。两者具有相同的结构,并且数据已经被清理,即解析地址、标准化名称等。

ContactID 整数, 全名 varchar(150), 地址 varchar(100), 电子邮件 varchar(100)

目标是将子集中的行中的值与超集中的对应值相匹配,因此输出将结合子集和超集以及每个字段(令牌)的对应相似性百分比。

联系人 ID, 查找联系人 ID, 全名, 查找全名, FullName_Similarity, 地址, 查找地址, Address_Similarity, 电子邮件, 查找电子邮件, Email_Similarity

为了首先简化和测试代码,我们将字符串连接起来,并且我们知道代码适用于非常小的超集;但是,一旦我们增加记录的数量,它就会卡住。我们尝试了不同的算法,Levenshtein、FuzzyWuzzy 等,但均无济于事。在我看来,问题在于 Python 是逐行执行的。但是,我不确定。我们甚至尝试使用流在我们的 Hadoop 集群上运行它;但是,它并没有产生任何积极的结果。

#!/usr/bin/env python
import sys
from fuzzywuzzy import fuzz
import datetime
import time
import Levenshtein

#init for comparison
with open('normalized_set_record_set.csv') as normalized_records_ALL_file:
# with open('delete_this/xab') as normalized_records_ALL_file:
    normalized_records_ALL_dict = 
    for line in normalized_records_ALL_file:
        key, value = line.strip('\n').split(':', 1)
        normalized_records_ALL_dict[key] = value
        # normalized_records_ALL_dict[contact_id] = concat_record

def score_it_bag(target_contact_id, target_str, ALL_records_dict):
    '''
    INPUT target_str, ALL_records_dict
    OUTPUT sorted list by highest fuzzy match
    '''
    return sorted([(value_str, contact_id_index_str, fuzz.ratio(target_str, value_str)) 
        for contact_id_index_str, value_str in ALL_records_dict.iteritems()], key=lambda x:x[2])[::-1]

def score_it_closest_match_pandas(target_contact_id, target_str, place_holder_delete):
    '''
    INPUT target_str, ALL_records_dict
    OUTPUT closest match
    '''
    # simply drop this index target_contact_id
    df_score = df_ALL.concat_record.apply(lambda x: fuzz.ratio(target_str, x))

    return df_ALL.concat_record[df_score.idxmax()], df_score.max(), df_score.idxmax()

def score_it_closest_match_L(target_contact_id, target_str, ALL_records_dict_input):
    '''
    INPUT target_str, ALL_records_dict
    OUTPUT closest match tuple (best matching str, score, contact_id of best match str)
    '''
    best_score = 100

    #score it
    for comparison_contactid, comparison_record_str in ALL_records_dict_input.iteritems():
        if target_contact_id != comparison_contactid:
            current_score = Levenshtein.distance(target_str, comparison_record_str)


            if current_score < best_score:
                best_score = current_score 
                best_match_id = comparison_contactid 
                best_match_str = comparison_record_str 

    return (best_match_str, best_score, best_match_id)



def score_it_closest_match_fuzz(target_contact_id, target_str, ALL_records_dict_input):
    '''
    INPUT target_str, ALL_records_dict
    OUTPUT closest match tuple (best matching str, score, contact_id of best match str)
    '''
    best_score = 0

    #score it
    for comparison_contactid, comparison_record_str in ALL_records_dict_input.iteritems():
        if target_contact_id != comparison_contactid:
            current_score = fuzz.ratio(target_str, comparison_record_str)

            if current_score > best_score:
                best_score = current_score 
                best_match_id = comparison_contactid 
                best_match_str = comparison_record_str 

    return (best_match_str, best_score, best_match_id)

def score_it_threshold_match(target_contact_id, target_str, ALL_records_dict_input):
    '''
    INPUT target_str, ALL_records_dict
    OUTPUT closest match tuple (best matching str, score, contact_id of best match str)
    '''
    score_threshold = 95

    #score it
    for comparison_contactid, comparison_record_str in ALL_records_dict_input.iteritems():
        if target_contact_id != comparison_contactid:
            current_score = fuzz.ratio(target_str, comparison_record_str)

            if current_score > score_threshold: 
                return (comparison_record_str, current_score, comparison_contactid)

    return (None, None, None)


def score_it_closest_match_threshold_bag(target_contact_id, target_str, ALL_records_dict):
    '''
    INPUT target_str, ALL_records_dict
    OUTPUT closest match
    '''
    threshold_score = 80
    top_matches_list = []
    #score it
    #iterate through dictionary
    for comparison_contactid, comparison_record_str in ALL_records_dict.iteritems():
        if target_contact_id != comparison_contactid:
            current_score = fuzz.ratio(target_str, comparison_record_str)

            if current_score > threshold_score:
                top_matches_list.append((comparison_record_str, current_score, comparison_contactid))


    if len(top_matches_list) > 0:  return top_matches_list

def score_it_closest_match_threshold_bag_print(target_contact_id, target_str, ALL_records_dict):
    '''
    INPUT target_str, ALL_records_dict
    OUTPUT closest match
    '''
    threshold_score = 80


    #iterate through dictionary
    for comparison_contactid, comparison_record_str in ALL_records_dict.iteritems():
        if target_contact_id != comparison_contactid:

            #score it
            current_score = fuzz.ratio(target_str, comparison_record_str)
            if current_score > threshold_score:
                print target_contact_id + ':' + str((target_str,comparison_record_str, current_score, comparison_contactid))


    pass


#stream in all contacts ie large set
for line in sys.stdin:
    # ERROR DIAG TOOL
    ts = time.time()
    st = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S')
    print >> sys.stderr, line, st

    contact_id, target_str = line.strip().split(':', 1)

    score_it_closest_match_threshold_bag_print(contact_id, target_str, normalized_records_ALL_dict)
    # output = (target_str, score_it_closest_match_fuzz(contact_id, target_str, normalized_records_ALL_dict))
    # output = (target_str, score_it_closest_match_threshold_bag(contact_id, target_str, normalized_records_ALL_dict))
    # print contact_id + ':' + str(output)

【问题讨论】:

您好 Pasha,您找到解决方案了吗?我也有同样的情况。 嗨,@SCool。它是很久以前的事了,但我们最终使用了 SSIS 模糊匹配,我相信它使用了基于 Q-Gram 的算法 【参考方案1】:

您的方法要求您进行 180,000 * 2,500,000 = 450,000,000,000 次比较。

4500 亿是很多。

要减少比较次数,您可以首先对具有某些共同特征的记录进行分组,例如地址字段的前五个字符或共同标记。然后,仅比较具有相同特征的记录。这种想法称为“阻止”,通常会减少您必须对可管理的事物进行的总比较次数。

您尝试解决的一般问题称为“record linkage”。由于您使用的是 python,因此您可能想查看dedupe library,它提供了一种全面的方法(我是这个库的作者)。

【讨论】:

谢谢!我已经阅读了 dedupe 库,但我认为它提供了交互式训练数据的机制 - 我已经获得了训练数据怎么样?有一个 500,000 个公司名称数据库与之匹配; 10,000 个公司名称是否在此数据库中标记;是否预测该数据库中的其他 10,000 个公司名称(如果是,则返回匹配值)

以上是关于使用 Python 对大数据集进行模糊逻辑的主要内容,如果未能解决你的问题,请参考以下文章

对大数据集进行聚类(定量/定性值)

基于条件的 2 个大型数据集上的模糊模糊字符串匹配 - python

基于 Iris 数据集的 Python 模糊聚类

我们可以在 python 中使用逻辑回归预测数据集的未来值吗?

Django 数据库操作

Python 记录链接、模糊匹配和去重