如何检查一个任务是不是完成并开始下一个?

Posted

技术标签:

【中文标题】如何检查一个任务是不是完成并开始下一个?【英文标题】:How to check if a Task is finished and start the next one?如何检查一个任务是否完成并开始下一个? 【发布时间】:2021-04-03 10:40:47 【问题描述】:

我有一个分页的 Rest API 并希望接收所有记录。目前我循环直到同步接收到所有记录。

现在我想改变行为,因为很长一段时间,直到收到所有记录。我想要做的是启动分配给 List 的 10 个分页请求,并在请求完成时启动下一个请求。这样最多 10 个请求并行运行。我当前的代码如下所示:

protected async Task<List<T>> GetAll<T, TListObject>() where TListObject : ApiListModel<T> 
            var limit = this.GetRequestLimit();
            var token = this.GetBearerToken();
            var maxConcurrent = _config.GetValue<int>("Blackduck:MaxConcurrentRequests");

            var list = new List<T>();
            var tasks = new List<Task<TListObject>>();

            //Request first page to get total count
            var resp = await this.GetPage<TListObject>(0, limit);
            list.AddRange(resp.Items);
            var total = resp.TotalCount;

            for (int i = 0; i < maxConcurrent; i++)
            
                tasks.Add(this.GetPage<TListObject>(0, 0)); // TODO: 0, 0 should be replaced with the offset and limit
            

            TListObject result;
            while (list.Count < total || (result = Task.WhenAny<TListObject>(tasks)))
            
                
            

            return list;
        

但现在我暂时无法使用Task.WhenAny&lt;T&gt;() 开始下一次运行,直到收到所有记录。有人知道如何开始下一页吗?

BR

【问题讨论】:

你可以使用Semaphore(Slim) 相关:How to limit the amount of concurrent async I/O operations? 一个新的 API Parallel.ForEachAsync 可能会在下一个主要的 .NET 版本中提供,但现在你必须手动进行限制,或者使用一些第三方图书馆。 【参考方案1】:

这样最多可以并行运行 10 个请求。

要同时运行异步代码,请使用SemaphoreSlimTask.WhenAll

var mutex = new SemaphoreSlim(10);
for (int i = 0; i < maxConcurrent; i++)

  tasks.Add(ThrottledGetPage(offset, limit));


var results = await Task.WhenAll(tasks);

async Task<T> ThrottledGetPage(int offset, int limit)

  await mutex.WaitAsync();
  try  return await this.GetPage<TListObject>(offset, limit); 
  finally  mutex.Release(); 

【讨论】:

【参考方案2】:

首先,创建一个创建任务的generator迭代器方法(返回IEnumerator&lt;T&gt;并使用yield的方法之一),它会让你的生活更轻松。

类似的东西:

IEnumerator<Task<TListObject>> taskGenerator()

    // TODO: add termination condition, `total` is it?
    while (true)
    
        // TODO: 0, 0 should be replaced with the offset and limit
        yield return GetPage<TListObject>(0, 0);
    

那么我们需要一个List&lt;Task&gt; 来处理所有已启动的任务。

var tasks = new List<Task<TListObject>>();

foreach (newTask in taskGenerator())

    // This should add tasks until we have enough
    tasks.Add(newTask);
    if (tasks.Count < maxConcurrent)
    
        continue;
    

    var completedTask = await Task.WhenAny<TListObject>(tasks);
    // Whatever you do with completed task, I don't know
    HandleCompletedTask(completedTask);
    tasks.Remove(completedTask);


while (tasks.Count > 0)

    var completedTask = await Task.WhenAny<TListObject>(tasks);
    // Whatever you do with completed task, I don't know
    HandleCompletedTask(completedTask);
    tasks.Remove(completedTask);

大体思路是:

添加到列表中,直到达到我们想要的并发任务数。 等待其中任何一个完成。 处理已完成的任务,并将其从列表中删除。 重复。

需要注意的是,当我们完成任务时,我们仍然需要处理列表中的所有任务。这就是为什么我们需要另一个循环。


但是,如果多个任务可能在附近完成,最好一起处理......

var tasks = new List<Task<TListObject>>();

foreach (newTask in taskGenerator())

    // This should add tasks until we have enough
    tasks.Add(newTask);
    if (tasks.Count < maxConcurrent)
    
        continue;
    

    await Task.WhenAny<TListObject>(tasks);
    tasks = HandleTasks(tasks);


while (tasks.Count > 0)

    await Task.WhenAny<TListObject>(tasks);
    tasks = HandleTasks(tasks);

这里的方法HandleTasks旨在:

处理已完成的任务(不管你怎么做) 制作一个新列表,列出所有尚未完成的任务。
List<Task<TListObject>> HandleTasks(List<Task<TListObject>> tasks)

    var result = new List<Task<TListObject>>();
    // We will repopulate the list, and handle completed tasks
    foreach (task in tasks)
    
        if (task.RanToCompletion)
        
            // Whatever you do with completed task, I don't know
            HandleCompletedTask(task);
        
        else
        
            result .Add(task);
        
    
    return result;

【讨论】:

【参考方案3】:

也许Parallel.For 也是一个选项,它可以轻松执行多个操作,并让您控制可以同时运行的最大任务量。

class Program

    static void Main(string[] args)
    
        var pageSize = 10;
        var totalItems = 1000;
        var apiLists = new ConcurrentBag<ApiListModel>();
        
        Parallel.For(0, totalItems / pageSize, new ParallelOptions  MaxDegreeOfParallelism = 10 , i =>
        
            apiLists.Add(GetPage(i * pageSize, pageSize));
        );
    

    static ApiListModel GetPage(int offset, int pageSize)
    
        Console.WriteLine($"Getting page with size pageSize and offset offset");
        
        // Get content from API
        return new ApiListModel
        

        ;
    

结果:

Getting page with size 10 and offset 300
Getting page with size 10 and offset 200
Getting page with size 10 and offset 400
Getting page with size 10 and offset 0
Getting page with size 10 and offset 500
Getting page with size 10 and offset 700
Getting page with size 10 and offset 800
Getting page with size 10 and offset 100
Getting page with size 10 and offset 600
Getting page with size 10 and offset 310
Getting page with size 10 and offset 320
Getting page with size 10 and offset 810
Getting page with size 10 and offset 330
Getting page with size 10 and offset 820
Getting page with size 10 and offset 340
Getting page with size 10 and offset 830
Getting page with size 10 and offset 350
Getting page with size 10 and offset 510
Getting page with size 10 and offset 360
Getting page with size 10 and offset 520
Getting page with size 10 and offset 370
Getting page with size 10 and offset 530
Getting page with size 10 and offset 380
Getting page with size 10 and offset 540
Getting page with size 10 and offset 610
Getting page with size 10 and offset 390
Getting page with size 10 and offset 550
Getting page with size 10 and offset 560
Getting page with size 10 and offset 10
Getting page with size 10 and offset 110
Getting page with size 10 and offset 20
Getting page with size 10 and offset 210
Getting page with size 10 and offset 120
Getting page with size 10 and offset 220
Getting page with size 10 and offset 130
Getting page with size 10 and offset 230
Getting page with size 10 and offset 140
Getting page with size 10 and offset 240
Getting page with size 10 and offset 30
Getting page with size 10 and offset 250
Getting page with size 10 and offset 40
Getting page with size 10 and offset 260
Getting page with size 10 and offset 50
Getting page with size 10 and offset 60
Getting page with size 10 and offset 840
Getting page with size 10 and offset 850
Getting page with size 10 and offset 620
Getting page with size 10 and offset 860
Getting page with size 10 and offset 650
Getting page with size 10 and offset 870
Getting page with size 10 and offset 660
Getting page with size 10 and offset 880
Getting page with size 10 and offset 410
Getting page with size 10 and offset 670
Getting page with size 10 and offset 420
Getting page with size 10 and offset 680
Getting page with size 10 and offset 430
Getting page with size 10 and offset 690
Getting page with size 10 and offset 440
Getting page with size 10 and offset 730
Getting page with size 10 and offset 450
Getting page with size 10 and offset 740
Getting page with size 10 and offset 460
Getting page with size 10 and offset 750
Getting page with size 10 and offset 470
Getting page with size 10 and offset 760
Getting page with size 10 and offset 480
Getting page with size 10 and offset 490
Getting page with size 10 and offset 900
Getting page with size 10 and offset 710
Getting page with size 10 and offset 720
Getting page with size 10 and offset 150
Getting page with size 10 and offset 170
Getting page with size 10 and offset 160
Getting page with size 10 and offset 180
Getting page with size 10 and offset 190
Getting page with size 10 and offset 570
Getting page with size 10 and offset 580
Getting page with size 10 and offset 590
Getting page with size 10 and offset 270
Getting page with size 10 and offset 280
Getting page with size 10 and offset 290
Getting page with size 10 and offset 630
Getting page with size 10 and offset 640
Getting page with size 10 and offset 70
Getting page with size 10 and offset 80
Getting page with size 10 and offset 890
Getting page with size 10 and offset 90
Getting page with size 10 and offset 770
Getting page with size 10 and offset 780
Getting page with size 10 and offset 790
Getting page with size 10 and offset 910
Getting page with size 10 and offset 920
Getting page with size 10 and offset 930
Getting page with size 10 and offset 940
Getting page with size 10 and offset 950
Getting page with size 10 and offset 960
Getting page with size 10 and offset 970
Getting page with size 10 and offset 980
Getting page with size 10 and offset 990

apiLists 现在有 100 个 ApiListModel 实例,每个页面一个。

Docs

【讨论】:

以上是关于如何检查一个任务是不是完成并开始下一个?的主要内容,如果未能解决你的问题,请参考以下文章

检查云任务队列是不是为空

如何检查 std::async 任务是不是完成?

检查异步任务状态并获取其结果

如何检查页面重新加载是不是完成

在任务仍在运行时返回结果

防止 Celery Beat 运行相同的任务