go模式-生产者消费者

Posted 文大侠

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了go模式-生产者消费者相关的知识,希望对你有一定的参考价值。

常见的生产者消费者,就是多个生产者并发生产数据,多个消费者并发消费数据。在golang中很简单,生产者生产全部输入到channel中,消费者消费全部从channel中读取,天然具有并发安全性。

1.group消费

channel有如下特性

1.close时,通知各个等待goroutine是采用广播机制

2.消费时,多个消费goroutine共享一份数据,一条记录只能被一个goroutine消费到

因此消费者呈现类似kafka group消费的特性,实现如下

func Producer(msg chan int, wg *sync.WaitGroup, t *testing.T) {
	go func() {
		for i := 0; i < 10; i++ {
			msg <- i
		}

		// 标记发送结束- 发送结束标记或关闭channel
		t.Log("Send end!")
		close(msg)

		wg.Done()
	}()
}

func Consumer(msg chan int, wg *sync.WaitGroup, t *testing.T) {
	go func() {
		for {
			if m, ok := <-msg; ok { // 消费时只能有一个协程获取
				t.Log(m)
			} else { //关闭时是广播机制,所有协程都会获取
				t.Log("Receive end!")
				break
			}

		}

		wg.Done()
	}()
}

func TestProducerConsumer(t *testing.T) {
	msg := make(chan int)
	wg := sync.WaitGroup{}

	wg.Add(1)
	Producer(msg, &wg, t)

	wg.Add(1)
	Consumer(msg, &wg, t)
	wg.Add(1)
	Consumer(msg, &wg, t)

	wg.Wait()
}

2.独占消费

如果希望每个消费者都获取完整的输入消息,应该怎么实现呢?如下这里实现了类似kafka相同消费组共享一份数据,不同消费组独占一份完整的输入数据。

具体来说,就是针对不用的消费组,需要输入channel数据拷贝到每个channel独占的channel中,也就是我们常说的fan-out。

type MyProducer struct {
	msg  chan int
	sub  map[int]chan int
	wg   *sync.WaitGroup
	lock sync.RWMutex
}

func NewProducer() *MyProducer {
	return &MyProducer{
		make(chan int, 1),
		make(map[int]chan int),
		&sync.WaitGroup{},
		sync.RWMutex{}}
}

func (p *MyProducer) Produce(data int) {
	p.msg <- data
}

func (p *MyProducer) Stop() {
	fmt.Println("Pump stop...")
	close(p.msg)

	p.wg.Wait()
}

func (p *MyProducer) StartPump() {
	fmt.Println("Pump running...")

	p.wg.Add(1)
	go func() {

		for {

			if m, ok := <-p.msg; ok { // 消费时只能有一个协程获取
				// 分发消息到下游
				p.lock.RLock()
				for _, v := range p.sub {
					v <- m
				}
				p.lock.RUnlock()
			} else { //关闭时是广播机制,所有协程都会获取
				fmt.Println("Pump Stop Done!")

				// 关闭所有consumer的通道
				p.lock.RLock()
				for _, v := range p.sub {
					close(v)
				}
				p.lock.RUnlock()

				break
			}

		}

		p.wg.Done()
	}()
}

func (p *MyProducer) AddConsumer(groupid int) {

	var msg chan int

	// 每增加一个consumer,需要增加一个用于分发消息的channel
	// 同一个groupid 公用一个channel
	p.lock.Lock()

	if v, ok := p.sub[groupid]; ok {
		msg = v
	} else {
		msg = make(chan int, 1)
		p.sub[groupid] = msg
	}

	p.lock.Unlock()

	// 监听当前channel
	p.wg.Add(1)
	go func() {

		for {
			if m, ok := <-msg; ok { // 消费时只能有一个协程获取
				fmt.Printf("Consumer %d receive:%d!\\n", groupid, m)
			} else { //关闭时是广播机制,所有协程都会获取
				fmt.Printf("Consumer %d end!\\n", groupid)
				break
			}

		}

		p.wg.Done()
	}()
}

func TestProducerConsumer2(t *testing.T) {
	p := NewProducer()

	p.AddConsumer(1)
	p.StartPump()
	p.AddConsumer(2)
	p.AddConsumer(3)
	p.AddConsumer(3)


	p.Produce(1)
	p.Produce(2)
	p.Produce(3)


	p.Stop()
}

实际上go实现的消费队列nsq(参考https://github.com/nsqio/nsq)就依赖go channel特性实现的类似kafka功能,感兴趣可以看下源码。

 原创,转载请注明来自

以上是关于go模式-生产者消费者的主要内容,如果未能解决你的问题,请参考以下文章

go模式-生产者消费者

go模式-生产者消费者

go模式-生产者消费者

go 生产者消费者模型与发布订阅模型

云原生训练营模块一 Go语言特性

SpringCloud系列十一:SpringCloudStream(SpringCloudStream 简介创建消息生产者创建消息消费者自定义消息通道分组与持久化设置 RoutingKey)(代码片段