以半并行方式运行多个任务

Posted

技术标签:

【中文标题】以半并行方式运行多个任务【英文标题】:Run multiple tasks in semiparallel 【发布时间】:2021-02-18 23:02:35 【问题描述】:

我有一个任务列表,其中每个任务都有两个子任务。我只想在上一个任务完成第一个子任务后开始下一个任务。 (第一个子任务对单个 http api 调用了大约 1000 次,如果我称它为“太多”,它似乎会下降,第二个任务是一个相当长时间运行的 cpu 绑定任务)。

我的想法类似于以下示例代码(在实际情况下,任务列表增长到至少 300 个):

class Program

    

    static async Task Main(string[] args)
    
        var test = new TestMultiTask();
        await test.TestListOfTasks();
        Console.WriteLine("Finished");
        Console.ReadLine();
    
 

public class TestMultiTask

    public List<Task> Tasks  get; private set; 
    public async Task TestListOfTasks()
    
        Tasks = new List<Task>();
        Tasks.Add(Task2SubTask(new Progress<MyProgress>(ReportProgress), 0));
        await Task.WhenAll(Tasks);
        Console.WriteLine("Finished TestListOfTasks");
    

    private void ReportProgress(MyProgress obj)
    
        if (obj.ProgressNo < 10)
        
            Tasks.Add(Task2SubTask(new Progress<MyProgress>(ReportProgress), obj.ProgressNo + 1));
        
    

    private async Task Task2SubTask(IProgress<MyProgress> progress, int i)
    
        Console.WriteLine($"Task i, started.");
        await Task.Delay(700); // simulating getting data from web api (abt- 1000 calls)
        Console.WriteLine($"Task i, completed subtask 1."); 
        progress.Report(new MyProgress()  Task1Done = true, ProgressNo = i ); // now the next task can start getting data form web api.
        await Task.Run(() => Task.Delay(1000)); // Process the data collected (in this task) from the web api
        Console.WriteLine($"Task i done."); // return processed data.
    


然而,这至少有一个缺陷 - 只要列表中的第一个任务完成,Program.Main 就会继续,即使列表已扩展。 有谁知道这样做的更聪明的方法? (也许是完全不同的方法?)

编辑: 感谢 cmets.. 现在我尝试注释代码以使其更清晰。

【问题讨论】:

我对这里的任务和子任务有点困惑。所以你需要调用一个 API 1000 次,然后在每次调用后完成一个 CPU 绑定的工作负载? 我从您的问题中了解到的(如果错了告诉我):SubTask1 从某些 API 收集数据,SubTask2 处理该数据。这两个,你有你认为创建“父”任务的好主意的“块”。对吗? 您考虑过TPL Dataflow 库吗?它允许定义由多个块组成的管道,每个块与特定类型的任务相关联并独立配置,然后向管道提供沿途处理的消息(例如 URL)。因此,您可以获得任务并行性和数据并行性。 Here 是一个使用示例。 这听起来更像是一个管道的东西。您需要一个为一个“块”顺序调用 API 的步骤,然后相应的结果应该进入第二步,在那里进行处理。也许 DataFlow 适合你? 谢谢大家。 TPL Dataflow 听起来是一个好方法,@Evk 似乎是一个简单的解决方案,可以解决问题:) 【参考方案1】:

首先,感谢许多 cmets。我尝试了@Evk 的提议,最终得到了以下代码......

public class TestMultiTask2

    public List<Task> Tasks  get; private set; 
    public async Task TestListOfTasks()
    
        var workers = new List<Worker>();
        var crunchingTasks = new List<Task>();
        var n = 10;
        for (int i = 0; i < n; i++)
        
            var w = new Worker(i);
            workers.Add(w);
            await w.GetDataAsync(new Progress<MyProgress>(ReportProgress));
            crunchingTasks.Add(w.CrunchData(new Progress<MyProgress>(ReportProgress)));
        
        await Task.WhenAll(crunchingTasks);
        Console.WriteLine($"Worker 0 data: string.Join(", ", workers[0].Datas)");
        Console.WriteLine($"Worker n-1 data: string.Join(", ", workers[n-1].Datas)");
        Console.WriteLine("Finished TestListOfTasks");
    

    private void ReportProgress(MyProgress obj)
    
        if (obj.Task2Started)
        
            Console.WriteLine($"Task no. obj.TaskNo has started chrunching data: obj.Task2Started");
        
        else if(obj.Task2Done)
        
            Console.WriteLine($"Task no. obj.TaskNo has finished chrunching data: obj.Task2Done");
        
        else if (obj.Task1Done)
        
            Console.WriteLine($"Task no. obj.TaskNo has finished getting data: obj.Task1Done"); 
        
    




public class MyProgress

    public int TaskNo  get; set; 
    public bool Task1Done  get; set; 
    public bool Task2Done  get; internal set; 
    public bool Task2Started  get; internal set; 


public class Worker

    public int ID  get; set; 

    public Worker(int id)
    
        ID = id;
    
    public List<int> Datas  get; set;  = new List<int>()  0, 0, 0 ;
    public async Task GetDataAsync(IProgress<MyProgress> progress)
    
        await Task.Delay(500);
        Datas = new List<int>()  1, 2, 3 ;
        progress.Report(new MyProgress()  TaskNo = ID, Task1Done = true );
    

    public async Task CrunchData(IProgress<MyProgress> progress)
    
        progress.Report(new MyProgress()  TaskNo = ID, Task2Started = true );
        await Task.Run(async () =>  await Task.Delay(5000); Datas.Reverse(); );
        progress.Report(new MyProgress()  TaskNo = ID, Task2Done = true );
    

class Program



    static async Task Main(string[] args)
    
        var test = new TestMultiTask2();
        await test.TestListOfTasks();
        Console.WriteLine("Finished");
        Console.ReadLine();
    

【讨论】:

以上是关于以半并行方式运行多个任务的主要内容,如果未能解决你的问题,请参考以下文章

UWP - 如何启动并行运行的多个任务?

多核并行编程技术

Swift之深入解析如何使用并发系统并行运行多个任务

多进程概念

并发编程 - 总结

如何在基于 C# 的 Windows 服务中处理以不同时间间隔并行运行的多个任务?