go例程的子集上的waitgroup

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了go例程的子集上的waitgroup相关的知识,希望对你有一定的参考价值。

我有情况在哪里,主要的例程将创建“x”go例程。但它只对“y”(y <x)go例程感兴趣才能完成。

我希望使用Waitgroup。但Waitgroup只允许我等待所有的例程。我不能,例如这样做,

1. wg.Add (y)
2 create "x" go routines. These routines will call wg.Done() when finished. 
3. wg. Wait()

当y + 1 go例程调用wg.Done()因为wg计数器变为负数时,这会引起恐慌。

我当然可以使用渠道来解决这个问题,但是如果Waitgroup解决这个问题,我很感兴趣。

答案

Adrian's answer所述,sync.WaitGroup是一个简单的计数器,其Wait方法将阻塞,直到计数器值达到零。它允许您在允许主要执行流程继续之前阻止(或加入)多个goroutine。

WaitGroup的界面对于您的用例来说并不具有足够的表现力,也不是设计用的。特别是,你不能通过简单地调用wg.Add(y)(其中y <x)来天真地使用它。第(y + 1)个goroutine对wg.Done的调用将是cause a panic,因为等待组内部值为负值是错误的。此外,通过观察WaitGroup的内部计数器值,我们不能“聪明”;这会打破抽象,无论如何,它的内部状态不会被导出。


Implement your own!

您可以使用下面的代码(playground link)使用某些通道自己实现相关逻辑。从控制台观察到10个goroutine已经启动,但是在完成两个goroutine后,我们将继续执行main方法。

package main

import (
    "fmt"
    "time"
)

// Set goroutine counts here
const (
    // The number of goroutines to spawn
    x = 10
    // The number of goroutines to wait for completion
    // (y <= x) must hold.
    y = 2
)

func doSomeWork() {
    // do something meaningful
    time.Sleep(time.Second)
}

func main() {
    // Accumulator channel, used by each goroutine to signal completion.
    // It is buffered to ensure the [y+1, ..., x) goroutines do not block
    // when sending to the channel, which would cause a leak. It will be
    // garbage collected when all goroutines end and the channel falls
    // out of scope. We receive y values, so only need capacity to receive
    // (x-y) remaining values.
    accChan := make(chan struct{}, x-y)

    // Spawn "x" goroutines
    for i := 0; i < x; i += 1 {
        // Wrap our work function with the local signalling logic
        go func(id int, doneChan chan<- struct{}) {
            fmt.Printf("starting goroutine #%d
", id)
            doSomeWork()
            fmt.Printf("goroutine #%d completed
", id)

            // Communicate completion of goroutine
            doneChan <- struct{}{}
        }(i, accChan)
    }

    for doneCount := 0; doneCount < y; doneCount += 1 {
        <-accChan
    }

    // Continue working
    fmt.Println("Carrying on without waiting for more goroutines")
}

避免资源泄漏

由于这不等待[y + 1,...,x] goroutines完成,你应该特别注意doSomeWork函数来删除或最小化工作无限期阻塞的风险,这也会导致泄漏。在可能的情况下,删除I / O(包括通道操作)无限期阻塞或陷入无限循环的可行性。

你可以使用context向其他goroutine发出信号,告诉他们不再需要他们的结果让他们突破执行。

另一答案

WaitGroup实际上并没有等待goroutines,它一直等到内部计数器达到零。如果你只关注你所关心的goroutine的数量,并且你只在你关心的那些goroutine中调用Add(),那么Done()将只会阻止,直到你关心的那些goroutines完成。你完全控制逻辑和流程,对Wait()“允许”没有限制。

另一答案

您尝试跟踪这些特定的常规例程,还是x中的任何一个?标准是什么?

更新:

1.如果您可以控制任何标准来选择qazxsw poi go-routines:

你可以根据你的情况从goroutine内部做WaitGroupmatching y,将它作为指针参数传递给goroutine,如果你的状况不能在goroutine之外检查。

类似下面的示例代码。如果您提供有关您要执行的操作的更多详细信息,将能够更具体。

wp.wg.Add(1)

2.如果你无法控制哪些,只需要wp.wg.Done()

根据您的评论,您无法控制/希望选择任何特定的goroutine,而是首先完成的goroutine。如果您希望以通用方式执行此操作,则可以使用适合您的用例的以下自定义waitGroup实现。 (但它不是复制安全的。也没有/需要wg.Add(int)方法)

func sampleGoroutine(z int, b string, wg *sync.WaitGroup){

    defer func(){
        if contition1{
            wg.Done()
        }
    }

    if contition1 {
        wg.Add(1)
        //do stuff
    }
}

func main() {
    wg := sync.WaitGroup{}
    for i := 0; i < x; i++ {
        go sampleGoroutine(1, "one", &wg)
    }
    wg.Wait()
}

可以使用如下:

first y

3.你也可以在type CountedWait struct { wait chan struct{} limit int } func NewCountedWait(limit int) *CountedWait { return &CountedWait{ wait: make(chan struct{}, limit), limit: limit, } } func (cwg *CountedWait) Done() { cwg.wait <- struct{}{} } func (cwg *CountedWait) Wait() { count := 0 for count < cwg.limit { <-cwg.wait count += 1 } } 中加入2以确保其余的goroutine不泄漏你可能无法在play.golang上运行它,因为它有一些长时间的睡眠。

下面是一个示例输出:(注意,可能有超过y = 3个goroutines标记完成,但您只等到3完成)

func sampleGoroutine(z int, b string, wg *CountedWait) { success := false defer func() { if success == true { fmt.Printf("goroutine %d finished successfully ", z) wg.Done() } }() fmt.Printf("goroutine %d started ", z) time.Sleep(time.Second) if rand.Intn(10)%2 == 0 { success = true } } func main() { x := 10 y := 3 wg := NewCountedWait(y) for i := 0; i < x; i += 1 { // Wrap our work function with the local signalling logic go sampleGoroutine(i, "something", wg) } wg.Wait() fmt.Printf("%d out of %d goroutines finished successfully. ", y, x) }

播放链接

  1. context
  2. goroutine 9 started goroutine 0 started goroutine 1 started goroutine 2 started goroutine 3 started goroutine 4 started goroutine 5 started goroutine 5 marking done goroutine 6 started goroutine 7 started goroutine 7 marking done goroutine 8 started goroutine 3 marking done continuing after 3 out of 10 goroutines finished successfully. goroutine 9 will be killed, bcz cancel goroutine 8 will be killed, bcz cancel goroutine 6 will be killed, bcz cancel goroutine 1 will be killed, bcz cancel goroutine 0 will be killed, bcz cancel goroutine 4 will be killed, bcz cancel goroutine 2 will be killed, bcz cancel
  3. https://play.golang.org/p/l5i6X3GClBq

以上是关于go例程的子集上的waitgroup的主要内容,如果未能解决你的问题,请参考以下文章

Go语言学习之Go协程:WaitGroup

Golang sync.WaitGroup 简介与用法

如何检查是不是在单元测试中调用了 sync.WaitGroup.Done()

3.2 go WaitGroup代码示例

Go语言sync包中的WaitGroup使用实例

所有 goroutine 都处于休眠状态 - 死锁