如何将异步与 ForEach 一起使用?
Posted
技术标签:
【中文标题】如何将异步与 ForEach 一起使用?【英文标题】:How can I use Async with ForEach? 【发布时间】:2013-09-11 03:15:36 【问题描述】:在使用 ForEach 时可以使用 Async 吗?以下是我正在尝试的代码:
using (DataContext db = new DataLayer.DataContext())
db.Groups.ToList().ForEach(i => async
await GetAdminsFromGroup(i.Gid);
);
我收到错误:
当前上下文中不存在名称“异步”
using语句所包含的方法设置为async。
【问题讨论】:
【参考方案1】:List<T>.ForEach
不能与 async
配合得特别好(LINQ-to-objects 也不行,原因相同)。
在这种情况下,我建议将每个元素投影到一个异步操作中,然后您可以(异步)等待它们全部完成。
using (DataContext db = new DataLayer.DataContext())
var tasks = db.Groups.ToList().Select(i => GetAdminsFromGroupAsync(i.Gid));
var results = await Task.WhenAll(tasks);
与将async
委托给ForEach
相比,这种方法的好处是:
-
错误处理更合适。来自
async void
的异常不能用catch
捕获;这种方法将在await Task.WhenAll
行传播异常,允许自然的异常处理。
您知道在此方法结束时任务已完成,因为它执行await Task.WhenAll
。如果您使用async void
,则无法轻易判断操作何时完成。
此方法具有用于检索结果的自然语法。 GetAdminsFromGroupAsync
听起来像是一个产生结果的操作(管理员),如果这样的操作可以返回它们的结果而不是设置一个值作为副作用,这样的代码会更自然。
【讨论】:
并不是说它会改变任何东西,但List.ForEach()
不是 LINQ 的一部分。
很好的建议@StephenCleary 并感谢您提供的有关async
的所有答案。他们非常有帮助!
@StewartAnderson:任务将同时执行。串行执行没有扩展;只需在循环体中使用await
执行foreach
。
@mare: ForEach
只接受同步委托类型,并且没有采用异步委托类型的重载。所以简短的回答是“没有人写过异步的ForEach
”。更长的答案是您必须假设一些语义;例如,应该一次处理一个项目(如foreach
)还是同时处理(如Select
)?如果一次一个,异步流不是更好的解决方案吗?如果同时,结果应该按原始项目顺序还是按完成顺序?它应该在第一次失败时失败还是等到所有都完成?等等。
@RogerWolf:是的;使用SemaphoreSlim
来限制异步任务。【参考方案2】:
这个小扩展方法应该为您提供异常安全的异步迭代:
public static async Task ForEachAsync<T>(this List<T> list, Func<T, Task> func)
foreach (var value in list)
await func(value);
由于我们将 lambda 的返回类型从 void
更改为 Task
,因此异常将正确传播。这将允许您在实践中编写类似的内容:
await db.Groups.ToList().ForEachAsync(async i =>
await GetAdminsFromGroup(i.Gid);
);
【讨论】:
除了等待 ForEachAsyn(),还可以调用 Wait()。 Lambda 不需要在这里等待。 我会在此处添加对 CancellationToken 的支持,如托德的回答***.com/questions/29787098/…【参考方案3】:从C# 8.0
开始,您可以异步创建和使用流。
private async void button1_Click(object sender, EventArgs e)
IAsyncEnumerable<int> enumerable = GenerateSequence();
await foreach (var i in enumerable)
Debug.WriteLine(i);
public static async IAsyncEnumerable<int> GenerateSequence()
for (int i = 0; i < 20; i++)
await Task.Delay(100);
yield return i;
More
【讨论】:
这样做的好处是除了等待每个元素之外,您现在还等待枚举器的MoveNext
。这在枚举器无法立即获取下一个元素并且必须等待一个可用的情况下很重要。
这缺少解释/示例如何将其应用于 OPs 问题。【参考方案4】:
简单的答案是使用foreach
关键字代替List()
的ForEach()
方法。
using (DataContext db = new DataLayer.DataContext())
foreach(var i in db.Groups)
await GetAdminsFromGroup(i.Gid);
【讨论】:
为什么这工作而不是使用List()
的ForEach()
方法?
签名是public void ForEach (Action<T> action)
@taylorswiftfan。为了让它工作,它需要是ForEach(Task<T> action)
。由于我们没有通过普通的 ol' for 循环传递委托,因此我们可以等待循环内的每个调用。不过,这些天有更好的方法。 ***.com/a/58112039/3198973
@RubberDuck 每当您这样说详细说明处理此问题的现代方式的答案。 那么您是否对 Entity Framework Core 或其他内容感兴趣?
@PeterCsala 我不知道你在说什么。我用这个赏金奖励一个现有的答案。这个。 ***.com/a/58112039/3198973
我其实觉得这是最简单的方法【参考方案5】:
这是上述异步 foreach 变体的实际工作版本,具有顺序处理:
public static async Task ForEachAsync<T>(this List<T> enumerable, Action<T> action)
foreach (var item in enumerable)
await Task.Run(() => action(item); ).ConfigureAwait(false);
这里是实现:
public async void SequentialAsync()
var list = new List<Action>();
Action action1 = () =>
//do stuff 1
;
Action action2 = () =>
//do stuff 2
;
list.Add(action1);
list.Add(action2);
await list.ForEachAsync();
主要区别是什么? .ConfigureAwait(false);
保持主线程的上下文,同时异步顺序处理每个任务。
【讨论】:
【参考方案6】:问题在于 async
关键字需要出现在 lambda 之前,而不是在正文之前:
db.Groups.ToList().ForEach(async (i) =>
await GetAdminsFromGroup(i.Gid);
);
【讨论】:
这是对async void
的不必要且微妙的使用。这种方法在异常处理和知道异步操作何时完成方面存在问题。
是的,我发现这不能正确处理异常。
这样,如果发生任何未处理的异常,它将破坏整个流程。【参考方案7】:
添加此扩展方法
public static class ForEachAsyncExtension
public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
return Task.WhenAll(from partition in Partitioner.Create(source).GetPartitions(dop)
select Task.Run(async delegate
using (partition)
while (partition.MoveNext())
await body(partition.Current).ConfigureAwait(false);
));
然后像这样使用:
Task.Run(async () =>
var s3 = new AmazonS3Client(Config.Instance.Aws.Credentials, Config.Instance.Aws.RegionEndpoint);
var buckets = await s3.ListBucketsAsync();
foreach (var s3Bucket in buckets.Buckets)
if (s3Bucket.BucketName.StartsWith("mybucket-"))
log.Information("Bucket => BucketName", s3Bucket.BucketName);
ListObjectsResponse objects;
try
objects = await s3.ListObjectsAsync(s3Bucket.BucketName);
catch
log.Error("Error getting objects. Bucket => BucketName", s3Bucket.BucketName);
continue;
// ForEachAsync (4 is how many tasks you want to run in parallel)
await objects.S3Objects.ForEachAsync(4, async s3Object =>
try
log.Information("Bucket => BucketName => Key", s3Bucket.BucketName, s3Object.Key);
await s3.DeleteObjectAsync(s3Bucket.BucketName, s3Object.Key);
catch
log.Error("Error deleting bucket BucketName object Key", s3Bucket.BucketName, s3Object.Key);
);
try
await s3.DeleteBucketAsync(s3Bucket.BucketName);
catch
log.Error("Error deleting bucket BucketName", s3Bucket.BucketName);
).Wait();
【讨论】:
【参考方案8】:如果您使用的是 EntityFramework.Core,则有 an extension method ForEachAsync。
示例用法如下所示:
using Microsoft.EntityFrameworkCore;
using System.Threading.Tasks;
public class Example
private readonly DbContext _dbContext;
public Example(DbContext dbContext)
_dbContext = dbContext;
public async void LogicMethod()
await _dbContext.Set<dbTable>().ForEachAsync(async x =>
//logic
await AsyncTask(x);
);
public async Task<bool> AsyncTask(object x)
//other logic
return await Task.FromResult<bool>(true);
【讨论】:
【参考方案9】:我想补充一点,有一个内置 ForEach 函数的 Parallel class 可用于此目的。
【讨论】:
Parallel
类is not async-friendly。
@TheodorZoulias 我明白了。我认为它会达到类似的结果 - 尽可能快地用多个线程处理列表。我误解了预期的结果。
Parallel
类非常适合 CPU 密集型工作,但不适用于 I/O 密集型工作。例如you don't really need threads,当您进行网络请求时。【参考方案10】:
这是我创建的用于处理 ForEach
的异步场景的方法。
public static class ParallelExecutor
/// <summary>
/// Executes asynchronously given function on all elements of given enumerable with task count restriction.
/// Executor will continue starting new tasks even if one of the tasks throws. If at least one of the tasks throwed exception then <see cref="AggregateException"/> is throwed at the end of the method run.
/// </summary>
/// <typeparam name="T">Type of elements in enumerable</typeparam>
/// <param name="maxTaskCount">The maximum task count.</param>
/// <param name="enumerable">The enumerable.</param>
/// <param name="asyncFunc">asynchronous function that will be executed on every element of the enumerable. MUST be thread safe.</param>
/// <param name="onException">Acton that will be executed on every exception that would be thrown by asyncFunc. CAN be thread unsafe.</param>
/// <param name="cancellationToken">The cancellation token.</param>
public static async Task ForEachAsync<T>(int maxTaskCount, IEnumerable<T> enumerable, Func<T, Task> asyncFunc, Action<Exception> onException = null, CancellationToken cancellationToken = default)
using var semaphore = new SemaphoreSlim(initialCount: maxTaskCount, maxCount: maxTaskCount);
// This `lockObject` is used only in `catch ` block.
object lockObject = new object();
var exceptions = new List<Exception>();
var tasks = new Task[enumerable.Count()];
int i = 0;
try
foreach (var t in enumerable)
await semaphore.WaitAsync(cancellationToken);
tasks[i++] = Task.Run(
async () =>
try
await asyncFunc(t);
catch (Exception e)
if (onException != null)
lock (lockObject)
onException.Invoke(e);
// This exception will be swallowed here but it will be collected at the end of ForEachAsync method in order to generate AggregateException.
throw;
finally
semaphore.Release();
, cancellationToken);
if (cancellationToken.IsCancellationRequested)
break;
catch (OperationCanceledException e)
exceptions.Add(e);
foreach (var t in tasks)
if (cancellationToken.IsCancellationRequested)
break;
// Exception handling in this case is actually pretty fast.
// https://gist.github.com/shoter/d943500eda37c7d99461ce3dace42141
try
await t;
#pragma warning disable CA1031 // Do not catch general exception types - we want to throw that exception later as aggregate exception. Nothing wrong here.
catch (Exception e)
#pragma warning restore CA1031 // Do not catch general exception types
exceptions.Add(e);
if (exceptions.Any())
throw new AggregateException(exceptions);
【讨论】:
您的实现存在一些问题。 1. 源IEnumerable<T>
被枚举两次,因为Count()
,这是一个禁忌。枚举可能会在每次枚举时访问数据库/文件系统/网络。 2. Task.Factory.StartNew(async
产生一个Task<Task>
,而您只等待外部Task
。该方法可能不会传播所有异常,并且可能在所有任务完成之前返回。 3. 使用cancellationToken.IsCancellationRequested
可能会导致不一致的取消行为。请改用ThrowIfCancellationRequested
。
关于Action<Exception> onException = null
参数,它的用例是什么?是在错误发生时立即记录错误,而不是在ForEachAsync
方法完成后批量记录错误?
你是对的,有例外。现在我使用Task.Run
并没有吞下它们。
@TheodorZoulias 感谢您帮助我改进该设计。做得好!我真的很感激:)
谢谢四位有识之士。下次我要看看 TPL 库。谢谢指导! @TheodorZoulias以上是关于如何将异步与 ForEach 一起使用?的主要内容,如果未能解决你的问题,请参考以下文章