Go 中的简单工作池

Posted

技术标签:

【中文标题】Go 中的简单工作池【英文标题】:Simple worker pool in Go 【发布时间】:2017-03-17 06:03:01 【问题描述】:

我正在尝试在 go 中实现一个简单的工作池并不断遇到问题。我想做的就是让一定数量的工人在做更多工作之前完成一定数量的工作。我使用的代码类似于:

    jobs := make(chan imageMessage, 1)
    results := make(chan imageMessage, 1)

    for w := 0; w < 2; w++ 
        go worker(jobs, results)
    

    for j := 0; j < len(images); j++ 
        jobs <- imageMessagepath: paths[j], img: images[j]
    
    close(jobs)

    for r := 0; r < len(images); r++ 
        <-results
    


func worker(jobs <-chan imageMessage, results chan<- imageMessage) 
    for j := range jobs 
        processImage(j.path, j.img)
        results <- j
    

我的理解是,这应该会创建 2 名工人,他们一次可以做 1 件“事情”,并且在他们完成那件事情时会继续获得更多的工作,直到没有其他事情可做。但是,我得到fatal error: all goroutines are asleep - deadlock!

如果我将缓冲区设置为 100 之类的巨大值,这可行,但我希望能够限制一次完成的工作。

我觉得我很接近,但显然错过了什么。

【问题讨论】:

相关:Is this an idiomatic worker thread pool in Go? 【参考方案1】:

问题在于,您只有在成功发送 jobs 频道上的所有作业后才开始“排空”results 频道。但要让您能够发送所有作业,jobs 通道必须有足够大的缓冲区,或者工作 goroutine 必须能够从中使用作业。

但是一个worker goroutines 在消费一个工作时,在它可以接受下一个工作之前,将结果发送到results 频道。如果results通道的缓冲区已满,发送结果会阻塞。

但最后一部分——一个在发送结果时被阻塞的工作 goroutine——只能通过从results 通道接收来“解除阻塞”——直到你可以发送所有作业。如果jobs 通道和results 通道的缓冲区无法容纳您的所有工作,则会出现死锁。这也解释了为什么如果将缓冲区大小增加到一个较大的值会起作用:如果作业可以放入缓冲区,则不会发生死锁,并且在成功发送所有作业后,您的最终循环将耗尽results 通道。

解决方案?在自己的 goroutine 中运行生成和发送作业,因此您可以“立即”开始从 results 频道接收,而无需等待发送所有作业,这意味着工作 goroutine 不会在尝试发送结果时永远被阻塞:

go func() 
    for j := 0; j < len(images); j++ 
        jobs <- imageMessagepath: paths[j], img: images[j]
    
    close(jobs)
()

在Go Playground 上试用。

还可以查看Is this an idiomatic worker thread pool in Go?中的类似实现

【讨论】:

【参考方案2】:

你可以使用这个工人。简单高效。 https://github.com/tamnguyenvt/go-worker

NewWorkerManager(WorkerManagerParams
    WorkerSize: <number of workers>,
    RelaxAfter: <sleep for awhile to relax server after given duration>,
    RelaxDuration: <relax duration>,
    WorkerFunc: <your worker function here>,
    LogEnable: <enable log or not>,
    StopTimeout: <timeout all workers after given duration>,

【讨论】:

以上是关于Go 中的简单工作池的主要内容,如果未能解决你的问题,请参考以下文章

Python - 多进程池中的 make_archive zip 无法正常工作

go语言实现简单的聊天室

Python中的多处理:处理多个工作线程

多处理池中的全局变量

多进程池中的 apply_async 问题

C# -Task.Run() 中线程池中的索引超出范围异常