在递归方法中如何知道我的所有线程何时完成执行?

Posted

技术标签:

【中文标题】在递归方法中如何知道我的所有线程何时完成执行?【英文标题】:How to know when all my threads have finished executing when in recursive method? 【发布时间】:2018-06-25 11:02:59 【问题描述】:

我一直在做一个网页抓取项目。

我有两个问题,一个是以百分比形式显示处理的 url 数量,但更大的问题是我无法弄清楚我如何知道我创建的所有线程何时完全完成。

注意:我知道一旦完成并行 foreach 就会移动,但这是在 recursive 方法中。

我的代码如下:

    public async Task Scrape(string url)
    
        var page = string.Empty;

        try
        
            page = await _service.Get(url);

            if (page != string.Empty)
            
                if (regex.IsMatch(page))
                

                    Parallel.For(0, regex.Matches(page).Count,
                        index =>
                        
                            try
                            
                                if (regex.Matches(page)[index].Groups[1].Value.StartsWith("/"))
                                
                                    var match = regex.Matches(page)[index].Groups[1].Value.ToLower();
                                    if (!links.Contains(BaseUrl + match) && !Visitedlinks.Contains(BaseUrl + match))
                                    
                                        Uri ValidUri = WebPageValidator.GetUrl(match);
                                        if (ValidUri != null && HostUrls.Contains(ValidUri.Host))
                                            links.Enqueue(match.Replace(".html", ""));
                                        else
                                            links.Enqueue(BaseUrl + match.Replace(".html", ""));

                                    
                                
                            
                            catch (Exception e)
                            
                                log.Error("Error occured: " + e.Message);
                                Console.WriteLine("Error occured, check log for further details."); ;
                            
                        );

                WebPageInternalHandler.SavePage(page, url);
                var context = CustomSynchronizationContext.GetSynchronizationContext();

                Parallel.ForEach(links, new ParallelOptions  MaxDegreeOfParallelism = 25 ,
                    webpage =>
                    
                        try
                        
                            if (WebPageValidator.ValidUrl(webpage))
                            
                                string linkToProcess = webpage;
                                if (links.TryDequeue(out linkToProcess) && !Visitedlinks.Contains(linkToProcess))
                                

                                        ShowPercentProgress();
                                        Thread.Sleep(15);
                                        Visitedlinks.Enqueue(linkToProcess);
                                        Task d = Scrape(linkToProcess);
                                        Console.Clear();


                                
                            
                        
                        catch (Exception e)
                        
                            log.Error("Error occured: " + e.Message);
                            Console.WriteLine("Error occured, check log for further details.");
                        
                    );

                Console.WriteLine("parallel finished");
            
        

        catch (Exception e)
        
            log.Error("Error occured: " + e.Message);
            Console.WriteLine("Error occured, check log for further details.");
        

    

注意 Scrape 被多次调用(递归)

像这样调用方法:

    public Task ExecuteScrape()
    
        var context = CustomSynchronizationContext.GetSynchronizationContext();
        Scrape(BaseUrl).ContinueWith(x => 

            Visitedlinks.Enqueue(BaseUrl);
        , context).Wait();

        return null;
    

依次被这样调用:

    static void Main(string[] args)
    
        RunScrapper();
        Console.ReadLine();
    

    public static void RunScrapper()
    
        try
        

            _scrapper.ExecuteScrape();

        
        catch (Exception e)
        
            Console.WriteLine(e);
            throw;
        
    

我的结果:

我该如何解决这个问题?

【问题讨论】:

实际上不可能显示有意义的完成百分比。你可以有 5 个网址。您已经处理了 4 个。您已完成 80%。然后最后一个 url 返回 400 多个 url。所以你不是真的在 80%,你暂时是 1%。 【参考方案1】:

(回答有关网页抓取的问题对我来说是否合乎道德?)

不要递归调用Scrape。将要抓取的 url 列表放入 ConcurrentQueue 并开始处理该队列。由于抓取页面的过程会返回更多 url,只需将它们添加到同一个队列中即可。

我也不会只使用一个字符串。我建议创建一个类似

的类
public class UrlToScrape //because naming things is hard
        
    public string Url  get; set; 
    public int Depth  get; set; 

无论您如何执行此操作,它都是递归的,因此您必须以某种方式跟踪您的深度。网站可能会故意生成将您送入无限递归的 URL。 (如果他们这样做了,那么他们不希望您抓取他们的网站。有人希望人们抓取他们的网站吗?)

当您的队列为空时,这并不意味着您已完成。队列可能是空的,但抓取最后一个出队的 url 的过程仍然可以将更多项目添加回该队列,因此您需要一种方法来解决这个问题。

您可以使用线程安全计数器(int 使用 Interlocked.Increment/Decrement),当您开始处理 url 时递增,完成时递减。当队列为空并且进程内 url 的计数为零时,您就完成了。

这是一个非常粗略的模型来说明这个概念,而不是我所说的精炼解决方案。比如你仍然需要考虑异常处理,我不知道结果去了哪里等等。

public class UrlScraper

    private readonly ConcurrentQueue<UrlToScrape> _queue = new ConcurrentQueue<UrlToScrape>();
    private int _inProcessUrlCounter;
    private readonly List<string> _processedUrls = new List<string>();

    public UrlScraper(IEnumerable<string> urls)
    
        foreach (var url in urls)
        
            _queue.Enqueue(new UrlToScrape Url = url, Depth = 1);
        
    

    public void ScrapeUrls()
    
        while (_queue.TryDequeue(out var dequeuedUrl) || _inProcessUrlCounter > 0)
        
            if (dequeuedUrl != null)
            
                // Make sure you don't go more levels deep than you want to.
                if (dequeuedUrl.Depth > 5) continue;
                if (_processedUrls.Contains(dequeuedUrl.Url)) continue;

                _processedUrls.Add(dequeuedUrl.Url);
                Interlocked.Increment(ref _inProcessUrlCounter);
                var url = dequeuedUrl;
                Task.Run(() => ProcessUrl(url));
            
        
    

    private void ProcessUrl(UrlToScrape url)
    
        try
        
            // As the process discovers more urls to scrape,
            // pretend that this is one of those new urls.
            var someNewUrl = "http://discovered";
            _queue.Enqueue(new UrlToScrape  Url = someNewUrl, Depth = url.Depth + 1 );
        
        catch (Exception ex)
        
            // whatever you want to do with this
        
        finally
        
            Interlocked.Decrement(ref _inProcessUrlCounter);
        
    

如果我真的这样做,ProcessUrl 方法将是它自己的类,它会采用 HTML,而不是 URL。在这种形式下,很难进行单元测试。如果它在一个单独的类中,那么您可以传入 HTML,验证它是否在某处输出结果,并调用一个方法来将找到的新 URL 加入队列。

将队列维护为数据库表也不是一个坏主意。否则,如果你正在处理一堆 url 而你不得不停下来,你就会重新开始。

【讨论】:

我正在尝试这样做。就深度而言,我检查它总是有一个 baseUrl 的 url,这样我就不会徘徊到无穷大。我不是 100% 确定这是否可以作为多个进程工作?就像我有一个平行的foreach。我会进行实验并回到你身边。仅供参考,如果您问我,您的“不那么“精致的解决方案”比我的更精致。谢谢! 即使 url 与给定的基本 url 匹配,站点也可能导致无限递归。他们只是添加随机 url,这些 url 都返回相同的页面,其中充满了更多随机 url。他们不需要创建一个新页面来拥有一个新的 url,因此他们可以拥有无​​限数量的页面。 是的,但我正在删减的那个有特定数量的页面。获得正确的数字是一个挑战。我假设我的解决方案具有正确的数字,但我描述的其他问题在路上。这里的挑战/任务是获取特定 baseurl 下的所有页面。仍在尝试应用这个,一旦我完成就会接受。希望如果我不能让它发挥作用,我可以保持它的存在,以备将来可能的指导。 如果可以的话,我会把这个答案投票给无穷大。它不仅可以帮助我在所有完成后获得最终输出,而且我还设法以一种很好的方式展示了百分比。我将用我自己的答案更新它,以通过有用的链接展示我的最终解决方案,帮助我更好地理解你的答案。很少能得到这样详细而有帮助的答案,非常感谢! 太好了,我很高兴听到这个消息!【参考方案2】:

您不能将所有任务Task d 添加到您通过所有递归调用(通过方法参数)线程化的某种类型的并发集合中,然后简单地调用Task.WhenAll(tasks).Wait()

您需要一个中间方法(使其更简洁)来启动基本的Scrape 调用并传入空任务集合。当基本调用返回时,您手头有所有任务,您只需等待它们完成即可。

public async Task Scrape (
    string url) 
    var tasks = new ConcurrentQueue<Task>();

    //call your implementation but
    //change it so that you add
    //all launched tasks d to tasks
    Scrape(url, tasks);
    
    //1st option: Wait().
    //This will block caller
    //until all tasks finish 
    Task.WhenAll(tasks).Wait(); 

    
    //or 2nd option: await 
    //this won't block and will return to caller.
    //Once all tasks are finished method
    //will resume in WriteLine
    await Task.WhenAll(tasks);
    Console.WriteLine("Finished!"); 

简单规则:如果您想知道某件事何时结束,第一步是跟踪。在您当前的实现中,您实际上是在触发并忘记所有已启动的任务...

【讨论】:

老实说,我在并发列表和任务方面的经验并不是我会吹嘘的。我可以举一个更详细的例子来打扰你吗,你可能会在这里做一些事情 作为一个编辑,我不知道为什么这被否决了?

以上是关于在递归方法中如何知道我的所有线程何时完成执行?的主要内容,如果未能解决你的问题,请参考以下文章

如何确定我的所有线程何时完成执行?

递归调用后的代码递归

如何使“主线程”等待“子线程”执行结束后再继续执行

具有多个列表的递归排列

c# Task.WhenAll(tasks) 和 SemaphoreSlim - 如何知道所有任务何时已完全完成

Scala:如何在循环中合并数据帧