Go并发循环逻辑

Posted

技术标签:

【中文标题】Go并发循环逻辑【英文标题】:Go Concurrency Circular Logic 【发布时间】:2021-12-30 17:12:36 【问题描述】:

我刚刚开始接触 Go 的并发性,并尝试创建一个调度 go 例程,该例程会将作业发送到在 jobchan 通道上侦听的工作池。如果一条消息通过 dispatchchan 通道进入我的调度函数并且我的其他 go 例程正忙,则该消息将附加到调度程序的堆栈切片上,并且调度程序将在稍后有工作人员可用时尝试再次发送,和/或没有在 dispatchchan 上收到更多消息。这是因为 dispatchchan 和 jobchan 没有缓冲,并且工作人员正在运行的 go 例程会将其他消息附加到调度程序直到某个点,我不希望工作人员阻塞等待调度程序并造成死锁。这是我到目前为止提出的调度程序代码:

func dispatch() 
var stack []string
acount := 0
for 
    select 
    case d := <-dispatchchan:
        stack = append(stack, d)
    case c := <-mw:
        acount = acount + c
    case jobchan <-stack[0]:
        if len(stack) > 1 
            stack[0] = stack[len(stack)-1]
            stack = stack[:len(stack)-1]
         else 
            stack = nil
        
    default:
        if acount == 0 && len(stack) == 0 
            close(jobchan)
            close(dispatchchan)
            close(mw)
            wg.Done()
            return
        
    

https://play.golang.wiki/p/X6kXVNUn5N7的完整示例

mw 通道是一个缓冲通道,长度与 worker go 例程的数量相同。它充当工作池的信号量。如果工作程序正在执行 [m] 有意义的 [w]ork,它会在 mw 通道上抛出 int 1,当它完成工作并返回 for 循环监听 jobchan 时,它会在 mw 上抛出 int -1。这样,调度程序就知道工作池是否正在完成任何工作,或者该池是否处于空闲状态。如果池处于空闲状态并且堆栈上没有更多消息,则调度程序关闭通道并将控制权返回给主函数。

这一切都很好,但我遇到的问题是堆栈本身的长度可能为零,所以在我尝试将堆栈 [0] 发送到 jobchan 的情况下,如果堆栈为空,我会收到越界错误.我想弄清楚的是如何确保当我遇到这种情况时,stack[0] 是否有一个值。我不希望这种情况向 jobchan 发送一个空字符串。

非常感谢任何帮助。如果有我应该考虑的更惯用的并发模式,我很想听听。我不是 100% 相信这个解决方案,但这是迄今为止我得到的最远距离。

【问题讨论】:

请提供一个完整的例子。 dispatch 没有定义 jobchan。我想考虑这个问题,但由于我必须对实际发生的事情做出假设,这将浪费每个人的时间。 抱歉...我继续将完整的测试代码放入 Go Playground。谢谢play.golang.wiki/p/X6kXVNUn5N7 在您的示例中,您的 gofunc 写入 dispatchchan ,所以这项工作永远不会完成,对吧?它不是循环逻辑,而是无限循环。这就是你想要的吗? 谢谢丹尼尔。实际上这是一个错误。最初在 gofunc 中,我将 Sprintf 到 dispatchchan 包装在 if 语句中,这样一旦 msgcnt 达到限制,它就不会再这样做了: if msgcnt 这是一个很好的例子:github.com/Joker666/goworkerpool 【参考方案1】:

这一切都很好,但我遇到的问题是堆栈本身的长度可能为零,所以在我尝试将堆栈 [0] 发送到 jobchan 的情况下,如果堆栈为空,则会出现越界错误.

我无法使用您的游乐场链接复制它,但它是可信的,因为至少有一个 gofunc 工作人员可能已经准备好在该频道上接收。

我的输出是Msgcnt: 0,这也很容易解释,因为当dispatch() 运行它的select 时,gofunc 可能已经准备好在jobschan 上接收。这些操作的顺序没有定义。

尝试创建一个调度 go 例程,将作业发送到在 jobchan 通道上监听的工作池

频道不需要调度程序。频道调度员。

如果一条消息通过 dispatchchan 通道进入我的调度函数,而我的其他 go 例程正忙,则消息 [...] 将 [...] 稍后当工作人员可用时再次发送,[... ] 或 dispatchchan 上不再收到消息。

通过一些创造性的编辑,很容易将其变成接近缓冲通道定义的东西。它可以立即读取,它最多可以占用一些“limit”无法立即发送的消息。您确实定义了limit,尽管它没有在您的代码中的其他地方使用。

在任何函数中,定义一个您不读取的变量都会导致编译时错误,例如limit declared but not used。这种限制提高了代码质量并有助于识别类型。但是在包范围内,您已经摆脱了将未使用的 limit 定义为“全局”,从而避免了一个有用的错误 - 您没有限制任何东西。

不要使用全局变量。使用传递的参数来定义作用域,因为作用域的定义相当于用go关键字表达的功能并发。 local范围内定义的相关通道传递给在包范围内定义的函数,以便您可以轻松跟踪它们的关系。并使用directional channels 来强制执行您的函数之间的生产者/消费者关系。稍后会详细介绍。

回到“限制”,限制您排队的作业数量是有意义的,因为所有资源都是有限的,并且接受比您预期的处理更多的消息需要比进程内存提供的更持久的存储空间。如果您觉得没有义务满足这些要求无论如何,首先不要接受“太多”的要求。

那么,dispatchchandispatch() 有什么功能呢?在处理之前存储有限数量的待处理请求(如果有的话),然后将它们发送给下一个可用的工作人员?这正是缓冲通道的用途。

循环逻辑

谁“知道”您的程序何时完成? main() 提供初始输入,但您在 `dispatch() 中关闭了所有 3 个通道:

            close(jobchan)
            close(dispatchchan)
            close(mw)

您的工作人员写入自己的作业队列,因此只有在工作人员完成写入后才能关闭传入的作业队列。但是,个别工作人员也不知道何时关闭作业队列,因为其他工作人员正在写入它。 没有人知道你的算法什么时候完成。这就是你的循环逻辑。

mw 通道是一个缓冲通道,长度与 worker go 例程的数量相同。它充当工作池的信号量。

这里有一个竞争条件。考虑所有n 工作人员刚刚收到最后一个n 工作的情况。他们每个人都从jobschan 读取,并且他们正在检查ok 的值。 disptatcher 继续运行其 select。现在没有人写信给dispatchchan 或读jobschan,所以default 的情况会立即匹配。 len(stack)0 并且没有当前的 job 所以 dispatcher 关闭所有频道,包括 mw。此后的某个时候,工作人员尝试写入已关闭的通道并出现恐慌。

所以最后我准备提供一些代码,但我还有一个问题:我没有明确的问题陈述来编写代码。

我刚刚开始接触 Go 的并发性,并尝试创建一个调度 go 例程,它将作业发送到在 jobchan 通道上侦听的工作池。

goroutine 之间的通道就像同步齿轮的齿。但是齿轮会转动到什么地方呢?你不是想保持时间,也不是制造发条玩具。你的齿轮可以转动,但成功会是什么样子?轮到他们了?

让我们尝试为通道定义一个更具体的用例:给定一组任意长的持续时间作为标准输入*的字符串,在其中一个 n 工作人员中休眠那么多秒。所以我们实际上有一个 result 要返回,我们会说每个工作人员将返回运行持续时间的开始和结束时间。

为了让它可以在操场上运行,我将使用硬编码的字节缓冲区模拟标准输入。
package main

import (
    "bufio"
    "bytes"
    "fmt"
    "os"
    "strings"
    "sync"
    "time"
)

type SleepResult struct 
    worker_id int
    duration  time.Duration
    start     time.Time
    end       time.Time


func main() 
    var num_workers = 2
    workchan := make(chan time.Duration)
    resultschan := make(chan SleepResult)
    var wg sync.WaitGroup
    var resultswg sync.WaitGroup
    resultswg.Add(1)
    go results(&resultswg, resultschan)
    for i := 0; i < num_workers; i++ 
        wg.Add(1)
        go worker(i, &wg, workchan, resultschan)
    
    // playground doesn't have stdin
    var input = bytes.NewBufferString(
        strings.Join([]string
            "3ms",
            "1 seconds",
            "3600ms",
            "300 ms",
            "5s",
            "0.05min", "\n") + "\n")

    var scanner = bufio.NewScanner(input)
    for scanner.Scan() 
        text := scanner.Text()
        if dur, err := time.ParseDuration(text); err != nil 
            fmt.Fprintln(os.Stderr, "Invalid duration", text)
         else 
            workchan <- dur
        
    
    close(workchan) // we know when our inputs are done
    wg.Wait()       // and when our jobs are done
    close(resultschan)
    resultswg.Wait()


func results(wg *sync.WaitGroup, resultschan <-chan SleepResult) 
    for res := range resultschan 
        fmt.Printf("Worker %d: %s : %s => %s\n",
            res.worker_id, res.duration,
            res.start.Format(time.RFC3339Nano), res.end.Format(time.RFC3339Nano))
    
    wg.Done()


func worker(id int, wg *sync.WaitGroup, jobchan <-chan time.Duration, resultschan chan<- SleepResult) 
    var res = SleepResultworker_id: id
    for dur := range jobchan 
        res.duration = dur
        res.start = time.Now()
        time.Sleep(res.duration)
        res.end = time.Now()
        resultschan <- res
    
    wg.Done()

这里我使用了 2 个等待组,一个用于工作人员,一个用于结果。这确保我在main() 结束之前完成了所有结果的编写。我通过让每个函数一次只做一件事来保持我的函数简单:main 读取输入,从中解析持续时间,然后将它们发送给下一个 worker。 results 函数收集结果并将它们打印到标准输出。工作人员在睡觉,从jobchan 读取并写入resultschan

workchan 可以缓冲(或不缓冲,如本例所示);没关系,因为输入将以可以处理的速度读取。我们可以缓冲尽可能多的输入,但我们不能缓冲无限量。我将频道大小设置为1e6 - 但一百万远小于无限。对于我的用例,我根本不需要做任何缓冲。

main 知道输入何时完成并可以关闭jobschanmain 也知道作业何时完成 (wg.Wait()) 并可以关闭结果通道。关闭这些通道是对workerresults goroutines 的一个重要信号——它们可以区分一个空的通道和一个保证不会有任何新添加的通道。

for job := range jobchan ... 是更详细的简写:

for 
  job, ok :=  <- jobchan
  if !ok 
    wg.Done()
    return
  
  ...

请注意,此代码创建 2 个工人,但它可以创建 20 个或 2000 个,甚至 1 个。无论池中有多少工人,程序都会运行。它可以处理任何数量的输入(尽管无休止的输入当然会导致无休止的程序)。它确实创建输出到输入的循环循环。如果您的用例需要作业来创建更多作业,则这是一个更具挑战性的场景,通常可以通过仔细规划来避免。

我希望这能给您一些关于如何在 Go 应用程序中更好地使用并发的好主意。

https://play.golang.wiki/p/cZuI9YXypxI

【讨论】:

感谢丹尼尔如此深入。这有助于为我指明正确的方向。我正在阅读其他一些线程,您的示例反映了我在其他地方阅读的内容,我不应该尝试使用全局变量,而是在函数中传递变量。基本上,我正在尝试做的是在工作池内提供递归。工人可以将额外的工作交还给“调度员”,直到外部影响停止让他们这样做,此时缓冲区耗尽并且循环结束。我会消化你所有的信息。我相信它会极大地帮助我。谢谢!

以上是关于Go并发循环逻辑的主要内容,如果未能解决你的问题,请参考以下文章

几段 Go 并发代码

《Go in action》读后记录:Go的并发与并行

GO并发详解

Go 并发编程模型

go——并发

Go语言并发编程简单入门