如何确保在 goroutines 中启动的 goroutines 彼此同步?

Posted

技术标签:

【中文标题】如何确保在 goroutines 中启动的 goroutines 彼此同步?【英文标题】:How to ensure goroutines launched within goroutines are synchronized with each other? 【发布时间】:2021-12-19 22:06:16 【问题描述】:

这是我第一次使用 Go 的并发特性,我正在深入研究。

我想对 API 进行并发调用。该请求基于我想要收到的帖子的标签(可以有 1


    "posts": [
        
            "id": 1,
            "author": "Name",
            "authorId": 1,
            "likes": num_likes,
            "popularity": popularity_decimal,
            "reads": num_reads,
            "tags": [ "tag1", "tag2" ]
        ,
        ...
    ]

我的计划是将一堆通道以菊花链方式连接在一起,并生成许多从这些通道读取和或写入的 goroutine:

- for each tag, add it to a tagsChannel inside a goroutine
- use that tagsChannel inside another goroutine to make concurrent GET requests to the endpoint
- for each response of that request, pass the underlying slice of posts into another goroutine
- for each individual post inside the slice of posts, add the post to a postChannel
- inside another goroutine, iterate over postChannel and insert each post into a data structure

这是我目前所拥有的:

func (srv *server) Get() 
    // Using red-black tree prevents any duplicates, fast insertion
    // and retrieval times, and is sorted already on ID.
    rbt := tree.NewWithIntComparator()
    // concurrent approach
    tagChan := make(chan string)                       // tags -> tagChan
    postChan := make(chan models.Post)                 // tagChan -> GET -> post -> postChan
    errChan := make(chan error)                        // for synchronizing errors across goroutines
    wg := &sync.WaitGroup                            // for synchronizing goroutines
    wg.Add(4)
    // create a go func to synchronize our wait groups
    // once all goroutines are finished, we can close our errChan
    go func() 
        wg.Wait()
        close(errChan)
    ()
    go insertTags(tags, tagChan, wg)
    go fetch(postChan, tagChan, errChan, wg)
    go addPostToTree(rbt, postChan, wg)
    for err := range errChan 
        if err != nil 
            srv.HandleError(err, http.StatusInternalServerError).ServeHTTP(w, r)
        
    

// insertTags inserts user's passed-in tags to tagChan
// so that tagChan may pass those along in fetch.
func insertTags(tags []string, tagChan chan<- string, group *sync.WaitGroup) 
    defer group.Done()
    for _, tag := range tags 
        tagChan <- tag
    
    close(tagChan)

// fetch completes a GET request to the endpoint
func fetch(posts chan<- models.Post, tags <-chan string, errs chan<- error, group *sync.WaitGroup) 
    defer group.Done()
    for tag := range tags 
        ep, err := formURL(tag)
        if err != nil 
            errs <- err
        
        group.Add(1) // QUESTION should I use a separate wait group here?
        go func() 
            resp, err := http.Get(ep.String())
            if err != nil 
                errs <- err
            
            container := models.PostContainer
            err = json.NewDecoder(resp.Body).Decode(&container)
            defer resp.Body.Close()
            group.Add(1) // QUESTION should I add a separate wait group here and pass it to insertPosts?
            go insertPosts(posts, container.Posts, group)
            defer group.Done()
        ()
        // group.Done() -- removed this call due to Burak, but now my program hands
    

// insertPosts inserts each individual post into our posts channel so that they may be
// concurrently added to our RBT.
func insertPosts(posts chan<- models.Post, container []models.Post, group *sync.WaitGroup) 
    defer group.Done()
    for _, post := range container 
        posts <- post
    

// addPostToTree iterates over the channel and
// inserts each individual post into our RBT,
// setting the post ID as the node's key.
func addPostToTree(tree *tree.RBT, collection <-chan models.Post, group *sync.WaitGroup) 
    defer group.Done()
    for post := range collection 
        // ignore return value & error here:
        // we don't care about the returned key and
        // error is only ever if a duplicate is attempted to be added -- we don't care
        tree.Insert(post.ID, post)
    


我可以向端点发出一个请求,但是一旦尝试提交第二个请求,我的程序就会失败并显示panic: sync: negative WaitGroup counter。 p>

我的问题是为什么我的 WaitGroup 计数器变为负数?我确保添加到等待组并标记我的 goroutine 何时完成。

如果等待组在第二个请求上是否定的,那么这一定意味着我第一次分配等待组并将其添加 4 被跳过...为什么?这可能与关闭频道有关吗?如果是这样,我在哪里关闭频道?

还有——有人有调试 goroutine 的技巧吗?

感谢您的帮助。

【问题讨论】:

fetch 的 for 循环末尾的 group.Done 是对 Done 的额外调用,删除它。 感谢@BurakSerdar。我已经删除了那个额外的 group.Done() ,现在当我运行我的程序时,它挂起 - 所以某处 WaitGroup 未完成或通道阻塞。有什么提示可以找出发生了什么? @cmt_ printf-debugging :) 您将4 添加到等待组,但您只启动了 3 个 goroutine。您应该只在启动 goroutine 之前添加 1。因此,您可以避免这些不一致。 【参考方案1】:

首先,整个设计相当复杂。最后提到了我的想法。

你的代码有两个问题:

    posts 通道永远不会关闭,因此 addPostToTree 可能永远不会存在循环,导致一个 waitGroup 永远不会减少(在您的情况下程序挂起)。程序有可能无限期地等待死锁(以为其他 goroutine 会释放它,但所有 goroutine 都卡住了)。解决方案:您可以关闭postChan 频道。但是怎么做?始终建议制作人始终关闭频道,但您有多个制作人。所以最好的选择是,等待所有生产者完成,然后关闭通道。为了等待所有生产者完成,您需要创建另一个 waitGroup 并使用它来跟踪子例程。

代码:

// fetch completes a GET request to the endpoint
func fetch(posts chan<- models.Post, tags <-chan string, errs chan<- error, group *sync.WaitGroup) 
    postsWG := &sync.WaitGroup
    for tag := range tags 
        ep, err := formURL(tag)
        if err != nil 
            errs <- err
        
        postsWG.Add(1) // QUESTION should I use a separate wait group here?
        go func() 
            resp, err := http.Get(ep.String())
            if err != nil 
                errs <- err
            
            container := models.PostContainer
            err = json.NewDecoder(resp.Body).Decode(&container)
            defer resp.Body.Close()
            go insertPosts(posts, container.Posts, postsWG)
        ()
    

    defer func() 
        postsWG.Wait()
        close(posts)
        group.Done()
    ()

    现在,我们有另一个问题,主 waitGroup 应该使用 3 而不是 4 初始化。这是因为主例程只增加了 3 个例程wg.Add(3),所以它必须只跟踪那些。对于子例程,我们使用不同的 waitGroup,因此这不再是父例程的头疼问题。

代码:

errChan := make(chan error)                        // for synchronizing errors across goroutines
    wg := &sync.WaitGroup                            // for synchronizing goroutines
    wg.Add(3)
    // create a go func to synchronize our wait groups
    // once all goroutines are finished, we can close our errChan

TLDR --

复杂的设计 - 因为主要的等待组是在一个地方开始的,但是每个 goroutine 都会根据需要修改这个 waitGroup。所以没有单一的所有者,这使得调试和维护变得超级复杂(+ 不能确保它没有错误)。 我建议将其分解并为每个子例程设置单独的跟踪器。这样,调用更多例程的调用者只能专注于跟踪其子 goroutine。然后,该例程将仅在其完成后才通知其父 waitGroup(及其子程序完成,而不是让子程序直接通知祖父母)。

另外,在fetch 方法中,在进行 HTTP 调用并获得响应后,为什么要创建另一个 goroutine 来处理这些数据?无论哪种方式,这个 goroutine 在数据插入发生之前都无法退出,也不会执行数据处理发生的其他操作。据我了解,第二个 goroutine 是多余的。

group.Add(1) // QUESTION should I add a separate wait group here and pass it to insertPosts?
            go insertPosts(posts, container.Posts, group)
            defer group.Done()

【讨论】:

非常感谢我的朋友!这解决了我的问题。正如 Rob Pike 所说,并发最重要的是设计——显然我需要在我的设计上工作。但是,正如我所说,这是我第一次使用 Go 的并发特性。我还有很长的路要走……去:)。我真的很喜欢菊花链设计模式,并尝试实现它——但是对于菊花链通道,我需要继续启动 goroutines 吗?例如,在我的 fetch() 例程中,当我同时向频道插入帖子时,我使用 goroutine。我可以将项目添加到 goroutine 之外的通道吗?

以上是关于如何确保在 goroutines 中启动的 goroutines 彼此同步?的主要内容,如果未能解决你的问题,请参考以下文章

你如何使用WaitGroup确保goroutine在for循环中完成?

Golang basic_leaminggoroutine channel 实现并发和并行

Golang basic_leaminggoroutine channel 实现并发和并行

源码分析golang协程闭包数据陷阱

Go语言之通道

如何设计 goroutines 程序来处理 api 限制错误