C# LanguageExt - 将多个异步调用组合成一个分组调用

Posted

技术标签:

【中文标题】C# LanguageExt - 将多个异步调用组合成一个分组调用【英文标题】:C# LanguageExt - combine multiple async calls into one grouped call 【发布时间】:2020-12-29 19:15:08 【问题描述】:

我有一个从数据存储中异步查找项目的方法;

class MyThing 
Task<Try<MyThing>> GetThing(int thingId) ...

我想从数据存储中查找多个项目,并编写了一个新方法来执行此操作。我还编写了一个辅助方法,它将采用多个 Try&lt;T&gt; 并将它们的结果组合成一个 Try&lt;IEnumerable&lt;T&gt;&gt;


public static class TryExtensions

    Try<IEnumerable<T>> Collapse<T>(this IEnumerable<Try<T>> items)
    
        var failures = items.Fails().ToArray();
        return failures.Any() ?
            Try<IEnumerable<T>>(new AggregateException(failures)) :
            Try(items.Select(i => i.Succ(a => a).Fail(Enumerable.Empty<T>())));
    


async Task<Try<MyThing[]>> GetThings(IEnumerable<string> ids)

    var results = new List<Try<Things>>();

    foreach (var id in ids)
    
        var thing = await GetThing(id);
        results.Add(thing);
    

    return results.Collapse().Map(p => p.ToArray());

另一种方法是这样的;

async Task<Try<MyThing[]>> GetThings(IEnumerable<string> ids)

    var tasks = ids.Select(async id => await GetThing(id)).ToArray();
    await Task.WhenAll(tasks);
    return tasks.Select(t => t.Result).Collapse().Map(p => p.ToArray());

这样做的问题是所有任务都将并行运行,我不想用大量并行请求来冲击我的数据存储。我真正想要的是使用LanguageExt 的一元原则和特性使我的代码功能化。有谁知道如何做到这一点?


更新

感谢@MatthewWatson 的建议,这就是SemaphoreSlim 的样子;

async Task<Try<MyThing[]>> GetThings(IEnumerable<string> ids)

    var mutex = new SemaphoreSlim(1);
    var results = ids.Select(async id =>
    
        await mutex.WaitAsync();
        try  return await GetThing(id); 
        finally  mutex.Release(); 
    ).ToArray();
    await Task.WhenAll(tasks);
    return tasks.Select(t => t.Result).Collapse().Map(Enumerable.ToArray);
    return results.Collapse().Map(p => p.ToArray());

问题是,这仍然不是很单子/功能性,并且最终的代码行数比带有foreach 块的原始代码多。

【问题讨论】:

你可以使用SemaphoreSlim来限制,例如***.com/a/57557324/106159 “更少”代码行似乎是一个相当随意的要求。这样做的目的是什么?可以接受多少行代码? 对不起@TimRutter。我的要求是使我的代码“功能化”,即使用MapMatchBind 等的组合。无论如何,我认为使用 foreach 的版本更“整洁”(无论数量多少行),但这只是我的看法。 为什么您认为(根据我的理解;对函数式编程不太熟悉)您的更新中的代码不起作用?功能性意味着它不会改变状态,单子不只是意味着它需要一个参数吗? 我以某种方式相信我可以通过执行return ids.Select(dosomething).Map(dosomethingelse).Bind(athirdthing); 来实现上述目标,而无需编写任何循环。社区说函数式编程中不需要循环 - qr.ae/pNC6fQ 【参考方案1】:

在“另一种方式”中,你打电话时几乎达到了目标:

var tasks = ids.Select(async id =&gt; await GetThing(id)).ToArray();

除了任务不会按顺序运行,因此您最终会遇到很多查询访问您的数据存储区,这是由.ToArray()Task.WhenAll 引起的。一旦你调用了.ToArray(),它就已经分配并启动了任务,所以如果你可以“容忍”一个foreach来实现顺序任务运行,就像这样:

public static class TaskExtensions

    public static async Task RunSequentially<T>(this IEnumerable<Task<T>> tasks)
    
        foreach (var task in tasks) await task;
    

尽管运行“循环”查询并不是一个很好的做法 一般来说,除非你有一些后台服务和一些 特殊情况,通过将其利用到数据库引擎 WHERE thingId IN (...)一般来说是更好的选择。连你 有大量的thingId,我们可以将其切成小10s,100s ..到 缩小WHERE IN 足迹。

回到我们的RunSequentially,我想要让它更实用,例如:

tasks.ToList().ForEach(async task => await task);

但遗憾的是,这仍然会运行有点“并行”的任务。

所以最终的用法应该是:

async Task<Try<MyThing[]>> GetThings(IEnumerable<string> ids)

    var tasks = ids.Select(id => GetThing(id));// remember don't use .ToArray or ToList...
    await tasks.RunSequentially();
    return tasks.Select(t => t.Result).Collapse().Map(p => p.ToArray());

另一个过分的函数式解决方案是让 Lazy 进入 Queue 递归 !!

取而代之的是GetThing,得到一个懒惰的GetLazyThing,只需包装GetThing即可返回Lazy&lt;Task&lt;Try&lt;MyThing&gt;&gt;&gt;

new Lazy<Task<Try<MyThing>>>(() => GetThing(id))

现在使用几个扩展/功能:

public static async Task RecRunSequentially<T>(this IEnumerable<Lazy<Task<T>>> tasks)

    var queue = tasks.EnqueueAll();
    await RunQueue(queue);


public static Queue<T> EnqueueAll<T>(this IEnumerable<T> list)

    var queue = new Queue<T>();
    list.ToList().ForEach(m => queue.Enqueue(m));
    return queue;


public static async Task RunQueue<T>(Queue<Lazy<Task<T>>> queue)

    if (queue.Count > 0)
    
        var task = queue.Dequeue();
        await task.Value; // this unwraps the Lazy object content
        await RunQueue(queue);
    

最后:

var lazyTasks = ids.Select(id => GetLazyThing(id));
await lazyTasks.RecRunSequentially();
// Now collapse and map as you like

更新

但是,如果您不喜欢 EnqueueAllRunQueue 不是“纯”的事实,我们可以使用相同的 Lazy 技巧采取以下方法

public static async Task AwaitSequentially<T>(this Lazy<Task<T>>[] array, int index = 0)

    if (array == null || index < 0 || index >= array.Length - 1) return;
    await array[index].Value;
    await AwaitSequentially(array, index + 1); // ++index is not pure :)

现在:

var lazyTasks = ids.Select(id => GetLazyThing(id));
await tasks.ToArray().AwaitSequentially();
// Now collapse and map as you like

【讨论】:

这正是我一直在寻找的东西。干得好。

以上是关于C# LanguageExt - 将多个异步调用组合成一个分组调用的主要内容,如果未能解决你的问题,请参考以下文章

如何将多个 monad 绑定在一起?

Xamarin C# - 从其他类调用异步方法

C#异步调用四大方法详解

在 iOS 中等待多个网络异步调用

IAuthenticationFilter 中的 C# Ninject Web API 异步查询导致实体框架中的多个操作错误

C# 同步调用 异步调用 异步回调 多线程的作用