go模式-生产者消费者

Posted 文大侠

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了go模式-生产者消费者相关的知识,希望对你有一定的参考价值。

常见的生产者消费者,就是多个生产者并发生产数据,多个消费者并发消费数据。在golang中很简单,生产者生产全部输入到channel中,消费者消费全部从channel中读取,天然具有并发安全性。

1.group消费

channel有如下特性

1.close时,通知各个等待goroutine是采用广播机制

2.消费时,多个消费goroutine共享一份数据,一条记录只能被一个goroutine消费到

因此消费者呈现类似kafka group消费的特性,实现如下

func Producer(msg chan int, wg *sync.WaitGroup, t *testing.T) 
	go func() 
		for i := 0; i < 10; i++ 
			msg <- i
		

		// 标记发送结束- 发送结束标记或关闭channel
		t.Log("Send end!")
		close(msg)

		wg.Done()
	()


func Consumer(msg chan int, wg *sync.WaitGroup, t *testing.T) 
	go func() 
		for 
			if m, ok := <-msg; ok  // 消费时只能有一个协程获取
				t.Log(m)
			 else  //关闭时是广播机制,所有协程都会获取
				t.Log("Receive end!")
				break
			

		

		wg.Done()
	()


func TestProducerConsumer(t *testing.T) 
	msg := make(chan int)
	wg := sync.WaitGroup

	wg.Add(1)
	Producer(msg, &wg, t)

	wg.Add(1)
	Consumer(msg, &wg, t)
	wg.Add(1)
	Consumer(msg, &wg, t)

	wg.Wait()

2.独占消费

如果希望每个消费者都获取完整的输入消息,应该怎么实现呢?如下这里实现了类似kafka相同消费组共享一份数据,不同消费组独占一份完整的输入数据。

具体来说,就是针对不用的消费组,需要输入channel数据拷贝到每个channel独占的channel中,也就是我们常说的fan-out。

type MyProducer struct 
	msg  chan int
	sub  map[int]chan int
	wg   *sync.WaitGroup
	lock sync.RWMutex


func NewProducer() *MyProducer 
	return &MyProducer
		make(chan int, 1),
		make(map[int]chan int),
		&sync.WaitGroup,
		sync.RWMutex


func (p *MyProducer) Produce(data int) 
	p.msg <- data


func (p *MyProducer) Stop() 
	fmt.Println("Pump stop...")
	close(p.msg)

	p.wg.Wait()


func (p *MyProducer) StartPump() 
	fmt.Println("Pump running...")

	p.wg.Add(1)
	go func() 

		for 

			if m, ok := <-p.msg; ok  // 消费时只能有一个协程获取
				// 分发消息到下游
				p.lock.RLock()
				for _, v := range p.sub 
					v <- m
				
				p.lock.RUnlock()
			 else  //关闭时是广播机制,所有协程都会获取
				fmt.Println("Pump Stop Done!")

				// 关闭所有consumer的通道
				p.lock.RLock()
				for _, v := range p.sub 
					close(v)
				
				p.lock.RUnlock()

				break
			

		

		p.wg.Done()
	()


func (p *MyProducer) AddConsumer(groupid int) 

	var msg chan int

	// 每增加一个consumer,需要增加一个用于分发消息的channel
	// 同一个groupid 公用一个channel
	p.lock.Lock()

	if v, ok := p.sub[groupid]; ok 
		msg = v
	 else 
		msg = make(chan int, 1)
		p.sub[groupid] = msg
	

	p.lock.Unlock()

	// 监听当前channel
	p.wg.Add(1)
	go func() 

		for 
			if m, ok := <-msg; ok  // 消费时只能有一个协程获取
				fmt.Printf("Consumer %d receive:%d!\\n", groupid, m)
			 else  //关闭时是广播机制,所有协程都会获取
				fmt.Printf("Consumer %d end!\\n", groupid)
				break
			

		

		p.wg.Done()
	()


func TestProducerConsumer2(t *testing.T) 
	p := NewProducer()

	p.AddConsumer(1)
	p.StartPump()
	p.AddConsumer(2)
	p.AddConsumer(3)
	p.AddConsumer(3)


	p.Produce(1)
	p.Produce(2)
	p.Produce(3)


	p.Stop()

实际上go实现的消费队列nsq(参考https://github.com/nsqio/nsq)就依赖go channel特性实现的类似kafka功能,感兴趣可以看下源码。

 原创,转载请注明来自

以上是关于go模式-生产者消费者的主要内容,如果未能解决你的问题,请参考以下文章

go模式-生产者消费者

go模式-生产者消费者

go模式-生产者消费者

go 生产者消费者模型与发布订阅模型

基于go的生产者消费者模型

redis消息队列有没有