测试在内部使用 SemaphoreSlim 以实现并行化的异步方法

Posted

技术标签:

【中文标题】测试在内部使用 SemaphoreSlim 以实现并行化的异步方法【英文标题】:Testing an async method that uses SemaphoneSlim internally to achieve parallelisation 【发布时间】:2021-09-20 17:13:05 【问题描述】:

我为IEnumerable<Uri> 编写了一个扩展方法,以允许下载URI 中指定的资源。这是简化的代码:

public static async Task DownloadInParallel(this IEnumerable<Uri> values, HttpClient httpClient, Func<Uri, int, Stream, Task> successCallback, int maxDownloadsInParallel, CancellationToken cancellationToken)

    var throttler = new SemaphoreSlim(initialCount: maxDownloadsInParallel);

    var tasks = values.Select(async (x, i) =>
    
        await throttler.WaitAsync(cancellationToken).ConfigureAwait(false);

        try
        
            using (var response = await httpClient.GetAsync(x, HttpCompletionOption.ResponseHeadersRead, cancellationToken).ConfigureAwait(false))
            using (var stream = await response.Content.ReadAsStreamAsync().ConfigureAwait(false))
            
                await successCallback(x, i, stream).ConfigureAwait(false);
            
        
        finally
        
            throttler.Release();
        
    );

    await Task.WhenAll(tasks).ConfigureAwait(false);

使用模拟HttpMessageHandler 编写单元测试来断言成功回调的执行是直截了当的,但在我看来还不够好,因为扩展方法提供的基本功能不仅仅是列表资源被下载,但能够并行执行这些下载,达到指定的限制。但是,这很难测试,因为无法看到线程何时进入信号量。这意味着我不能做与并行性相关的断言,这有两个方面:下载确实是并行执行的,而不是顺序执行的,以及并行度达到但没有超过指定的限制。

作为一种解决方法,我想将事件添加到包含类,当信号量被输入和释放时将被调用。通过预处理器指令使用条件编译并将它们置于内部并使用InternalsVisibleToAttribute,这些不会在发布版本中公开。

#if TEST
internal static event EventHandler SemaphoreEnter;
internal static event EventHandler SemaphoreRelease;
#endif

public static async Task DownloadInParallel(this IEnumerable<Uri> values, HttpClient httpClient, Func<Uri, int, Stream, Task> successCallback, int maxDownloadsInParallel, CancellationToken cancellationToken)

    var throttler = new SemaphoreSlim(initialCount: maxDownloadsInParallel);

    var tasks = values.Select(async (x, i) =>
    
        await throttler.WaitAsync(cancellationToken).ConfigureAwait(false);
#if TEST
        SemaphoreEnter?.Invoke(null, EventArgs.Empty);
#endif
        try
        
            ...
        
        finally
        
            throttler.Release();
#if TEST
            SemaphoreRelease?.Invoke(null, EventArgs.Empty);
#endif
        
    );

    await Task.WhenAll(tasks).ConfigureAwait(false);

这允许我构建一系列被调用的事件,以便我可以进行断言。虽然它有效,但感觉就像一个 hacky 解决方案。这合理吗?我可以采用哪些其他方法来测试此核心功能?还有其他更适合的线程构造吗?

【问题讨论】:

是否可以将DownloadInParallel 拆分为两种方法,一种仅包含并行化功能(类似于Parallel.ForEachAsync),另一种建立在第一种之上并添加下载功能? 你应该把你的测试代码放在模拟里面。 所以我有两种方法: 1. 如果您有 VS 企业版,请使用 Microsoft Fakes(现在与 .net 核心兼容),然后从 API 中填充内容以进行断言在通话中。 2. 在没有假货的情况下,我会设置我的 UT 以在随机空闲端口上启动本地网络服务器(例如 Kestrel),并让它响应带有 id 的 URI,等待 300 毫秒然后返回一些东西。收到请求时,将捕获时间戳。然后,我将针对我的本地服务器的 URI 运行您的代码,并断言在 300 毫秒内仅收到 X,并且总数匹配 @TheodorZoulias,这只会将问题转移到第二种方法,不是吗?信号量没有暴露,所以它的功能总是对外隐藏的。 @PauloMorgado,我不明白你的意思。我有一个模拟 HttpMessageHandler 我正在传递给 HttpClient 的构造函数。我不知道有什么钩子可以用来确定下载的开始。 【参考方案1】:

我非常专注于让这个与线程特定组件一起工作,我忽略了简单的答案。

我的测试结果是:

HttpHandlerMock.Protected()
               .Setup<Task<HttpResponseMessage>>(...)
               .ReturnsAsync(...);

采纳 Paolo 的评论并通过添加 Callback 方法将代码更改为以下代码,这使我可以对 Events 集合执行所需的断言。

HttpHandlerMock.Protected()
               .Setup<Task<HttpResponseMessage>>(...)
               .Callback(() => Events.Add("start"))
               .ReturnsAsync(...);

【讨论】:

以上是关于测试在内部使用 SemaphoreSlim 以实现并行化的异步方法的主要内容,如果未能解决你的问题,请参考以下文章

在内部测试中使用测试数据库和 api 发布移动应用程序是不是合适?

第 9 个线程上的 SemaphoreSlim 死锁

SignalR 如何在内部工作?

在内部服务器上设置Shiny app

使用 postgresql 将第三个表与其他两个表链接以在内部具有动态填充数据

为啥 QVector 的迭代器使用前缀增加而后缀在内部减少?