Python:在字符串列表中最佳搜索子字符串

Posted

技术标签:

【中文标题】Python:在字符串列表中最佳搜索子字符串【英文标题】:Python: optimal search for substring in list of strings 【发布时间】:2016-04-21 09:12:40 【问题描述】:

我有一个特殊的问题,我想在许多字符串的列表中搜索许多子字符串。以下是我想要做的事情的要点:

listStrings = [ACDE, CDDE, BPLL, ... ]

listSubstrings = [ACD, BPI, KLJ, ...]

以上条目只是示例。 len(listStrings) 约为 60,000,len(listSubstrings) 约为 50,000-300,000,len(listStrings[i]) 介于 10 到 30,000 之间。

我目前的 Python 尝试是:

for i in listSubstrings:
   for j in listStrings:
       if i in j:
          w.write(i+j)

或者类似的东西。虽然这适用于我的任务,但它非常慢,使用一个核心并需要 40 分钟才能完成任务。有没有办法加快速度?

我不相信我可以从 listStrings:listSubstrings 中创建一个字典,因为有可能需要在两端存储重复的条目(尽管如果我能找到一种方法来附加一个每个标签都有唯一的标签,因为 dicts 快得多)。同样,我认为我不能预先计算可能的子字符串。我什至不知道搜索 dict 键是否比搜索列表更快(因为 dict.get() 将提供特定输入而不是查找子输入)。相对而言,在内存中搜索列表有那么慢吗?

【问题讨论】:

您可能正在执行 300,000 x 30,000 = 9,000,000,000 in 测试。它一定会很慢。所以是的,这很正常,你需要一个更好的string search algorithm 还有更高效的算法,比如Aho-Corasick,但是用纯Python实现这些算法会很慢。您可以将 Cython 或 SWIG 之类的东西合并到您的项目中吗? (谷歌在 PyPI 上为 Aho-Corasick 展示了两个现有模块,虽然我不知道它们是否有任何好处。) 我可能会涉及到 Cython 之类的东西(我需要学习一些知识,但这毕竟是我来这里的目的),但我想我想保持低复杂度以备不时之需与同事共享此脚本。他们已经害怕 Python……但我假设 Cython 会让你更好地实现 Aho-Corasick 算法?自己尝试一下可能会很有趣 这真的不是一个答案(因为它不会取代一个有效的搜索算法),但你可以尝试if j.find(i) > -1: 而不是if i in j: 它有时可以更快一点.. @Alopex:我用示例代码更新了my answer,以应对 mgc 的困难;如果你还在做这样的工作,你可能想take a look。 【参考方案1】:

您可以通过使用内置列表函数来加快速度。

for i in listSubstrings:
   w.write(list(map(lambda j: i + j, list(lambda j: i in j,listStrings))))

从运行时间复杂度分析来看,最糟糕的情况似乎是 n^2 次比较,因为在给定当前问题结构的情况下,您需要遍历每个列表。您需要担心的另一个问题是内存消耗,因为随着规模的扩大,更多的内存通常是瓶颈。

正如您所说,您可能想要索引字符串列表。我们可以知道的子字符串列表或字符串列表是否有任何模式?例如,在您的示例中,我们可以索引哪些字符串具有字母表中的哪些字符 "A": ["ABC", "BAW", "CMAI"]...,因此我们不需要经过每个子字符串元素列表的字符串列表。

【讨论】:

感谢您的回复 - 我从办公室回家后会尝试使用列表功能。我也没有考虑过尝试找到一种模式将字符串列表分解为更小的组件......也许为“包含'A'”......“'包含B'”或类似的东西做一个单独的条目形成某种决策树。虽然没有任何明显的模式,但在我的例子中,字符串组合的可能性仅限于 20 个氨基酸。 尝试计算特定氨基酸的概率(#occurrances/#total),然后这可能有助于搜索。 一,这是bug;list(lambda j: i in j, listStrings)无效;我怀疑你的意思是filter?二、如果你需要lambda(或任何简短的Python级函数)来使用mapfilter,不要;它会比等效的 listcomp(在 Py2 上,map/filter 返回序列)或geneexpr(在 Py3 上,它们返回生成器对象)慢。您也不能在带有list 的类文件对象上调用write;他们在文本模式下采用单个 unicode (Py2) 或 str(任何版本),或在二进制模式下采用 bytes 类对象。 您的代码可以使用writelines 变得合法(并且更快),它采用unicode/str/str/bytes 的迭代器(根据模式)和genexpr(即将避免在 Py2 上出现大量不必要的临时 lists,并避免大量的函数调用开销)得到 for i in listSubstrings: w.writelines(i + j for j in listStrings if i in j)。您甚至可以将外部循环内联到genexpr 中,制作一个更快的geneexpr(感谢i 成为局部变量,而不是闭包范围):w.writelines(i + j for i in listSubstrings for j in listStrings if i in j)【参考方案2】:

你的子串长度都一样吗?您的示例使用 3 个字母的子字符串。在这种情况下,您可以创建一个包含 3 个字母子字符串的 dict 作为字符串列表的键:

index = 
for string in listStrings:
    for i in range(len(string)-2):
        substring = string[i:i+3]
        index_strings = index.get(substring, [])
        index_strings.append(string)
        index[substring] = index_strings

for substring in listSubstrings:
    index_strings = index.get(substring, [])
    for string in index_strings:
        w.write(substring+string)

【讨论】:

它们的随机长度在 ~4-15 左右之间。我的例子对此含糊不清【参考方案3】:

也许您可以尝试将两个列表中的一个(最大的?虽然直觉上我会削减listStrings)分成较小的列表,然后使用线程并行运行这些搜索(Pool class of multiprocessing 提供了一种方便的方法这) ?我使用类似的东西有一些显着的加速:

from multiprocessing import Pool
from itertools import chain, islice

# The function to be run in parallel :
def my_func(strings):
    return [j+i for i in strings for j in listSubstrings if i.find(j)>-1]

# A small recipe from itertools to chunk an iterable :
def chunk(it, size):
    it = iter(it)
    return iter(lambda: tuple(islice(it, size)), ())

# Generating some fake & random value :
from random import randint
listStrings = \
    [''.join([chr(randint(65, 90)) for i in range(randint(1, 500))]) for j in range(10000)]
listSubstrings = \
    [''.join([chr(randint(65, 90)) for i in range(randint(1, 100))]) for j in range(1000)]

# You have to prepare the searches to be performed:
prep = [strings for strings in chunk(listStrings, round(len(listStrings) / 8))]
with Pool(4) as mp_pool:
    # multiprocessing.map is a parallel version of map()
    res = mp_pool.map(my_func, prep)
# The `res` variable is a list of list, so now you concatenate them
# in order to have a flat result list
result = list(chain.from_iterable(res))

然后您可以编写整个 result 变量(而不是逐行编写):

with open('result_file', 'w') as f:
    f.write('\n'.join(result))

2018 年 1 月 5 日编辑:按照 ShadowRanger 的建议,使用 itertools.chain.from_iterable 将结果展平,而不是使用 map 副作用的丑陋解决方法。

【讨论】:

回复有点晚,但实现分块和多处理是一个相当简单的解决方案。现在只需约 2 分钟即可在我的数据集上运行脚本 旁注:代码的最后两行应该是result = list(itertools.chain.from_iterable(res))itertools.chain.from_iterable 是 the canonical, idiomatic, most efficient way to flatten one level of an existing iterable of iterables(另外,使用 map 的副作用会让 Guido van Rossum 哭泣)。我相信当大多数可迭代对象为空时,较旧的 Python 存在错误。 list(filter(None, itertools.chain.from_iterable(res))) 以低廉的成本解决了这个问题。 @ShadowRanger 是的,你显然是对的!我觉得这不是产生这个结果的最惯用的方式(根本!),但这就是我在回答时发现的全部!我将编辑我的答案以遵循您的建议!谢谢!【参考方案4】:

对于您正在尝试的那种事情(在一大堆其他字符串中搜索一组固定的一大堆字符串),并行化和微小的调整不会有太大帮助。您需要算法改进。

首先,我建议使用Aho-Corasick string matching algorithm。基本上,作为从您的一组固定字符串构建匹配器对象的一些预计算工作的交换,您可以一次扫描另一个字符串以查找 all 这些固定字符串。

因此,您无需每次扫描 60K 字符串 50K+ 次(30 亿次扫描?!?),您只需比普通单次扫描略高一点的成本,每次扫描一次,并获得所有命中。

最好的部分是,你不是自己写的。 PyPI(Python 包索引)已经为您编写了 pyahocorasick 包。所以试试吧。

使用示例:

import ahocorasick

listStrings = [ACDE, CDDE, BPLL, ...]
listSubstrings = [ACD, BPI, KLJ, ...]

auto = ahocorasick.Automaton()
for substr in listSubstrings:
    auto.add_word(substr, substr)
auto.make_automaton()

...

for astr in listStrings:
    for end_ind, found in auto.iter(astr):
        w.write(found+astr)

如果在被搜索的字符串(“haystack”)中多次找到子字符串(“needle”),这将write 多次。您可以更改循环,通过使用set 重复数据删除,使其仅在给定大海捞针中的给定针头第一次命中时使用write

for astr in listStrings:
    seen = set()
    for end_ind, found in auto.iter(astr):
        if found not in seen:
            seen.add(found)
            w.write(found+astr)

您可以进一步调整它,通过将单词的索引存储为它们的值或与它们的值一起以它们出现在 listSubstrings 中的相同顺序输出给定干草堆的针(并且在您使用时是唯一的),以便您可以对命中进行排序(可能是小数字,所以排序开销很小):

from future_builtins import map  # Only on Py2, for more efficient generator based map
from itertools import groupby
from operator import itemgetter

auto = ahocorasick.Automaton()
for i, substr in enumerate(listSubstrings):
    # Store index and substr so we can recover original ordering
    auto.add_word(substr, (i, substr))
auto.make_automaton()

...

for astr in listStrings:
    # Gets all hits, sorting by the index in listSubstrings, so we output hits
    # in the same order we theoretically searched for them
    allfound = sorted(map(itemgetter(1), auto.iter(astr)))
    # Using groupby dedups already sorted inputs cheaply; the map throws away
    # the index since we don't need it
    for found, _ in groupby(map(itemgetter(1), allfound)):
        w.write(found+astr)

为了进行性能比较,我使用了 mgc 答案的变体,该变体更可能包含匹配项,并扩大了干草堆。一、设置代码:

>>> from random import choice, randint
>>> from string import ascii_uppercase as uppercase
>>> # 5000 haystacks, each 1000-5000 characters long
>>> listStrings = [''.join([choice(uppercase) for i in range(randint(1000, 5000))]) for j in range(5000)]
>>> # ~1000 needles (might be slightly less for dups), each 3-12 characters long
>>> listSubstrings = tuple(''.join([choice(uppercase) for i in range(randint(3, 12))]) for j in range(1000))
>>> auto = ahocorasick.Automaton()
>>> for needle in listSubstrings:
...     auto.add_word(needle, needle)
...
>>> auto.make_automaton()

现在来实际测试它(使用 ipython %timeit 魔术进行微基准测试):

>>> sum(needle in haystack for haystack in listStrings for needle in listSubstrings)
80279  # Will differ depending on random seed
>>> sum(len(set(map(itemgetter(1), auto.iter(haystack)))) for haystack in listStrings)
80279  # Same behavior after uniquifying results
>>> %timeit -r5 sum(needle in haystack for haystack in listStrings for needle in listSubstrings)
1 loops, best of 5: 9.79 s per loop
>>> %timeit -r5 sum(len(set(map(itemgetter(1), auto.iter(haystack)))) for haystack in listStrings)
1 loops, best of 5: 460 ms per loop

因此,对于在 5000 个中等大小的字符串中的每一个中检查 ~1000 个较小的字符串,pyahocorasick 在我的机器上以 ~21 倍的倍数击败了个人成员资格测试。随着listSubstrings 的大小增加,它也可以很好地扩展;当我以同样的方式初始化它,但使用 10,000 个小字符串而不是 1000 个时,所需的总时间从约 460 毫秒增加到约 852 毫秒,这是执行 10 倍逻辑搜索的 1.85 倍时间乘数。

为了记录,在这种情况下构建自动机的时间是微不足道的。您预先支付一次,而不是每个 haystack 一次,测试显示约 1000 个字符串自动机需要约 1.4 毫秒的时间来构建并占用约 277 KB 的内存(超出字符串本身);约 10000 个字符串自动机花费了约 21 毫秒的时间来构建,并占用了约 2.45 MB 的内存。

【讨论】:

当我尝试使用 pyahocorasick 模块时,我遇到了一些困难来实现我想要做的事情。你觉得 ngram 是个不错的选择吗? (或者至少比在两个 for 循环中扫描列表更好?)(我的数据量与 OP 不同,也没有完全相同的搜索需求,但我使用ngram python 模块获得了不错的结果) 如果输入字符串在文件中; grep -fF 可以使用。另一个 Aho-Corasick Python 实现是 noaho 包,example @shadowRanger 是的,但 pyahocorasick 只找到字符串的前缀,如果子字符串介于字符串中的 2 个单词之间,则不会匹配它。还是我错了? @RetroCode:如果您使用的是matchlongest_prefix 方法,那么是的,那会发生。但是,如果您要查找所有匹配项,则将使用iter(或find_all)方法,该方法一次扫描整个干草堆”以查找所有“针”(每次找到时返回) . @mgc:对不起,我之前没有注意到你的评论。我添加了使用pyahocorasick 的示例代码。如果这不包括它,你需要更明确地说明你的目标。【参考方案5】:

您可以通过将 listString 加入一个长字符串来显着加快内部循环(或者从文件中读取字符串而不将其拆分为换行符)。

with open('./testStrings.txt') as f:
    longString = f.read()               # string with seqs separated by \n

with open('./testSubstrings.txt') as f:
    listSubstrings = list(f)

def search(longString, listSubstrings):
    for n, substring in enumerate(listSubstrings):
        offset = longString.find(substring)
        while offset >= 0:
            yield (substring, offset)
            offset = longString.find(substring, offset + 1)

matches = list(search(longString, listSubstrings))

偏移量可以映射到字符串索引。

from bisect import bisect_left
breaks = [n for n,c in enumerate(longString) if c=='\n']

for substring, offset in matches:
    stringindex = bisect_left(breaks, offset)

我的测试显示,与嵌套 for 循环相比,速度提高了 7 倍(11 秒对 77 秒)。

【讨论】:

注意:如果内存紧张,一次将所有listString 加载到内存中(而不是一次加载一个字符串)可能会出现问题。如果字符串是纯 ASCII,并且通常位于 OP 给定范围的较长端,则存储 60K 字符串(每个字符串包含 30K 个字符)的成本约为 1.8 GB。如果字符串可能是非 ASCII,那么即使是单个非 BMP 字符的该长度的字符串也将使用接近 7.2 GB。即使内存不是问题,扫描时您也永远不会从处理器缓存中受益;对于各种单独的和组合的字符串大小,缩放的行为可能不直观。

以上是关于Python:在字符串列表中最佳搜索子字符串的主要内容,如果未能解决你的问题,请参考以下文章

Gcloud 日志查看器 - 搜索子字符串过滤器不起作用

sql MSSQL搜索子字符串

Swift忽略大小写搜索子字符串的三种方法及性能对比

Swift忽略大小写搜索子字符串的三种方法及性能对比

搜索子字符串的数组元素并返回索引

如何搜索子字符串(WHERE列LIKE'%foo%')