需要一个高效的内存缓存,每秒可以处理 4k 到 7k 的查找或写入

Posted

技术标签:

【中文标题】需要一个高效的内存缓存,每秒可以处理 4k 到 7k 的查找或写入【英文标题】:Need an efficient in-memory cache that can process 4k to 7k lookups or writes per second 【发布时间】:2012-05-20 19:22:11 【问题描述】:

我有一个高效的 C# 应用程序,它在多线程 CPU 上以每秒 5k 到 10k 条记录的速率接收 80 个字节的数据。

我现在需要在内存缓存中设置一个来检测和过滤重复记录,这样我就可以阻止它们在管道中进一步传播。

缓存规范(最大阈值)

80字节数据 10,000 条记录/秒 60 秒缓存 = 密钥数量 = 60,000 (总计 48000000 字节 = 48Mb) 理想的缓存大小 = 5 分钟(或 240Mb) 可接受的运行时缓存大小膨胀 = 1 GB

问题

设置内存缓存、字典、哈希表、数组等的最佳方法是什么,以实现最有效的查找、清除旧缓存数据并防止命中的数据过期。

我查看了ASP.Net Cache、System.Runtime.MemoryCache,但认为我需要更轻量级和自定义的东西来实现正确的吞吐量。我也在寻找System.Collections.Concurrent 作为替代方案和this related whitepaper。

有人对最好的方法有什么建议吗?

【问题讨论】:

与“吞吐量问题”一样:它在很大程度上取决于您的运行时环境(硬件、操作系统等)。我只能强烈建议分析替代选项并查看它们的性能,否则这可能是“过早优化”的情况...... 看看redis 这就是您可能需要“不安全”关键字并直接使用指针的地方。 @frenchie 一旦我使用了指针,我应该对它们使用什么逻辑? @oleksii 我会避免使用 Redis,除非您需要在单独的机器上使用它(这都是网络访问)。京都内阁等更适合嵌入的东西,但前提是你需要持久性。如果没有,那就是你自己的记忆。 【参考方案1】:

记住,不要过早地优化!

可能有一种相当简洁的方法可以做到这一点,而无需求助于非托管代码、指针等。

在我的旧的普通笔记本电脑上进行的快速测试表明,您可以向 HashSet 添加 1,000,000 个条目,同时在大约 100 毫秒内删除 100,000 个条目。然后,您可以在约 60 毫秒内使用相同的 1,000,000 个值重复此操作。这仅用于处理 long - 80 字节的数据结构显然更大,但需要一个简单的基准测试。

我的建议:

将“查找”和“重复检测”实现为 HashSet,这对于插入、删除和查找来说非常快。

将实际缓冲区(接收新事件并使旧事件过期)实现为适当大的循环/环形缓冲区。这将避免内存分配和释放,并且可以在前面添加条目并从后面删除它们。以下是一些有用的链接,其中一个(第二个)描述了缓存中过期项目的算法:

Circular Buffer for .NET

Fast calculation of min, max, and average of incoming numbers

Generic C# RingBuffer

How would you code an efficient Circular Buffer in Java or C#

请注意,如果您希望缓存以元素数量(例如 100,000)而不是事件时间(例如最后 5 分钟)为界,则循环缓冲区会更好。

当从缓冲区中删除项目时(首先从末尾搜索),它们也可以从HashSet 中删除。无需使两个数据结构相同。

在需要之前避免使用多线程!您有一个自然的“串行”工作负载。除非您知道其中一个 CPU 线程无法处理速度,否则请将其保留在单个线程中。这避免了争用、锁定、CPU 缓存未命中和其他多线程问题,这些问题往往会减慢非embarrassingly parallel 的工作负载的速度。我在这里的主要警告是,您可能希望将事件的“接收”转移到处理它们的不同线程。

上述建议是Staged event-driven architecture (SEDA) 背后的主要思想,它被用作高性能和行为稳定的事件驱动系统(例如消息队列)的基础。

上述设计可以简洁地包装,并尝试以最小的复杂性实现所需的原始性能。这只提供了一个不错的基线,现在可以从中提取和衡量效率。

(注意:如果您需要缓存的持久性,请查看Kyoto Cabinet。如果您需要缓存对其他用户可见或分发,请查看Redis。

【讨论】:

知道了。我所有的输入数据都来自许多线程。根据这个建议,我使用 ConcurrentQueue 将数据 FIFO 到一个合并线程。单线程的代码粘贴在下面作为答案。【参考方案2】:

我没有什么可以支持的,但我确实喜欢周末练习:)

要解决清除问题,您可以使用循环缓存,其中最新值覆盖最旧的值(当然,您不会有精确的 n 分钟 缓存),因此您只需要记住上一条记录的偏移量。您可以通过用第一条记录的副本填充缓存来初始化缓存,以防止它匹配只有 0 的记录与缓存的未初始化数据。

然后您可以简单地从第一个字节开始匹配,如果记录不匹配,则跳过该记录的剩余字节并尝试匹配下一个字节,直到缓存结束。

如果记录包含一个标题后跟数据,您可能希望向后匹配以提高查找不匹配数据的速度。

【讨论】:

我开始走这条路,但认为这与做 Dict 是一样的。所以我然后考虑将时间四舍五入到分钟间隔,每分钟都有一个不同的字典。然后我考虑了多个线程和链表来跟踪间隔并使用散列来跟踪数据。然后我的头爆炸了。 也许我可以使用并发集合来跟踪数据(由哈希键控),具有QueryCountLastQuery 属性。然后,为了处理旧数据的清除,我可以拥有一组 ConcurrentCollections(每分钟一个)来跟踪哪些哈希码是潜在的清除候选者。一个独立的清除线程可以读取清除候选集合并懒惰地读取主集合中的对象并查看 LastQuery 是否低于阈值。如果数据陈旧,则清除,如果数据处于活动状态,则不执行任何操作。 最后一条评论是假设多线程是解决这个问题的唯一方法......并且需要合适的线程抽象。 我想到的循环缓存只是一个巨大的内存分配,在非托管内存中或固定内存中。只要散列效率更高,我的答案就可以删除。【参考方案3】:

这是一个使用单线程的示例。该代码使用两个字典来跟踪数据。一个字典用于跟踪每个间隔的记录hashDuplicateTracker,第二个字典用于老化字典的某些值HashesByDate

错误: CheckDataFreshness 有一些与 ElementAt() 相关的错误...我正在解决这个问题。

我应该做一些改进

将 Linq 运算符 ElementAt(x) 替换为其他内容 确保 CheckDataFreshness 的运行频率不超过每个间隔一次

使这个多线程

为 FrequencyOfMatchedHash、DecrementRecordHash 将字典替换为 ConcurrentDictionary, 获取 ConcurrentDictionary 的排序版本或为HashesByDate 使用锁
 public class FrequencyOfMatchedHash : Dictionary<int,int>
 
    public void AddRecordHash(int hashCode)
    
        if (this.ContainsKey(hashCode))
        
            this[hashCode]++;
        
        else
        
            this.Add(hashCode, 1);
        
    
    public void DecrementRecordHash(int hashCode)
    
        if (this.ContainsKey(hashCode))
        
            var val = this[hashCode];
            if (val <= 1)
            
                this.Remove(hashCode);
            
         
    

    public override string ToString()
    
        return this.Count + " records";
    


public class HashDuplicateTracker : Dictionary<int, int >


    internal void AddRecord(int recordHash)
    
        if (this.ContainsKey(recordHash))
        
            this[recordHash]++;
        
        else
        
            this.Add(recordHash, 1);
        
    



public class HashesByDate : SortedDictionary<DateTime, FrequencyOfMatchedHash>

    internal void AddRecord(DateTime dt, int recordHash)
    
        if (this.ContainsKey(dt))
        
            this[dt].AddRecordHash(recordHash);
        
        else
        

            var tmp = new FrequencyOfMatchedHash();
            tmp.AddRecordHash(recordHash);

            var tmp2 = new FrequencyOfMatchedHash();
            tmp2.AddRecordHash(recordHash);
            this.Add(dt, tmp);
        
    

public class DuplicateTracker

    HashDuplicateTracker hashDuplicateTracker = new HashDuplicateTracker();

    // track all the hashes by date
    HashesByDate hashesByDate = new HashesByDate();


    private TimeSpan maxRange;
    private int average;

    public DuplicateTracker(TimeSpan range)
    
        this.maxRange = range;
    

    public void AddRecordHash(DateTime dt, int recordHash)
    
        if (hashesByDate.Count == 0)
        
            hashDuplicateTracker.AddRecord(recordHash);
            hashesByDate.AddRecord(dt, recordHash);

            return;
        
        else
        
            // Cleanup old data
            DateTime maxDate = hashesByDate.ElementAt(hashesByDate.Count - 1).Key;
            DateTime oldestPermittedEntry = maxDate - maxRange;

            if (dt >= oldestPermittedEntry)
                try
                
                    hashDuplicateTracker.AddRecord(recordHash);
                    hashesByDate.AddRecord(dt, recordHash);

                    CheckDataFreshness(oldestPermittedEntry);
                
                catch (ArgumentException e)
                
                    // An entry with the same key already exists.
                    // Increment count/freshness
                    hashesByDate[dt].AddRecordHash(recordHash);
                    hashDuplicateTracker[recordHash]++;
                    CheckDataFreshness(oldestPermittedEntry);
                
        
    


    /// <summary>
    /// This should be called anytime data is added to the collection
    /// 
    /// If threading issues are addressed, a more optimal solution would be to run this on an independent thread.
    /// </summary>
    /// <param name="oldestEntry"></param>
    private void CheckDataFreshness(DateTime oldestEntry)
    
        while (hashesByDate.Count > 0)
        
            DateTime currentDate = hashesByDate.ElementAt(0).Key;

            if (currentDate < oldestEntry)
            
                var hashesToDecrement = hashesByDate.ElementAt(0).Value;

                for (int i = 0; i < hashesToDecrement.Count; i++)
                
                    int currentHash = hashesToDecrement.ElementAt(0).Key;
                    int currentValue = hashesToDecrement.ElementAt(0).Value;

                    // decrement counter for hash
                    int tmpResult = hashDuplicateTracker[currentHash] - currentValue;
                    if (tmpResult == 0)
                    
                        // prevent endless memory growth.
                        // For performance this might be deferred 
                        hashDuplicateTracker.Remove(tmpResult);
                    
                    else
                    
                        hashDuplicateTracker[currentHash] = tmpResult;
                    

                    // remove item
                    continue;
                

                hashesByDate.Remove(currentDate);

            
            else
                break;
        
    

 

【讨论】:

很高兴看到尝试。不过,出于多种原因,我认为这在效率方面是错误的。 1)您希望用于添加和删除对象的数据结构自然是某种形式的 FIFO,因为您总是在一端添加并在另一端删除。它将为此进行优化。 2) 除非您需要一直跟踪重复的数量,否则HashSet 将比字典更有效,尤其是因为它总是只需添加一个操作或检查一个操作。 @yamen - 我正在跟踪重复的数量,因为它是客户规范的一部分。此外,我认为可以使用过期哈希的计数来更新 Average 和 Rate 属性,而无需为每个间隔搜索并重新汇总 Average 和 Rate 属性。我想不出一个自然的 FIFO 解决方案允许这样做,或者...... @yamen 我想我正在使用此代码创建 hoppingWindow msdn.microsoft.com/en-us/library/ff518448.aspx ...这可能是最合适的。 重复总数很简单,HashSetAdd 如果添加则返回 true)。然后,您可以只保留那些重复的,但仍然可以获得更有效的结构。您也可以保持运行average,但仍使用 FIFO 数据结构 - 只需保持运行总和和计数,在添加时递增,在减去时递减。你开始这个问题是为了效率,而目前的方法效率很低。 @yamen 感谢您的建议,我正在尝试实施它。将尝试使用 HashSet

以上是关于需要一个高效的内存缓存,每秒可以处理 4k 到 7k 的查找或写入的主要内容,如果未能解决你的问题,请参考以下文章

性能基本概念

深入理解JAVA虚拟机 高效并发

工作项缓存的全局内存

谁给我解释一下一级缓存和二级缓存啊~

Redis 缓存过期处理与内存淘汰机制

Redis缓存过期处理与内存淘汰机制