如何在不使用 time.Sleep 的情况下等待所有 goroutines 完成?

Posted

技术标签:

【中文标题】如何在不使用 time.Sleep 的情况下等待所有 goroutines 完成?【英文标题】:How to wait for all goroutines to finish without using time.Sleep? 【发布时间】:2013-08-14 23:47:25 【问题描述】:

此代码选择同一文件夹中的所有 xml 文件,作为调用的可执行文件,并在回调方法中对每个结果进行异步处理(在下面的示例中,仅打印出文件的名称)。

如何避免使用 sleep 方法来防止 main 方法退出?我在处理频道时遇到了问题(我认为这是同步结果所需要的),因此感谢您的帮助!

package main

import (
    "fmt"
    "io/ioutil"
    "path"
    "path/filepath"
    "os"
    "runtime"
    "time"
)

func eachFile(extension string, callback func(file string)) 
    exeDir := filepath.Dir(os.Args[0])
    files, _ := ioutil.ReadDir(exeDir)
    for _, f := range files 
            fileName := f.Name()
            if extension == path.Ext(fileName) 
                go callback(fileName)
            
    



func main() 
    maxProcs := runtime.NumCPU()
    runtime.GOMAXPROCS(maxProcs)

    eachFile(".xml", func(fileName string) 
                // Custom logic goes in here
                fmt.Println(fileName)
            )

    // This is what i want to get rid of
    time.Sleep(100 * time.Millisecond)

【问题讨论】:

【参考方案1】:

这是一个使用 WaitGroup 的解决方案。

首先,定义2个实用方法:

package util

import (
    "sync"
)

var allNodesWaitGroup sync.WaitGroup

func GoNode(f func()) 
    allNodesWaitGroup.Add(1)
    go func() 
        defer allNodesWaitGroup.Done()
        f()
    ()


func WaitForAllNodes() 
    allNodesWaitGroup.Wait()

然后,替换callback的调用:

go callback(fileName)

调用你的实用函数:

util.GoNode(func()  callback(fileName) )

最后一步,将此行添加到 main 的末尾,而不是 sleep。这将确保主线程在程序停止之前等待所有例程完成。

func main() 
  // ...
  util.WaitForAllNodes()

【讨论】:

【参考方案2】:

尽管sync.waitGroup (wg) 是前进的规范方法,但它确实需要您在wg.Wait 之前至少完成一些wg.Add 调用才能完成。这对于像网络爬虫这样的简单事物可能不可行,因为您事先不知道递归调用的数量,并且需要一段时间来检索驱动 wg.Add 调用的数据。毕竟,在知道第一批子页面的大小之前,你需要加载和解析第一页。

我使用渠道编写了一个解决方案,避免在我的解决方案中使用 waitGroup Tour of Go - web crawler 练习。每次启动一个或多个 go-routines 时,您都会将号码发送到 children 频道。每次 goroutine 即将完成时,您都会向 done 频道发送 1。当孩子的总和等于完成的总和时,我们就完成了。

我唯一关心的是results 频道的硬编码大小,但这是(当前)Go 限制。


// recursionController is a data structure with three channels to control our Crawl recursion.
// Tried to use sync.waitGroup in a previous version, but I was unhappy with the mandatory sleep.
// The idea is to have three channels, counting the outstanding calls (children), completed calls 
// (done) and results (results).  Once outstanding calls == completed calls we are done (if you are
// sufficiently careful to signal any new children before closing your current one, as you may be the last one).
//
type recursionController struct 
    results  chan string
    children chan int
    done     chan int


// instead of instantiating one instance, as we did above, use a more idiomatic Go solution
func NewRecursionController() recursionController 
    // we buffer results to 1000, so we cannot crawl more pages than that.  
    return recursionControllermake(chan string, 1000), make(chan int), make(chan int)


// recursionController.Add: convenience function to add children to controller (similar to waitGroup)
func (rc recursionController) Add(children int) 
    rc.children <- children


// recursionController.Done: convenience function to remove a child from controller (similar to waitGroup)
func (rc recursionController) Done() 
    rc.done <- 1


// recursionController.Wait will wait until all children are done
func (rc recursionController) Wait() 
    fmt.Println("Controller waiting...")
    var children, done int
    for 
        select 
        case childrenDelta := <-rc.children:
            children += childrenDelta
            // fmt.Printf("children found %v total %v\n", childrenDelta, children)
        case <-rc.done:
            done += 1
            // fmt.Println("done found", done)
        default:
            if done > 0 && children == done 
                fmt.Printf("Controller exiting, done = %v, children =  %v\n", done, children)
                close(rc.results)
                return
            
        
    

Full source code for the solution

【讨论】:

【参考方案3】:

sync.WaitGroup 可以在这里为您提供帮助。

package main

import (
    "fmt"
    "sync"
    "time"
)


func wait(seconds int, wg * sync.WaitGroup) 
    defer wg.Done()

    time.Sleep(time.Duration(seconds) * time.Second)
    fmt.Println("Slept ", seconds, " seconds ..")



func main() 
    var wg sync.WaitGroup

    for i := 0; i <= 5; i++ 
        wg.Add(1)   
        go wait(i, &wg)
    
    wg.Wait()

【讨论】:

【参考方案4】:

WaitGroups 绝对是执行此操作的规范方法。不过,为了完整起见,这里是在引入 WaitGroups 之前常用的解决方案。基本思想是使用通道说“我完成了”,并让主 goroutine 等待,直到每个生成的例程都报告其完成。

func main() 
    c := make(chan struct) // We don't need any data to be passed, so use an empty struct
    for i := 0; i < 100; i++ 
        go func() 
            doSomething()
            c <- struct // signal that the routine has completed
        ()
    

    // Since we spawned 100 routines, receive 100 messages.
    for i := 0; i < 100; i++ 
        <- c
    

【讨论】:

很高兴看到使用普通频道的解决方案。额外的好处:如果doSomething() 返回一些结果,那么您可以将其放在通道上,并且您可以在第二个 for 循环中收集和处理结果(一旦它们准备好) 只有在您已经知道要开始使用的 gorutines 数量的情况下才有效。如果您正在编写某种 html 爬虫并以递归方式为页面上的每个链接启动 gorutines 怎么办? 无论如何,您都需要以某种方式跟踪这一点。使用 WaitGroups 会更容易一些,因为每次生成新的 goroutine 时,您可以先执行 wg.Add(1) ,因此它会跟踪它们。使用频道会有点困难。 c 将阻塞,因为所有 go 例程都会尝试访问它,并且它是无缓冲的 如果用“block”表示程序会死锁,那不是真的。 You can try running it yourself. 原因是写入c 的唯一goroutine 与从c 读取的主goroutine 不同。因此,主 goroutine 始终可以从通道中读取值,当其中一个 goroutine 可以将值写入通道时,就会发生这种情况。你是对的,如果这段代码没有生成 goroutine,而是在一个 goroutine 中运行所有东西,它就会死锁。【参考方案5】:

您可以使用sync.WaitGroup。引用链接的例子:

package main

import (
        "net/http"
        "sync"
)

func main() 
        var wg sync.WaitGroup
        var urls = []string
                "http://www.golang.org/",
                "http://www.google.com/",
                "http://www.somestupidname.com/",
        
        for _, url := range urls 
                // Increment the WaitGroup counter.
                wg.Add(1)
                // Launch a goroutine to fetch the URL.
                go func(url string) 
                        // Decrement the counter when the goroutine completes.
                        defer wg.Done()
                        // Fetch the URL.
                        http.Get(url)
                (url)
        
        // Wait for all HTTP fetches to complete.
        wg.Wait()

【讨论】:

有什么理由必须在 go 例程之外执行 wg.Add(1) 吗?我们可以在延迟 wg.Done() 之前在里面做吗? 坐,是的,这是有原因的,它在sync.WaitGroup.Add docs中有描述:Note that calls with positive delta must happen before the call to Wait, or else Wait may wait for too small a group. Typically this means the calls to Add should execute before the statement creating the goroutine or other event to be waited for. See the WaitGroup example. 修改此代码导致我进行了长时间的调试,因为我的 goroutine 是一个命名函数,并且将 WaitGroup 作为值传递将复制它并使 wg.Done() 无效。虽然这可以通过传递指针 &wg 来解决,但防止此类错误的更好方法是首先将 WaitGroup 变量声明为指针:wg := new(sync.WaitGroup) 而不是 var wg sync.WaitGroup 我想在for _, url := range urls 的上方写wg.Add(len(urls)) 是有效的,我认为最好只使用一次添加。 @RobertJackWill:好消息!顺便说一句,the docs 对此进行了介绍:“第一次使用后不得复制 WaitGroup。Go 没有强制执行此操作的方法。但实际上,go vet 确实检测到这种情况并警告“func按值传递锁定:sync.WaitGroup包含sync.noCopy”。

以上是关于如何在不使用 time.Sleep 的情况下等待所有 goroutines 完成?的主要内容,如果未能解决你的问题,请参考以下文章

如何在不阻塞 Python 应用程序的情况下运行代码块?

关于如何等待一个元素的出现而不用一些笨拙粗暴的time.sleep()方法

如何在不抛出 TaskCanceledExceptions 的情况下等待任务?

如何在不等待 App Review 的情况下继续使用 TestFlight 功能

如何在不等待 1 秒的情况下对进程的 CPU 使用情况进行采样

如何在不使用跟踪句柄的情况下等待未知数量的 Rust 线​​程完成?