问题
如何支持一个无容量限制的channel
- 取出元素会阻塞到元素存在并且返回
- 放入元素永远不会阻塞,都会立即返回
方法一:用两个chan加一个list模拟
在单独的goroutine处理入队和出队,这样不用给list加锁。
完整代码:https://github.com/luweimy/goutil/blob/master/syncq/syncq.go
q := &SyncQueue{
ctx: ctx,
cancel: cancel,
l: list.New(),
max: max,
in: make(chan interface{}),
out: make(chan interface{}),
}
func (q *SyncQueue) dispatch() {
for {
if q.l.Len() == 0 {
// the queue is empty, only enqueue is allowed.
select {
case v := <-q.in:
q.l.PushBack(v)
case <-q.ctx.Done():
return
}
}
e := q.l.Front()
if q.max > 0 && q.l.Len() >= q.max {
// the queue is full, only dequeue is allowed.
select {
case q.out <- e.Value:
q.l.Remove(e)
case <-q.ctx.Done():
return
}
} else {
// enqueue and dequeue are allowed.
select {
case value := <-q.in:
q.l.PushBack(value)
case q.out <- e.Value:
q.l.Remove(e)
case <-q.ctx.Done():
return
}
}
}
}
但是这种方法速度很慢,跑benchmark只有1234 ns/op
方法二:用sync.Cond通知
这个方法比较简单,就是利用sync.Cond的通知机制。
出队时,检测队列内有无元素,有就直接返回,没有则阻塞等待条件变量。
入队时,触发条件变量通知一个阻塞的端点恢复运行。
完整代码:https://github.com/luweimy/goutil/blob/master/syncq2/syncq2.go
func (q *SyncQueue) Enqueue(value interface{}) {
call.WithLock(q.cond.L, func() {
q.l.PushBack(value)
q.cond.Signal()
})
}
func (q *SyncQueue) Dequeue() interface{} {
var v interface{}
call.WithLock(q.cond.L, func() {
// if queue is empty, wait enqueue
for q.l.Len() <= 0 {
q.cond.Wait()
}
v = q.l.Remove(q.l.Front())
})
return v
}
这种方法速度比上面的快,跑benchmark241 ns/op