在后台运行长时间运行的并行任务,同时允许小型异步任务更新前台

Posted

技术标签:

【中文标题】在后台运行长时间运行的并行任务,同时允许小型异步任务更新前台【英文标题】:Running a long-running parallel task in the background, while allowing small async tasks to update the foreground 【发布时间】:2021-09-08 20:30:59 【问题描述】:

我有大约 10 000 000 个任务,每个任务需要 1-10 秒才能完成。我在一个强大的服务器上运行这些任务,使用 50 个不同的线程,每个线程选择第一个未完成的任务,运行它,然后重复。

伪代码:

for i = 0 to 50:
    run a new thread:
        while True:
            task = first available task
            if no available tasks: exit thread
            run task

使用此代码,我可以在任何给定数量的线程上并行运行所有任务。

实际上,代码使用了 C# 的 Task.WhenAll,如下所示:

ServicePointManager.DefaultConnectionLimit = threadCount; //Allow more HTTP request simultaneously
var currentIndex = -1;
var threads = new List<Task>(); //List of threads
for (int i = 0; i < threadCount; i++) //Generate the threads

    var wc = CreateWebClient();
    threads.Add(Task.Run(() =>
    
        while (true) //Each thread should loop, picking the first available task, and executing it.
        
            var index = Interlocked.Increment(ref currentIndex);
            if (index >= tasks.Count) break;
            var task = tasks[index];
            RunTask(conn, wc, task, port);
        
    ));


await Task.WhenAll(threads);

这就像我想要的那样工作,但我有一个问题:由于这段代码需要很长时间才能运行,我希望用户看到一些进展。进度显示在彩色位图(代表矩阵)中,生成也需要一些时间(几秒钟)。

因此,我想在后台线程上生成此可视化。但是这个其他后台线程永远不会执行。我的怀疑是它使用与并行代码相同的线程池,因此被排队,并且在并行代码实际完成之前不会被执行。 (这有点太晚了。)

这是我如何生成进度可视化的示例:

private async void Refresh_Button_Clicked(object sender, RoutedEventArgs e)

    var bitmap = await Task.Run(() => // <<< This task is never executed!
    
        //bla, bla, various database calls, and generating a relatively large bitmap
    );

    //Convert the bitmap into a WPF image, and update the GUI
    VisualizationImage = BitmapToImageSource(bitmap);

那么,我怎样才能最好地解决这个问题呢?我可以创建一个Tasks 列表,其中每个Task 代表我的一个任务,然后使用Parallel.Invoke 运行它们,然后选择另一个线程池(我认为)。但随后我必须生成 1000 万个 Task 对象,而不仅仅是 50 个 Task 对象,贯穿我要做的一系列事情。听起来它使用了比必要更多的 RAM。有什么聪明的解决办法吗?

编辑: 正如 Panagiotis Kanavos 在他的一个 cmets 中建议的那样,我尝试用 ActionBlock 替换我的一些循环逻辑,如下所示:

// Create an ActionBlock<int> that performs some work. 
var workerBlock = new ActionBlock<ZoneTask>(
t =>

    var wc = CreateWebClient(); //This probably generates some unnecessary overhead, but that's a problem I can solve later.
    RunTask(conn, wc, t, port);
,
// Specify a maximum degree of parallelism. 
new ExecutionDataflowBlockOptions

    MaxDegreeOfParallelism = threadCount
);

foreach (var t in tasks) //Note: the objects in the tasks array are not Task objects
    workerBlock.Post(t);
workerBlock.Complete();

await workerBlock.Completion;

注意:RunTask 只是使用 WebClient 执行一个 Web 请求,并解析结果。里面没有什么东西会造成死锁。

这似乎与旧的并行代码一样工作,只是它需要一两分钟来执行初始 foreach 循环以发布任务。这种延迟真的值得吗?

尽管如此,我的进度任务似乎仍然被阻止。暂时忽略 Progress 的建议,因为这个简化的代码仍然会遇到同样的问题:

private async void Refresh_Button_Clicked(object sender, RoutedEventArgs e)

    Debug.WriteLine("This happens");
    var bitmap = await Task.Run(() =>
    
        Debug.WriteLine("This does not!");
        //Still doing some work here, so it's not optimized away.
    ;

    VisualizationImage = BitmapToImageSource(bitmap);

因此,只要并行任务正在运行,它看起来仍然不会执行新任务。我什至将“MaxDegreeOfParallelism”从 50 减少到 5(在 24 核服务器上),看看 Peter Ritchie 的建议是否正确,但没有改变。还有其他建议吗?

另一个编辑:

问题似乎是我的所有同时阻塞 I/O 调用使线程池超载。我用 HttpClient 及其异步函数替换了 WebClient,现在一切似乎都运行良好。

感谢大家的好建议!尽管并非所有人都直接解决了问题,但我相信他们都改进了我的代码。 :)

【问题讨论】:

我认为这可能会给你一个起点***.com/questions/548208/… .NET 通过Progress&lt; T&gt;IProgress&lt; T&gt; 已经有了这样的机制 任务不是线程。 TPL 本身负责使用线程来处理任务的有效负载。 RunTask 有什么作用,为什么不直接使用 Task.Run 呢? TPL 具有一定程度的并行性,不会尝试一次做太多事情(通常每个 cpu/核心一次只做一件事情)。如果您没有 50 个 CPU/核心,TPL 可能会限制一次运行的任务数。 谢谢卡纳沃斯和里奇。我已经尝试了您的一些建议,请参阅编辑。 【参考方案1】:

.NET 已经提供了一种机制来报告IProgress< T> 和Progress< T> 实现的进度。

IPProgress 接口允许客户端使用Report(T) 类发布消息,而不必担心线程。该实现确保消息在适当的线程中处理,例如 UI 线程。通过使用简单的IProgress&lt; T&gt; 接口,后台方法与处理消息的人分离。

您可以在Async in 4.5: Enabling Progress and Cancellation in Async APIs 文章中找到更多信息。取消和进度 API 并不特定于 TPL。它们可用于简化取消和报告,即使对于原始线程也是如此。

Progress 在创建它的线程上处理消息。这可以通过在实例化类时传递处理委托或通过订阅事件来完成。从文章中复制:

private async void Start_Button_Click(object sender, RoutedEventArgs e)

    //construct Progress<T>, passing ReportProgress as the Action<T> 
    var progressIndicator = new Progress<int>(ReportProgress);
    //call async method
    int uploads=await UploadPicturesAsync(GenerateTestImages(), progressIndicator);

其中ReportProgress 是一个接受int 参数的方法。它还可以接受一个报告工作完成、消息等的复杂类。

异步方法只需要使用 IProgress.Report,例如:

async Task<int> UploadPicturesAsync(List<Image> imageList, IProgress<int> progress)

        int totalCount = imageList.Count;
        int processCount = await Task.Run<int>(() =>
        
            int tempCount = 0;
            foreach (var image in imageList)
            
                //await the processing and uploading logic here
                int processed = await UploadAndProcessAsync(image);
                if (progress != null)
                
                    progress.Report((tempCount * 100 / totalCount));
                
                tempCount++;
            

            return tempCount;
        );
        return processCount;

这将后台方法与接收和处理进度消息的人分离。

【讨论】:

感谢您的详细解答。但是,您确定这是我的问题的解决方案吗?我可能还不够清楚:这个问题与进展并不严格相关。只要我的后台并行任务正在运行,我就无法从 GUI 启动任何新任务。我怀疑这与线程池有关,但我不太了解。虽然 Progress 看起来确实很有趣,但它看起来仍然需要一个 Task.Run(第 4 行,第二个代码块)才能工作,并且这个 Task.Run 将无法在并行任务运行时执行,使其无法使用。 @ErlenD。不,它没有。如果您遇到阻塞问题,那是因为代码本身阻塞。例如,添加另一个最终调用 BeginInvoke 的 Task.Run 仍将阻塞在 UI 线程上。您需要简化您的代码,删除任何添加到“修复”任务的代码。 @ErlenD。例如,任务不是线程。您不需要并且不应该使用无限循环来“挑选任务”。这就是 TPL 已经在做的事情。它使用自己的线程来获取和处理任务,没有阻塞。你没有将代码发布到RunTask,所以很难说出了什么问题 @ErlenD。顺便说一句,.NET 已经有一个类,您可以向其发布消息并对其进行处理,ActionBlock< TInput>。 谢谢,我试图删除我的无限循环以支持 ActionBlock。它给了我一个较慢的启动(因为发布所有任务需要循环),但似乎仍然有同样的问题。放慢速度值得吗?此外,我删除了进度代码以防止任何潜在的死锁,并添加了一些 Debug.WriteLines 以进一步说明问题。我很确定任务实际上永远不会启动,即使我将并行任务减少到 5 个。有关更多信息,请参阅我在原始帖子中的最新编辑。

以上是关于在后台运行长时间运行的并行任务,同时允许小型异步任务更新前台的主要内容,如果未能解决你的问题,请参考以下文章

Elixir 长时间运行的后台任务未完成,有时会崩溃

长时间运行的后台任务完成后应用程序未挂起

带有异步或长时间运行任务的 UndoManager

长时间运行的任务与线程——性能

使用异步服务器的长时间运行任务

关于ios中后台长时间下载任务的实现与走过的坑