判断一个数组是不是至少有一个重复项的最快算法
Posted
技术标签:
【中文标题】判断一个数组是不是至少有一个重复项的最快算法【英文标题】:Fastest algorithm to figure out if an array has at least one duplicate判断一个数组是否至少有一个重复项的最快算法 【发布时间】:2016-10-14 13:44:34 【问题描述】:我这里有一个非常特殊的案例。我有一个包含数百万个条目的文件,想知道是否存在至少一个重复项。这里的语言不是很重要,但 C 似乎是速度的合理选择。现在,我想知道对此采取什么样的方法?速度是这里的主要目标。自然,一旦发现一个重复,我们就想停止查找,这很清楚,但是当数据进来时,我不知道它是如何排序的。我只知道这是一个字符串文件,由换行符分隔。现在请记住,我只想知道是否存在重复项。现在,我发现了很多关于在数组中查找所有重复项的 SO 问题,但大多数都是简单而全面的方式,而不是最快的方式。
因此,我想知道:找出数组是否至少包含一个重复项的最快方法是什么?到目前为止,我能在 SO 上找到的最接近的是:Finding out the duplicate element in an array。选择的语言并不重要,但毕竟是编程,多线程是可能的(我只是不确定这是否可行)。
最后,字符串的格式为 XXXNNN(3 个字符和 3 个整数)。
请注意,这不是严格理论上的。 将在机器上进行测试(Intel i7 with 8GB RAM),所以我必须考虑进行字符串比较等的时间。这就是为什么我也想知道它是否可以faster 将字符串一分为二,首先比较整数部分,因为int比较会更快,然后是字符串部分?当然,这也需要我拆分字符串并将后半部分转换为 int,这可能会更慢...
【问题讨论】:
如果数据没有排序,你将不得不从头到尾搜索并比较每个项目,没有办法绕过它。根据您将要使用多少数据,可能值得也可能不值得先对其进行排序。 您无需排序即可查找重复项。您可以将每个字符串插入哈希表并检查冲突中的重复项。哈希查找和插入的预期复杂度为 O(1),因此您可以拥有一个以预期 O(N) 运行的重复检测算法。事实上,如果这种方法没有出现在其他问题中,我会感到非常惊讶。 您可以在一次传递中为每个字符串分配一个介于 0 和 255 之间的整数值(一种签名,如校验和 mod 256)并创建一个包含 256 个整数列表的数组并放置在这些条目的前面,行号具有相同的签名。这有助于对可能相同的行进行分组。 是的,抱歉,3 位数,以 10 为底。@chux 600 万,已知 嗯,如果 26 个不同的字符和 10 个不同的数字组成 26*26*26*10*18*10 组合,为什么不使用 17,576,000 位表(2,197,000 字节)? 【参考方案1】:最后,字符串的格式为 XXXNNN(3 个字符和 3 个整数)。
了解您的关键领域对于此类问题至关重要,因此这使我们能够大大简化解决方案(以及此答案)。
如果 X ∈ A..Z 和 N ∈ 0..9,这给出了 263 * 103 = 17,576,000 个可能的值......一个位集(本质上一个简单、完美且没有误报的 Bloom 过滤器)需要大约 2Mb。
给你:一个生成所有可能的 1700 万个键的 python 脚本:
import itertools
from string import ascii_uppercase
for prefix in itertools.product(ascii_uppercase, repeat=3):
for numeric in range(1000):
print "%s%03d" % (''.join(prefix), numeric)
还有一个简单的 C 位集过滤器:
#include <limits.h>
/* convert number of bits into number of bytes */
int filterByteSize(int max)
return (max + CHAR_BIT - 1) / CHAR_BIT;
/* set bit #value in the filter, returning non-zero if it was already set */
int filterTestAndSet(unsigned char *filter, int value)
int byteIndex = value / CHAR_BIT;
unsigned char mask = 1 << (value % CHAR_BIT);
unsigned char byte = filter[byteIndex];
filter[byteIndex] = byte | mask;
return byte & mask;
出于您的目的,您会这样使用:
#include <stdlib.h>
/* allocate filter suitable for this question */
unsigned char *allocMyFilter()
int maxKey = 26 * 26 * 26 * 10 * 10 * 10;
return calloc(filterByteSize(maxKey), 1);
/* key conversion - yes, it's horrible */
int testAndSetMyKey(unsigned char *filter, char *s)
int alpha = s[0]-'A' + 26*(s[1]-'A' + 26*(s[2]-'A'));
int numeric = s[3]-'0' + 10*(s[4]-'0' + 10*(s[5]-'0'));
int key = numeric + 1000 * alpha;
return filterTestAndSet(filter, key);
#include <stdio.h>
int main()
unsigned char *filter = allocMyFilter();
char key[8]; /* 6 chars + newline + nul */
while (fgets(key, sizeof(key), stdin))
if (testAndSetMyKey(filter, key))
printf("collision: %s\n", key);
return 1;
return 0;
这是线性的,尽管显然还有优化键转换和文件输入的空间。无论如何,示例运行:
useless:~/Source/40044744 $ python filter_test.py > filter_ok.txt
useless:~/Source/40044744 $ time ./filter < filter_ok.txt
real 0m0.474s
user 0m0.436s
sys 0m0.036s
useless:~/Source/40044744 $ cat filter_ok.txt filter_ok.txt > filter_fail.txt
useless:~/Source/40044744 $ time ./filter < filter_fail.txt
collision: AAA000
real 0m0.467s
user 0m0.452s
sys 0m0.016s
诚然,输入文件被缓存在内存中以供这些运行使用。
【讨论】:
很好的线性规划解决方案,使用 2MB 表来避免复杂的数据结构或算法。性能看起来像 n,因为给定的输入,布隆过滤器只需要检查每个元素一次,并具有恒定的时间查找。 是的,当然。我绘制了一个超过 170 万输入行的快速实验,以检查没有发生任何不愉快的事情,并且在测量抖动之前它是笔直的箭头。【参考方案2】:合理的答案是保持算法的复杂度最小。我鼓励您使用 HashTable 来跟踪插入的元素;最终算法复杂度为 O(n),因为理论上在 HashTable 中搜索是 O(1)。在您的情况下,我建议您在读取文件时运行算法。
public static bool ThereAreDuplicates(string[] inputs)
var hashTable = new Hashtable();
foreach (var input in inputs)
if (hashTable[input] != null)
return true;
hashTable.Add(input, string.Empty);
return false;
【讨论】:
hashTable.Add()
是线性时间吗?如果在制作哈希表之前不知道项目的数量,我怀疑它需要不时调整自己的大小。
这是摊销平均情况不变的时间。有些操作将是线性时间,但只要这些操作以反线性频率发生,它就会摊销。而且,一些哈希/键组合会导致大量冲突,并再次从常数降级为线性时间,但平均而言,我们希望这种情况不会经常发生。
同意摊销平均情况常数时间等。注意:OP later commented文件大小一开始就知道。【参考方案3】:
快速但低效的内存解决方案会使用
// Entries are AAA####
char found[(size_t)36*36*36*36*36*36 /* 2,176,782,336 */] = 0 ; // or calloc() this
char buffer[100];
while (fgets(buffer, sizeof buffer, istream))
unsigned long index = strtoul(buffer, NULL, 36);
if (found[index]++)
Dupe_found();
break;
这篇文章的问题在于它想要“最快的算法”,但没有详细说明内存问题及其对速度的相对重要性。所以速度必须为王,以上浪费很少的时间。它确实符合“一发现重复就停止查找”的要求。
【讨论】:
【参考方案4】:根据有多少不同的东西,你有一些选择:
对整个数组进行排序,然后查找重复元素,复杂性O(n log n)
但可以就地完成,因此内存将是O(1)
所有元素的构建集。根据选择的集合实现可以是O(n)
(当它是哈希集时)或O(n log n)
(二叉树),但这样做会花费你一些内存。
【讨论】:
【参考方案5】:找出数组是否至少包含一个重复项的最快方法是使用位图、多个 CPU 和(原子或非原子)“测试和设置位”指令(例如,80x86 上的lock bts
)。
一般的想法是将数组分成“总元素/CPU数量”大小的块,并将每个块分配给不同的CPU。每个 CPU 通过计算一个整数并对与该整数对应的位进行原子“测试和设置位”来处理它的数组。
但是,这种方法的问题在于您正在修改所有 CPU 都在使用的东西(位图)。一个更好的想法是给每个 CPU 一个整数范围(例如,CPU 编号 N 执行从“(min - max) * N / CPUs”到“(min - max) * (N+1) / CPUs”的所有整数)。这意味着所有 CPU 从整个数组中读取,但每个 CPU 只修改它自己的位图私有部分。这避免了一些与缓存一致性协议相关的性能问题(“读取缓存行的所有权”),也避免了对原子指令的需求。
然后下一步是查看如何将“3 个字符和 3 个数字”字符串转换为整数。理想情况下,这可以/应该使用 SIMD 来完成;这将要求数组采用“数组结构”格式(而不是更可能的“结构数组”格式)。另请注意,您可以先将字符串转换为整数(以“每个 CPU 执行字符串的子集”的方式),以避免每个 CPU 都需要转换每个字符串并将更多内容打包到每个缓存行中。
【讨论】:
我不确定我是否看到这将如何与多个 CPU 一起工作......首先,我只是在浪费周期,只是将其拆分并启动进程。现在,您然后声明 CPU N 在给定范围内执行所有整数。但是,这不需要排序的输入数据吗? @Phroggyy:启动线程是“一次”成本。对于巨大的数组,成本是微不足道的(与处理大量条目的成本相比相形见绌)。对于微型阵列,启动线程的成本会很高,但您首先不会关心微型阵列的性能。如果您希望处理从小数组到大数组的所有内容,您可以调整线程数(例如,如果数组很小,则只有 1 个线程,如果数组是中等大小,则为 2 个线程,...最多您拥有的 CPU)。 对@Brendan,但 6M 真的那么大吗?我当前的程序运行时间为 0.36 秒,最慢的代码行是filter[byteIndex] = byte | mask
,这就是为什么我想知道第二个甚至第三个线程是否值得花费
@Phroggyy:它不需要对输入进行排序 - 如果 CPU/线程只处理一定范围内的整数,那么它会按顺序从数组中读取(一个很好的“缓存友好"/pre-fetchable 访问模式) 并简单地跳过该范围之外的任何整数。
正确的问题,所以每个进程都会处理自己的子集【参考方案6】:
由于您有数百万个条目,我认为最好的算法是计数排序。计数排序正是您所要求的:它通过计算每个元素存在的次数来对数组进行排序。因此,您可以编写一个对数组进行计数排序的函数:
void counting_sort(int a[],int n,int max)
int count[max+1]=0,i;
for(i=0;i<n;++i)
count[a[i]]++;
if (count[a[i]]>=2) return 1;
return 0;
您应该首先在哪里找到最大元素(在 O(n) 中)。计数排序的渐近时间复杂度为O(max(n,M))
,其中M 是在数组中找到的最大值。因此,如果 M 的大小顺序为几百万,则您有几百万个条目这将在 O(n) 中起作用(或者对于计数排序更少,但是因为您需要找到 M,所以它是 O(n)) .如果您还知道 M 不可能大于几百万,那么您可以确定这会给出 O(n) 而不仅仅是 O(max(n,M))。
您可以在此处查看计数排序可视化以更好地理解它: https://www.cs.usfca.edu/~galles/visualization/CountingSort.html
请注意,在上面的函数中,我们没有实现精确的计数排序,当我们找到重复时我们会停止,这样效率更高,因为你只想知道是否有重复。
【讨论】:
以上是关于判断一个数组是不是至少有一个重复项的最快算法的主要内容,如果未能解决你的问题,请参考以下文章