go模式-生产者消费者
Posted 文大侠
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了go模式-生产者消费者相关的知识,希望对你有一定的参考价值。
常见的生产者消费者,就是多个生产者并发生产数据,多个消费者并发消费数据。在golang中很简单,生产者生产全部输入到channel中,消费者消费全部从channel中读取,天然具有并发安全性。
1.group消费
channel有如下特性
1.close时,通知各个等待goroutine是采用广播机制
2.消费时,多个消费goroutine共享一份数据,一条记录只能被一个goroutine消费到
因此消费者呈现类似kafka group消费的特性,实现如下
func Producer(msg chan int, wg *sync.WaitGroup, t *testing.T)
go func()
for i := 0; i < 10; i++
msg <- i
// 标记发送结束- 发送结束标记或关闭channel
t.Log("Send end!")
close(msg)
wg.Done()
()
func Consumer(msg chan int, wg *sync.WaitGroup, t *testing.T)
go func()
for
if m, ok := <-msg; ok // 消费时只能有一个协程获取
t.Log(m)
else //关闭时是广播机制,所有协程都会获取
t.Log("Receive end!")
break
wg.Done()
()
func TestProducerConsumer(t *testing.T)
msg := make(chan int)
wg := sync.WaitGroup
wg.Add(1)
Producer(msg, &wg, t)
wg.Add(1)
Consumer(msg, &wg, t)
wg.Add(1)
Consumer(msg, &wg, t)
wg.Wait()
2.独占消费
如果希望每个消费者都获取完整的输入消息,应该怎么实现呢?如下这里实现了类似kafka相同消费组共享一份数据,不同消费组独占一份完整的输入数据。
具体来说,就是针对不用的消费组,需要输入channel数据拷贝到每个channel独占的channel中,也就是我们常说的fan-out。
type MyProducer struct
msg chan int
sub map[int]chan int
wg *sync.WaitGroup
lock sync.RWMutex
func NewProducer() *MyProducer
return &MyProducer
make(chan int, 1),
make(map[int]chan int),
&sync.WaitGroup,
sync.RWMutex
func (p *MyProducer) Produce(data int)
p.msg <- data
func (p *MyProducer) Stop()
fmt.Println("Pump stop...")
close(p.msg)
p.wg.Wait()
func (p *MyProducer) StartPump()
fmt.Println("Pump running...")
p.wg.Add(1)
go func()
for
if m, ok := <-p.msg; ok // 消费时只能有一个协程获取
// 分发消息到下游
p.lock.RLock()
for _, v := range p.sub
v <- m
p.lock.RUnlock()
else //关闭时是广播机制,所有协程都会获取
fmt.Println("Pump Stop Done!")
// 关闭所有consumer的通道
p.lock.RLock()
for _, v := range p.sub
close(v)
p.lock.RUnlock()
break
p.wg.Done()
()
func (p *MyProducer) AddConsumer(groupid int)
var msg chan int
// 每增加一个consumer,需要增加一个用于分发消息的channel
// 同一个groupid 公用一个channel
p.lock.Lock()
if v, ok := p.sub[groupid]; ok
msg = v
else
msg = make(chan int, 1)
p.sub[groupid] = msg
p.lock.Unlock()
// 监听当前channel
p.wg.Add(1)
go func()
for
if m, ok := <-msg; ok // 消费时只能有一个协程获取
fmt.Printf("Consumer %d receive:%d!\\n", groupid, m)
else //关闭时是广播机制,所有协程都会获取
fmt.Printf("Consumer %d end!\\n", groupid)
break
p.wg.Done()
()
func TestProducerConsumer2(t *testing.T)
p := NewProducer()
p.AddConsumer(1)
p.StartPump()
p.AddConsumer(2)
p.AddConsumer(3)
p.AddConsumer(3)
p.Produce(1)
p.Produce(2)
p.Produce(3)
p.Stop()
实际上go实现的消费队列nsq(参考https://github.com/nsqio/nsq)就依赖go channel特性实现的类似kafka功能,感兴趣可以看下源码。
原创,转载请注明来自
以上是关于go模式-生产者消费者的主要内容,如果未能解决你的问题,请参考以下文章