golang 合并频道

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了golang 合并频道相关的知识,希望对你有一定的参考价值。

package main

import "sync"

func merge(cs ...chan interface{}) <-chan interface{} {
	var wg sync.WaitGroup
	out := make(chan interface{})
	output := func(c <-chan interface{}) {
		for e := range c {
			out <- e
		}
		wg.Done()
	}
	wg.Add(len(cs))
	for _, c := range cs {
		go output(c)
	}
	go func() {
		wg.Wait()
		close(out)
	}()

	return out
}

为啥我的 Golang 频道写入永远阻塞?

【中文标题】为啥我的 Golang 频道写入永远阻塞?【英文标题】:Why is my Golang Channel Write Blocking Forever?为什么我的 Golang 频道写入永远阻塞? 【发布时间】:2016-09-23 05:07:45 【问题描述】:

在过去的几天里,我一直试图通过重构我的一个命令行实用程序来尝试在 Golang 中的并发性,但我被卡住了。

Here's原代码(master分支)。

Here's并发分支(x_concurrent 分支)。

当我使用go run jira_open_comment_emailer.go 执行并发代码时,如果将JIRA 问题添加到here 频道,defer wg.Done() 将永远不会执行,这会导致我的wg.Wait() 永远挂起。

我的想法是我有大量的 JIRA 问题,我想为每个问题分拆一个 goroutine,看看它是否有我需要回复的评论。如果是这样,我想将它添加到某个结构(经过一些研究后我选择了一个频道),我可以稍后像队列一样读取它以建立电子邮件提醒。

这是代码的相关部分:

// Given an issue, determine if it has an open comment
// Returns true if there is an open comment on the issue, otherwise false
func getAndProcessComments(issue Issue, channel chan<- Issue, wg *sync.WaitGroup) 
    // Decrement the wait counter when the function returns
    defer wg.Done()

    needsReply := false

    // Loop over the comments in the issue
    for _, comment := range issue.Fields.Comment.Comments 
        commentMatched, err := regexp.MatchString("~"+config.JIRAUsername, comment.Body)
        checkError("Failed to regex match against comment body", err)

        if commentMatched 
            needsReply = true
        

        if comment.Author.Name == config.JIRAUsername 
            needsReply = false
        
    

    // Only add the issue to the channel if it needs a reply
    if needsReply == true 
        // This never allows the defered wg.Done() to execute?
        channel <- issue
    


func main() 
    start := time.Now()

    // This retrieves all issues in a search from JIRA
    allIssues := getFullIssueList()

    // Initialize a wait group
    var wg sync.WaitGroup

    // Set the number of waits to the number of issues to process
    wg.Add(len(allIssues))

    // Create a channel to store issues that need a reply
    channel := make(chan Issue)

    for _, issue := range allIssues 
        go getAndProcessComments(issue, channel, &wg)
    

    // Block until all of my goroutines have processed their issues.
    wg.Wait()

    // Only send an email if the channel has one or more issues
    if len(channel) > 0 
        sendEmail(channel)
    

    fmt.Printf("Script ran in %s", time.Since(start))

【问题讨论】:

您到处都有len(channel),但该频道没有长度,因为它没有缓冲。您需要从通道接收以完成任何发送(通常,根据缓冲通道的长度做出决策是错误的,因为并发操作可能会竞相更改该值) 所以,如果我正在对通道进行所有写入,等待它们完成,然后从通道读取......这永远不会发生,因为发送永远不会真正完成并且触发defer wg.Done()?一般来说,您将如何解决实现这种并发性的问题?另外,我不确定您在 len(channel) 上是否正确,因为 godocs 声明它返回通道中当前的元素数量,而不是像 cap(channel) 这样的容量。 golang.org/pkg/builtin/#len len(channel) 返回“缓冲”通道中的当前项目数,但由于通道通常是同时使用的,所以 len 的结果一读到它就“过时”。通常会有并发的 goroutine 从通道发送和接收。我建议再次浏览 Tour Of Go 中的 Concurrency 部分,以更好地了解频道的工作原理。 @s_dolan 是的,第一个通道发送将阻塞,直到有人读取它,这永远不会发生。您可以做的最简单的事情是启动一个 goroutine,它在 写入之前从通道的另一端读取。至于 len 和 cap,请考虑 len(c) 始终 cap(c)。 【参考方案1】:

goroutines 在发送到无缓冲通道时阻塞。 解除 goroutine 阻塞的最小变化是创建一个缓冲通道,该通道具有所有问题的容量:

channel := make(chan Issue, len(allIssues))

并在调用 wg.Wait() 后关闭通道。

【讨论】:

但这有点违背了通道作为并发块之间管道的目的...... @RickyA 使用通道没有错,就是队列。 是的,它可以节省您传递互斥切片的开销。 是的,带有互斥锁的切片需要更多代码,并且在这种情况下不太可能表现得更好。 是的,缓冲通道是很好的 fifo 缓冲区/队列,但在这种情况下,当每个问题已经在其自己的 goroutine 中“排队”时,没有理由等待缓冲区填满。目标是在这里提高并发性,而不是在管道中添加另一个队列。

以上是关于golang 合并频道的主要内容,如果未能解决你的问题,请参考以下文章

golang golang频道测试

golang [去阻塞频道]与Go#golang,#go,#go channels,#go synchronization,#goroutines,#waitgroups中的频道同步和阻止,

为啥我的 Golang 频道写入永远阻塞?

Golang频道睡着了

Golang频道发行

选择中的Golang频道未接收