go 利用chan的阻塞机制,实现协程的开始阻塞返回控制器

Posted zhaosc-haha

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了go 利用chan的阻塞机制,实现协程的开始阻塞返回控制器相关的知识,希望对你有一定的参考价值。

一、使用场景

大背景是从kafka 中读取oplog进行增量处理,但是当我想发一条命令将这个增量过程阻塞,然后开始进行一次全量同步之后,在开始继续增量。

所以需要对多个协程进行控制。

二、使用知识

1. 从一个未初始化的管道读会阻塞

2.从一个关闭的管道读不会阻塞

利用两个管道和select 进行控制

三、上代码

控制器代码

package util

import (
	"errors"
	"sync"
)

const (
	//STOP 停止
	STOP = iota
	//START 开始
	START
	//PAUSE 暂停
	PAUSE
)

//Control 控制器
type Control struct {
	ch1  chan struct{}
	ch2  chan struct{}
	stat int64
	lock sync.RWMutex
}

var (
	//ErrStat 错误状态
	ErrStat = errors.New("stat error")
)

//NewControl 获得一个新Control
func NewControl() *Control {
	return &Control{
		ch1:  make(chan struct{}),
		ch2:  nil,
		stat: START,
		lock: sync.RWMutex{},
	}
}

//Stop 停止
func (c *Control) Stop() error {
	c.lock.Lock()
	defer c.lock.Unlock()
	if c.stat == START {
		c.ch2 = nil
		close(c.ch1)
		c.stat = STOP
	} else if c.stat == PAUSE {
		ch2 := c.ch2
		c.ch2 = nil
		close(c.ch1)
		close(ch2)
		c.stat = STOP
	} else {
		return ErrStat
	}
	return nil
}

//Pause 暂停
func (c *Control) Pause() error {
	c.lock.Lock()
	defer c.lock.Unlock()
	if c.stat == START {
		c.ch2 = make(chan struct{})
		close(c.ch1)
		c.stat = PAUSE
	} else {
		return ErrStat
	}
	return nil
}

//Start 开始
func (c *Control) Start() error {
	c.lock.Lock()
	defer c.lock.Unlock()
	if c.stat == PAUSE {
		c.ch1 = make(chan struct{})
		close(c.ch2)
		c.stat = START
	} else {
		return ErrStat
	}
	return nil
}

//C 控制管道
func (c *Control) C() <-chan struct{} {
	c.lock.RLock()
	defer c.lock.RUnlock()
	return c.ch1
}

//Wait 等待
func (c *Control) Wait() bool {
	c.lock.RLock()
	ch2 := c.ch2
	c.lock.RUnlock()
	if ch2 == nil {  //通过赋值nil 发送停止推出命令
		return false
	}
	<-ch2  //会进行阻塞
	return true
}

 

使用代码

	for {
		select {
		case part, ok := <-c.Partitions():
			if !ok {
				conf.Logger.Error("get kafka Partitions not ok", regular.Name)
				return
			}
			go readFromPart(c, part, regular, respChan)
		case <-regular.C():   //regular 为Control 类
			if !regular.Wait() {
				conf.Logger.Debug("Stop! ")
				return
			}
			conf.Logger.Debug("Start! ")
		}
	}

这样就可以随时随地的控制工程中的协程

regular  := util.NewControl()
regular.Pause()
regular.Start()
regular.Stop()

  

以上是关于go 利用chan的阻塞机制,实现协程的开始阻塞返回控制器的主要内容,如果未能解决你的问题,请参考以下文章

Go 语言 channel 的阻塞问题

go的有缓冲chann和无缓冲chan的区别

13 并发编程-(协程)-协程的基本概念

Kotlin 协程协程的挂起和恢复 ② ( 协程挂起 和 线程阻塞 对比 )

Kotlin 协程协程的挂起和恢复 ② ( 协程挂起 和 线程阻塞 对比 )

使用 Kotlin 协程的 Spring Boot Rest 服务