linq select中的异步等待

Posted

技术标签:

【中文标题】linq select中的异步等待【英文标题】:Async await in linq select 【发布时间】:2016-05-02 21:17:33 【问题描述】:

我需要修改一个现有程序,它包含以下代码:

var inputs = events.Select(async ev => await ProcessEventAsync(ev))
                   .Select(t => t.Result)
                   .Where(i => i != null)
                   .ToList();

但这对我来说似乎很奇怪,首先在选择中使用asyncawait。根据斯蒂芬克利里的this answer,我应该可以放弃这些。

然后是第二个Select,它选择了结果。这是否意味着任务根本不是异步的并且是同步执行的(付出了这么多的努力白费了),或者任务将异步执行并且当它完成时执行其余的查询?

我是否应该按照another answer by Stephen Cleary 编写上面的代码:

var tasks = await Task.WhenAll(events.Select(ev => ProcessEventAsync(ev)));
var inputs = tasks.Where(result => result != null).ToList();

和这个完全一样吗?

var inputs = (await Task.WhenAll(events.Select(ev => ProcessEventAsync(ev))))
                                       .Where(result => result != null).ToList();

虽然我正在处理这个项目,但我想更改第一个代码示例,但我不太热衷于更改(显然可以工作)异步代码。也许我只是无所顾忌,所有 3 个代码示例都做同样的事情?

ProcessEventsAsync 看起来像这样:

async Task<InputResult> ProcessEventAsync(InputEvent ev) ...

【问题讨论】:

ProceesEventAsync 的返回类型是什么? @tede24 Task&lt;InputResult&gt; InputResult 是一个自定义类。 在我看来,您的版本更容易阅读。但是,您忘记了SelectWhere 之前的任务结果。 而且 InputResult 有一个 Result 属性对吗? 对于懒惰的开发者来说,还有一种方法可以使这段代码异步。只需添加ToList() 以创建所有任务,然后等待events.Select(async ev =&gt; await ProcessEventAsync(ev)).ToList().Select(t =&gt; t.Result)... 这样的结果。与WaitAll() 相比,这对性能有轻微影响,但在大多数情况下可以忽略不计。 【参考方案1】:
var inputs = events.Select(async ev => await ProcessEventAsync(ev))
                   .Select(t => t.Result)
                   .Where(i => i != null)
                   .ToList();

但这对我来说似乎很奇怪,首先在选择中使用 async 和 await。根据 Stephen Cleary 的这个回答,我应该可以放弃这些。

Select 的调用有效。这两行基本相同:

events.Select(async ev => await ProcessEventAsync(ev))
events.Select(ev => ProcessEventAsync(ev))

(关于从ProcessEventAsync 抛出同步异常的方式存在细微差别,但在这段代码的上下文中,这根本不重要。)

然后是选择结果的第二个 Select。这是否意味着该任务根本不是异步的并且是同步执行的(付出了这么多的努力白费了),或者该任务将异步执行并且当它完成时执行其余的查询?

表示查询被阻塞。所以它并不是真正的异步。

分解:

var inputs = events.Select(async ev => await ProcessEventAsync(ev))

将首先为每个事件启动一个异步操作。然后这一行:

                   .Select(t => t.Result)

将等待这些操作一次完成(首先它等待第一个事件的操作,然后是下一个,然后是下一个,等等)。

这是我不关心的部分,因为它会阻塞并且还会在AggregateException 中包装任何异常。

和这个完全一样吗?

var tasks = await Task.WhenAll(events.Select(ev => ProcessEventAsync(ev)));
var inputs = tasks.Where(result => result != null).ToList();

var inputs = (await Task.WhenAll(events.Select(ev => ProcessEventAsync(ev))))
                                       .Where(result => result != null).ToList();

是的,这两个例子是等价的。它们都启动所有异步操作 (events.Select(...)),然后异步等待所有操作以任意顺序完成 (await Task.WhenAll(...)),然后继续其余的工作 (Where...)。

这两个示例都与原始代码不同。原代码是阻塞的,会将异常包装在AggregateException中。

【讨论】:

为清除它干杯!因此,不是将异常包装到 AggregateException 中,而是会在第二个代码中得到多个单独的异常? @AlexanderDerck:不,在旧代码和新代码中,只会引发第一个异常。但是对于Result,它将被包裹在AggregateException中。 我在使用此代码的 ASP.NET MVC 控制器中遇到了死锁。我使用 Task.Run(…) 解决了它。我对此没有好感。但是,它在运行异步 xUnit 测试时恰到好处。怎么回事? @SuperJMN:将stuff.Select(x =&gt; x.Result); 替换为await Task.WhenAll(stuff) @DanielS:它们本质上是一样的。存在一些差异,例如状态机、捕获上下文、同步异常的行为。更多信息blog.stephencleary.com/2016/12/eliding-async-await.html【参考方案2】:

现有代码正在运行,但正在阻塞线程。

.Select(async ev => await ProcessEventAsync(ev))

为每个事件创建一个新任务,但是

.Select(t => t.Result)

阻塞等待每个新任务结束的线程。

另一方面,您的代码产生相同的结果但保持异步。

仅对您的第一个代码发表评论。这一行

var tasks = await Task.WhenAll(events...

将生成单个 Task,因此该变量应以单数命名。

最后你的最后一个代码是一样的,但更简洁。

供参考:Task.Wait/Task.WhenAll

【讨论】:

那么第一个代码块其实是同步执行的? 是的,因为访问 Result 会产生一个 Wait 阻塞线程。另一方面,当产生一个新任务时,您可以等待。 回到这个问题并查看您对tasks 变量名称的评论,您是完全正确的。可怕的选择,他们甚至不是任务,因为他们立即等待。我会保持原样的问题 刚来这个帖子。 @AlexanderDerck - 为什么不编辑答案?在得到这个答案之前,它让我困惑了一段时间。在重要的时候,使用 var 通常会让你达到这一点。【参考方案3】:

我使用了这个代码:

public static async Task<IEnumerable<TResult>> SelectAsync<TSource,TResult>(
    this IEnumerable<TSource> source, Func<TSource, Task<TResult>> method)

  return await Task.WhenAll(source.Select(async s => await method(s)));

像这样:

var result = await sourceEnumerable.SelectAsync(async s=>await someFunction(s,other params));

编辑:

有些人提出了并发问题,例如当您访问数据库时,您不能同时运行两个任务。所以这里有一个更复杂的版本,它也允许特定的并发级别:

public static async Task<IEnumerable<TResult>> SelectAsync<TSource, TResult>(
    this IEnumerable<TSource> source, Func<TSource, Task<TResult>> method,
    int concurrency = int.MaxValue)

    var semaphore = new SemaphoreSlim(concurrency);
    try
    
        return await Task.WhenAll(source.Select(async s =>
        
            try
            
                await semaphore.WaitAsync();
                return await method(s);
            
            finally
            
                semaphore.Release();
            
        ));
     finally
    
        semaphore.Dispose();
    

没有参数,它的行为与上面更简单的版本完全相同。参数为 1 时,它将按顺序执行所有任务:

var result = await sourceEnumerable.SelectAsync(async s=>await someFunction(s,other params),1);

注意:按顺序执行任务并不意味着执行会因错误而停止!

就像使用较大的并发值或未指定参数一样,所有任务都将被执行,如果其中任何一个失败,则生成的 AggregateException 将包含抛出的异常。

如果您想一个接一个地执行任务而第一个任务失败,请尝试另一种解决方案,例如 xhafan (https://***.com/a/64363463/379279) 建议的解决方案

【讨论】:

这只是以更模糊的方式包装现有功能 imo 额外的参数是外部的,取决于我要执行的功能,它们在扩展方法的上下文中是无关的。 这是一个可爱的扩展方法。不知道为什么它被认为“更晦涩” - 它在语义上类似于同步 Select(),所以是一个优雅的插入。 第一个 lambda 中的 asyncawait 是多余的。 SelectAsync 方法可以简单地写成:return await Task.WhenAll(source.Select(method)); 确实@Nathan,为什么有await? - public static Task&lt;TResult[]&gt; SelectAsync&lt;TSource,TResult&gt;(this IEnumerable&lt;TSource&gt; source, Func&lt;TSource, Task&lt;TResult&gt;&gt; method) return Task.WhenAll(source.Select(x =&gt; method(x))); 【参考方案4】:

我更喜欢这个作为扩展方法:

public static async Task<IEnumerable<T>> WhenAll<T>(this IEnumerable<Task<T>> tasks)

    return await Task.WhenAll(tasks);

这样它就可以与方法链接一起使用:

var inputs = await events
  .Select(async ev => await ProcessEventAsync(ev))
  .WhenAll()

【讨论】:

你不应该调用 Wait 方法,当它实际上并没有等待时。它正在创建一个在所有任务都完成时完成的任务。称之为WhenAll,就像它模拟的Task 方法一样。方法为async 也是没有意义的。只需致电WhenAll 即可完成。 @AlexanderDerck 的优点是您可以在方法链中使用它。 @Servy,实际上你不能从扩展方法中删除 async 和 await 。您会收到此错误:``` 无法将类型 'System.Threading.Tasks.Task' 隐式转换为 'System.Threading.Tasks.Task>' ` ` 因为它不知道 Task 与 T 是协变的 @Daryl 因为WhenAll 返回一个评估列表(它不是延迟评估的),所以可以使用Task&lt;T[]&gt; 返回类型来表示它。等待时,它仍然可以使用 Linq,但也表明它不是懒惰的。 进一步扩展@Daryl 的优点,我们可以进一步缩减为:public static Task&lt;T[]&gt; WhenAll&lt;T&gt;(this IEnumerable&lt;Task&lt;T&gt; &gt; tasks) return Task.WhenAll(tasks); 【参考方案5】:

使用 Linq 中可用的当前方法,它看起来很丑:

var tasks = items.Select(
    async item => new
    
        Item = item,
        IsValid = await IsValid(item)
    );
var tuples = await Task.WhenAll(tasks);
var validItems = tuples
    .Where(p => p.IsValid)
    .Select(p => p.Item)
    .ToList();

希望 .NET 的后续版本能够提供更优雅的工具来处理任务集合和集合任务。

【讨论】:

【参考方案6】:

我和@KTCheek 有同样的问题,我需要它按顺序执行。但是我想我会尝试使用 IAsyncEnumerable(在 .NET Core 3 中引入)和 await foreach(在 C# 8 中引入)。这是我想出的:

public static class IEnumerableExtensions 
    public static async IAsyncEnumerable<TResult> SelectAsync<TSource, TResult>(this IEnumerable<TSource> source, Func<TSource, Task<TResult>> selector) 
        foreach (var item in source) 
            yield return await selector(item);
        
    


public static class IAsyncEnumerableExtensions 
    public static async Task<List<TSource>> ToListAsync<TSource>(this IAsyncEnumerable<TSource> source) 
        var list = new List<TSource>();

        await foreach (var item in source) 
            list.Add(item);
        

        return list;
    

这可以通过说来消费:

var inputs = await events.SelectAsync(ev => ProcessEventAsync(ev)).ToListAsync();

更新:或者,您可以添加对System.Linq.Async 的引用,然后您可以说:

var inputs = await events
    .ToAsyncEnumerable()
    .SelectAwait(async ev => await ProcessEventAsync(ev))
    .ToListAsync();

【讨论】:

这两个运算符包含在 System.Linq.Async 包中,名称为 SelectAwaitToListAsync,以及许多其他 LINQ 样式的运算符。 实际上不,您的SelectAsync 运行在IEnumerable&lt;T&gt;s 上。前面提到的SelectAwaitIAsyncEnumerable&lt;T&gt;s 上运行。您需要先通过调用ToAsyncEnumerable 扩展方法对其进行转换。 感谢@TheodorZoulias,我已经用替代解决方案更新了我的答案。【参考方案7】:

我想打电话给Select(...),但要确保它按顺序运行,因为并行运行会导致一些其他并发问题,所以我最终得到了这个。 我不能调用.Result,因为它会阻塞 UI 线程。

public static class TaskExtensions

    public static async Task<IEnumerable<TResult>> SelectInSequenceAsync<TSource, TResult>(this IEnumerable<TSource> source, Func<TSource, Task<TResult>> asyncSelector)
    
        var result = new List<TResult>();
        foreach (var s in source)
        
            result.Add(await asyncSelector(s));
        
        
        return result;
    

用法:

var inputs = events.SelectInSequenceAsync(ev => ProcessEventAsync(ev))
                   .Where(i => i != null)
                   .ToList();

我知道 Task.WhenAll 是我们可以并行运行时要走的路。

【讨论】:

赞成。我更喜欢Task&lt;IList&lt;TResult&gt;&gt;(甚至更好的Task&lt;TResult[]&gt;)的返回类型而不是Task&lt;IEnumerable&lt;TResult&gt;&gt;。后者传达了deferred execution 的概念,在这种情况下不适用。在完成Task 之后,生成的IEnumerable&lt;TResult&gt; 完全实现,因为它基于List&lt;T&gt;【参考方案8】:

“仅仅因为你可以并不意味着你应该。”

您可能可以在 LINQ 表达式中使用 async/await 以使其行为完全符合您的要求,但任何其他阅读您的代码的开发人员仍能理解其行为和意图吗?

(特别是:异步操作应该并行运行还是故意按顺序运行?原始开发人员是否考虑过?)

the question 也清楚地表明了这一点,这似乎是开发人员在不知道其意图的情况下试图理解别人的代码时提出的。为确保不会再次发生这种情况,最好将 LINQ 表达式重写为循环语句(如果可能)。

【讨论】:

以上是关于linq select中的异步等待的主要内容,如果未能解决你的问题,请参考以下文章

Linq中的Select与Select many

linq查询中的异步任务方法问 题在c#中返回

将select linq中的相关对象返回给sql

ApiController Post 中的异步和等待

C# 中的异步/等待和并行 [关闭]

Redux 中的异步/等待