停止 MemoryCache 调用的重入

Posted

技术标签:

【中文标题】停止 MemoryCache 调用的重入【英文标题】:Stop Reentrancy on MemoryCache Calls 【发布时间】:2021-04-14 20:21:32 【问题描述】:

应用需要加载数据并缓存一段时间。我希望如果应用程序的多个部分想要同时访问同一个缓存键,缓存应该足够智能,只加载一次数据并将该调用的结果返回给所有调用者。但是,MemoryCache 并没有这样做。如果您并行访问缓存(这通常发生在应用程序中),它会为每次尝试获取缓存值创建一个任务。我认为这段代码会达到预期的结果,但事实并非如此。我希望缓存只运行一个GetDataAsync 任务,等待它完成,然后使用结果来获取其他调用的值。

using Microsoft.Extensions.Caching.Memory;
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace ConsoleApp4

    class Program
    
        private const string Key = "1";
        private static int number = 0;

        static async Task Main(string[] args)
        
            var memoryCache = new MemoryCache(new MemoryCacheOptions  );

            var tasks = new List<Task>();
            tasks.Add(memoryCache.GetOrCreateAsync(Key, (cacheEntry) => GetDataAsync()));
            tasks.Add(memoryCache.GetOrCreateAsync(Key, (cacheEntry) => GetDataAsync()));
            tasks.Add(memoryCache.GetOrCreateAsync(Key, (cacheEntry) => GetDataAsync()));

            await Task.WhenAll(tasks);

            Console.WriteLine($"The cached value was: memoryCache.Get(Key)");
        

        public static async Task<int> GetDataAsync()
        
            //Simulate getting a large chunk of data from the database
            await Task.Delay(3000);
            number++;
            Console.WriteLine(number);
            return number;
        
    

事实并非如此。以上显示了这些结果(不一定按此顺序):

2

1

3

缓存值为:3

它为每个缓存请求创建一个任务,并丢弃从其他两个请求返回的值。

这不必要地花费时间,这让我想知道你是否可以说这个类甚至是线程安全的。 ConcurrentDictionary 具有相同的行为。我测试了它,同样的事情发生了。

有没有办法在任务不运行 3 次的情况下实现所需的行为?

【问题讨论】:

你可能会觉得这很有趣:Async threadsafe Get from MemoryCache 还有this 在缓存中使用Lazy(或者最好是LazyWithNoExceptionCaching - ***.com/a/42567351/34092)条目。 @mjwills:使用Lazy&lt;Task&gt; 很棘手。如果任务失败,就Lazy&lt;T&gt; 而言,仍然会创建惰性值。只有在等待任务并且Lazy&lt;T&gt; 不会等待任务时才会抛出异常。但是,当您的代码等待延迟任务时,会引发异常并且这将继续发生。即使任务失败,Lazy&lt;T&gt; 也不会再次尝试创建值(任务)。 @mjwills 您可能想看看 Stephen Cleary 的 AsyncLazy&lt;T&gt; implementation,以及他们如何使用 RetryOnFailure 标志。 【参考方案1】:

这是一个自定义扩展方法GetOrCreateExclusiveAsync,类似于本机IMemoryCache.GetOrCreateAsync,它可以防止在正常情况下同时调用提供的异步lambda。目的是提高高速缓存机制在重度使用下的效率。仍然有可能发生并发,因此这不能替代线程同步(如果需要)。

此实现还将故障任务从缓存中逐出,以便随后重试失败的异步操作。

using Microsoft.Extensions.Caching.Memory;
using Microsoft.Extensions.Primitives;

/// <summary>
/// Returns an entry from the cache, or creates a new cache entry using the
/// specified asynchronous factory method. Concurrent invocations are prevented,
/// unless the entry is evicted before the completion of the delegate. The errors
/// of failed invocations are not cached.
/// </summary>
public static Task<T> GetOrCreateExclusiveAsync<T>(this IMemoryCache cache, object key,
    Func<Task<T>> factory, MemoryCacheEntryOptions options = null)

    if (!cache.TryGetValue(key, out Task<T> task))
    
        var entry = cache.CreateEntry(key);
        if (options != null) entry.SetOptions(options);
        var cts = new CancellationTokenSource();
        var newTaskTask = new Task<Task<T>>(async () =>
        
            try  return await factory().ConfigureAwait(false); 
            catch  cts.Cancel(); throw; 
            finally  cts.Dispose(); 
        );
        var newTask = newTaskTask.Unwrap();
        entry.ExpirationTokens.Add(new CancellationChangeToken(cts.Token));
        entry.Value = newTask;
        entry.Dispose(); // The Dispose actually inserts the entry in the cache
        if (!cache.TryGetValue(key, out task)) task = newTask;
        if (task == newTask)
            newTaskTask.RunSynchronously(TaskScheduler.Default);
        else
            cts.Dispose();
    
    return task;

使用示例:

var cache = new MemoryCache(new MemoryCacheOptions());
string html = await cache.GetOrCreateExclusiveAsync(url, async () =>

    return await httpClient.GetStringAsync(url);
, new MemoryCacheEntryOptions().SetAbsoluteExpiration(TimeSpan.FromMinutes(10)));

此实现在内部使用嵌套任务 (Task&lt;Task&lt;T&gt;&gt;) 而不是惰性任务 (Lazy&lt;Task&lt;T&gt;&gt;) 作为包装器,因为后面的构造在某些情况下容易出现死锁。 参考:Lazy<Task> with asynchronous initialization,VSTHRD011 Use AsyncLazy。


GitHub 上相关 API 建议:GetOrCreateExclusive() and GetOrCreateExclusiveAsync(): Exclusive versions of GetOrCreate() and GetOrCreateAsync()

【讨论】:

可爱。这看起来像原来的类应该有的东西。 @ChristianFindlay 同意。但是一个微软不能无所不能。他们不断得到requests 来添加东西、修复东西、改进东西和发明东西。 这很有趣!你甚至可能认为他们开发框架是为了赚钱。【参考方案2】:

有不同的解决方案可用,其中最著名的可能是LazyCache:这是一个很棒的库。

另一个您可能会觉得有用的是FusionCache ⚡?,它是我最近发布的:它具有完全相同的功能(尽管实现方式不同)等等。

您正在寻找的功能描述为here,您可以这样使用它:

var result = await fusionCache.GetOrSetAsync(
  Key,
  _ => await GetDataAsync(),
  TimeSpan.FromMinutes(2)
);

您可能还会发现其他一些有趣的功能,例如 fail-safe、advanced timeouts,它们具有后台工厂完成功能并支持可选的分布式 2nd level。

如果你愿意给它一个机会,请告诉我你的想法。

/无耻之塞

【讨论】:

【参考方案3】:

MemoryCache 让您决定如何处理竞争以填充缓存键。在您的情况下,您不希望多个线程竞争填充一个密钥,大概是因为这样做很昂贵。

要像这样协调多个线程的工作,您需要一个锁,但是在异步代码中使用 C# lock 语句会导致线程池饥饿。幸运的是,SemaphoreSlim 提供了一种执行异步锁定的方法,因此只需创建一个包含底层IMemoryCache 的受保护内存缓存即可。

我的第一个解决方案只有一个信号量用于整个缓存,将所有缓存填充任务放在一行中,这不是很聪明,所以这里有一个更精细的解决方案,每个缓存键都有一个信号量。另一种解决方案可能是通过键的哈希选择固定数量的信号量。

sealed class GuardedMemoryCache : IDisposable

    readonly IMemoryCache cache;
    readonly ConcurrentDictionary<object, SemaphoreSlim> semaphores = new();

    public GuardedMemoryCache(IMemoryCache cache) => this.cache = cache;

    public async Task<TItem> GetOrCreateAsync<TItem>(object key, Func<ICacheEntry, Task<TItem>> factory)
    
        var semaphore = GetSemaphore(key);
        await semaphore.WaitAsync();
        try
        
            return await cache.GetOrCreateAsync(key, factory);
        
        finally
        
            semaphore.Release();
            RemoveSemaphore(key);
        
    

    public object Get(object key) => cache.Get(key);

    public void Dispose()
    
        foreach (var semaphore in semaphores.Values)
            semaphore.Release();
    

    SemaphoreSlim GetSemaphore(object key) => semaphores.GetOrAdd(key, _ => new SemaphoreSlim(1));

    void RemoveSemaphore(object key)
    
        if (semaphores.TryRemove(key, out var semaphore))
            semaphore.Dispose();
    

如果多个线程尝试填充相同的缓存键,则实际上只有一个线程会执行此操作。其他线程将改为返回创建的值。

假设您使用依赖注入,您可以让GuardedMemoryCache 实现IMemoryCache,方法是添加更多转发到底层缓存的方法,以修改整个应用程序的缓存行为,只需很少的代码更改。

【讨论】:

对整个内存缓存使用单个SemaphoreSlim 意味着在异步创建单个缓存键时,所有其他键都将被阻止。这是一个非常严重的缺陷,因此我投反对票。您可以通过每个键使用一个SemaphoreSlim 来修复它,但我认为AsyncLazy 解决方案更易于实施。 Martin 您更新的解决方案有一个竞争条件:线程 A 获取并释放特定键的信号量。线程 B 立即获取信号量。线程 A 从字典 (RemoveSemaphore(key)) 中删除信号量。线程 C 为特定键创建一个新的信号量。结果:B 和 C 线程同时为特定键调用 factory。正确地做到这一点是相当棘手的。如果需要,请查看this 问题。 乍一看,我认为这是错误的,但我可以看到它是如何工作的。是的,我认为保留信号量字典可能是一种有效的方法。我们必须这样做,这似乎很疯狂。为什么这不只是内存缓存类的内置内容? 这看起来不错。我很快就会试试这个。 可以说,MemoryCache 的全部意义在于处理竞争条件......

以上是关于停止 MemoryCache 调用的重入的主要内容,如果未能解决你的问题,请参考以下文章

C#中的重入锁

Scala中monad中的重入锁

可重入锁浅谈

第142篇 合约安全-重入锁

多线程——重入锁

WinExec可能会引起消息重入