如何在 Golang 中正确处理缓冲通道?
Posted
技术标签:
【中文标题】如何在 Golang 中正确处理缓冲通道?【英文标题】:How to handle the buffered channel properly in Golang? 【发布时间】:2016-11-19 21:57:03 【问题描述】:我有一个存储接收到的数据的通道,我想在满足以下条件之一时对其进行处理: 1、通道达到其容量。 2,自上次进程后触发计时器。
我看到了帖子 Golang - How to know a buffered channel is full
更新:
我从那篇帖子和 OneOfOne 的建议中得到启发,这里是 play:
package main
import (
"fmt"
"math/rand"
"time"
)
var c chan int
var timer *time.Timer
const (
capacity = 5
timerDration = 3
)
func main()
c = make(chan int, capacity)
timer = time.NewTimer(time.Second * timerDration)
go checkTimer()
go sendRecords("A")
go sendRecords("B")
go sendRecords("C")
time.Sleep(time.Second * 20)
func sendRecords(name string)
for i := 0; i < 20; i++
fmt.Println(name+" sending record....", i)
sendOneRecord(i)
interval := time.Duration(rand.Intn(500))
time.Sleep(time.Millisecond * interval)
func sendOneRecord(record int)
select
case c <- record:
default:
fmt.Println("channel is full !!!")
process()
c <- record
timer.Reset(time.Second * timerDration)
func checkTimer()
for
select
case <-timer.C:
fmt.Println("3s timer ----------")
process()
timer.Reset(time.Second * timerDration)
func process()
for i := 0; i < capacity; i++
fmt.Println("process......", <-c)
这似乎工作正常,但我有一个问题,我想在调用 process() 时阻止来自其他 goroutine 的通道写入,上面的代码可以这样做吗?或者我应该在进程方法的开头添加一个互斥锁?
任何优雅的解决方案?
【问题讨论】:
如果这是性能优化,我会在处理器中使用for task := range c ...
。当通道缓冲区填满时,调度程序将使您的发送方阻塞,从而为处理器运行留出时间。它往往工作正常。如果您有其他原因想要控制什么时候运行,您可能会通过在新问题中解释它来获得更好的答案。
【参考方案1】:
不,选择是唯一的方法:
func (t *T) Send(v *Val)
select
case t.ch <- v:
default:
// handle v directly
【讨论】:
【参考方案2】:正如@OneOfOne 所提到的,选择确实是检查通道是否已满的唯一方法。
如果你使用通道来实现批处理,你总是可以创建一个无缓冲的通道,并有一个 goroutine 拉取项目并附加到一个切片。
当切片达到特定大小时,处理项目。
这是play上的一个例子
package main
import (
"fmt"
"sync"
"time"
)
const BATCH_SIZE = 10
func batchProcessor(ch <-chan int)
batch := make([]int, 0, BATCH_SIZE)
for i := range ch
batch = append(batch, i)
if len(batch) == BATCH_SIZE
fmt.Println("Process batch:", batch)
time.Sleep(time.Second)
batch = batch[:0] // trim back to zero size
fmt.Println("Process last batch:", batch)
func main()
var wg sync.WaitGroup
ch := make(chan int)
wg.Add(1)
go func()
batchProcessor(ch)
wg.Done()
()
fmt.Println("Submitting tasks")
for i := 0; i < 55; i++
ch <- i
close(ch)
wg.Wait()
【讨论】:
以上是关于如何在 Golang 中正确处理缓冲通道?的主要内容,如果未能解决你的问题,请参考以下文章