10分钟完全理解golang context

Posted 文大侠666

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了10分钟完全理解golang context相关的知识,希望对你有一定的参考价值。

当前go的各种源码中应该都可以看到context的使用,Context是golang 1.7的引入的核心结构,本质是为了处理go的并发控制问题。本文主要带大家深入理解context如何使用,为什么需要context和context设计原理。

并发控制问题

先来看下并发控制到底有什么问题要解决,立马能想到什么?如下
1.多个任务并行运行起来
2.控制任务的停止
3.控制任务的超时

多任务并行执行

首先看多个并行任务如何跑起来,经典实现利用WaitGroup,如下go中多个任务跑起来很简单,func+go即可快速定义协程任务,这里利用WaitGroup控制所有任务完成后退出主程序。

// 多个任务并行控制,等待所有任务完成
func TestTaskControl(t *testing.T) 

    taskNum := 3

    wg := sync.WaitGroup
    wg.Add(taskNum)

    for i := 0; i < taskNum; i++ 
        go func(taskNo int) 
            t.Logf("Task %d run\\n", taskNo)

            wg.Done()
        (i)
    

    wg.Wait()

多任务取消/停止

那么问题来了——如何协程任务运行过程中,取消任务执行呢?在context出现之前,我们一般用两种方法,如下可以对比看

1.数据通道关闭

一般多任务执行时,我们通过channel分发任务,当检测到channel关闭时认为是收到了任务退出信号。由于channel退出是全局广播,所有下游任务都可以接到通知。如下,关闭data时协程任务会退出,简单的任务取消/停止可以使用这种方式。

func TestCancelControl(t *testing.T) 
	data := make(chan int, 10)

	go func(data chan int) 
		for 
			select 
			case val, ok := <-data:
				if !ok 
					t.Logf("Channel closed !!!")
					return
				

				t.Logf("Revice data %d\\n", val)
			
		
	(data)

	go func() 
		data <- 1
		time.Sleep(1 * time.Second)
		data <- 2

		close(data)
	()

	time.Sleep(10 * time.Second)

2.单独退出通道

和数据通道关闭类似,不同的是和传输数据不共用一个channel,对于复杂任务公用数据channel会带来复杂和不可控,不如单独引入一个退出channel专门接受退出消息,甚至可以复用这个channel做更多的任务控制动作。
如下,引入exit来执行退出监听,一旦exit channel关闭,多个协程任务都退出。
在引入context之前,主流的任务取消/停止就是这样处理,不是特别复杂的多任务控制目前很多地方也保留了这种方式。

func TestMixControl(t *testing.T) 
	data := make(chan int, 10)
	defer close(data)
	exit := make(chan struct)

	taskNum := 3

	wg := sync.WaitGroup
	wg.Add(taskNum)

	for i := 0; i < taskNum; i++ 
		go func(taskNo int, data chan int, exit chan struct) 
			defer wg.Done()

			for 
				select 
				case val, ok := <-data:
					if !ok 
						t.Logf("Task %d channel closed !!!", taskNo)
						return
					

					t.Logf("Task %d  revice data %d\\n", taskNo, val)

				case <-exit:
					t.Logf("Task %d  revice exit signal!\\n", taskNo)
					return
				
			

		(i, data, exit)
	

	go func() 
		data <- 1
		data <- 2
		data <- 3
		time.Sleep(1 * time.Second)
		data <- 4
		data <- 5
		data <- 6

		close(exit)
	()

	wg.Wait()

多任务超时控制

进一步,再思考一个问题,还是和前述逻辑一样,但是每个任务需要考虑超时,该如何实现呢?如下,和引入exit通道类似,只是引入一个超时time.After通知即可处理任务超时场景。

// 执行任务超时后退出
func TestTimeoutControl(t *testing.T) 
	data := make(chan int, 10)

	go func(data chan int) 
		for 
			select 
			case val, ok := <-data:
				if !ok 
					t.Logf("Channel closed——revice exit signal !!!")
					return
				

				t.Logf("Revice data %d\\n", val)

			case <-time.After(2 * time.Second):
				t.Log("Task time out, exit!\\n")
				return
			
		
	(data)

	go func() 
		data <- 1
		time.Sleep(3 * time.Second)
		data <- 2
	()

	time.Sleep(10 * time.Second)

那新问题来了,既然channel可以处理这些问题,那么为什么还需要引入context呢?思考这个问题:如下是多个任务执行,每个任务一个协程,现在考虑如下几个目标
1.支持多级嵌套,父任务停止后,子任务自动停止
2.控制停止顺序,先停EFG 再停BCD 最后停A

目标1还好说,目标2好像就没那么灵活了,正式讨论context如何解决这些问题前,我们先看下常规context的使用

context定义和使用

context源码结构定义如下

type Context interface 
    // 当 context 被取消或者到了 deadline,返回一个被关闭的 channel
    Done() <-chan struct

    // 在 channel Done 关闭后,返回 context 取消原因
    Err() error

    // 返回 context 是否会被取消以及自动取消时间(即 deadline)
    Deadline() (deadline time.Time, ok bool)

    // 获取 key 对应的 value
    Value(key interface) interface

使用也很简单——定义好context时指定超时控制或者取消方法,在协程任务中监听ctx.Done通道,一旦超时或者取消则响应退出即可。如下

// 1.先定义context
ctx, cancel := context.WithCancel(context.Background())  // 取消/停止控制
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)  // 取消/停止控制 + 超时控制

// 2.执行任务
go Stream(ctx, xxx)

// 3.任务中监听ctx.Done()
func Stream(ctx context.Context, out chan<- Value) error 
	for 
		// 具体任务
		v, err := DoSomething(ctx)
		if err != nil 
			return err
		
		
		// 检查完成通知
		select 
		case <-ctx.Done():
			return ctx.Err()
		case out <- v:
		
	
 

 // 4.外部控制退出
 cancel()

可以看到,这里使用context统一了之前任务停止和超时控制,
注意这里,ctx.Background 通常用在 main 函数中,作为所有 context 的根节点。
ctx.TODO 通常用在并不知道传递什么 context的情形。例如,调用一个需要传递 context 参数的函数,你手头并没有其他 context 可以传递,这时就可以传递 todo。

context多任务控制

context多任务取消/停止

先类比,之前的任务,实现如下

func TestContextCancelControl(t *testing.T) 
	data := make(chan int, 10)
	defer close(data)

	ctx, cancel := context.WithCancel(context.Background())

	taskNum := 3

	wg := sync.WaitGroup
	wg.Add(taskNum)

	for i := 0; i < taskNum; i++ 
		go func(taskNo int, data chan int, ctx context.Context) 
			defer wg.Done()

			for 
				select 
				case val, ok := <-data:
					if !ok 
						t.Logf("Task %d channel closed !!!", taskNo)
						return
					

					t.Logf("Task %d  revice data %d\\n", taskNo, val)

				case <-ctx.Done():
					t.Logf("Task %d  revice exit signal!\\n", taskNo)
					return
				
			

		(i, data, ctx)
	

	go func() 
		data <- 1
		data <- 2
		data <- 3
		time.Sleep(1 * time.Second)
		data <- 4
		data <- 5
		data <- 6

		cancel()
	()

	wg.Wait()

context多任务超时

和上述任务一个套路,只是使用WithTimeout定义context,如下

func TestContextTimeoutControl(t *testing.T) 
	data := make(chan int, 10)
	defer close(data)

	ctx, _ := context.WithTimeout(context.Background(), 3*time.Second)

	taskNum := 3

	wg := sync.WaitGroup
	wg.Add(taskNum)

	for i := 0; i < taskNum; i++ 
		go func(taskNo int, data chan int, ctx context.Context) 
			defer wg.Done()

			for 
				select 
				case val, ok := <-data:
					if !ok 
						t.Logf("Task %d channel closed !!!", taskNo)
						return
					

					t.Logf("Task %d  revice data %d\\n", taskNo, val)

				case <-ctx.Done():
					t.Logf("Task %d  revice exit signal!\\n", taskNo)
					return
				
			

		(i, data, ctx)
	

	go func() 
		data <- 1
		data <- 2
		data <- 3
		time.Sleep(1 * time.Second)
		data <- 4
		data <- 5
		data <- 6
	()

	wg.Wait()

context复杂多任务取消

这里看,我们之前提出的问题,先实现协程任务链如下

func TestContextMixCancelControl(t *testing.T) 
	ctx, cancel := context.WithCancel(context.Background())

	type FUNC func(ctx context.Context)
	runFunc := func(ctx context.Context, fname string, f FUNC) 

		t.Logf("Task %s start!\\n", fname)
		f(ctx)

		for 
			select 

			case <-ctx.Done():
				t.Logf("Task %s  revice exit signal!\\n", fname)
				return
			
		

	

	go runFunc(ctx, "A", func(ctx context.Context) 
		go runFunc(ctx, "B", func(ctx context.Context) 
			go runFunc(ctx, "C", func(ctx context.Context) 
				go runFunc(ctx, "D", func(ctx context.Context) )
			)
		)

		go runFunc(ctx, "E", func(ctx context.Context) 
			go runFunc(ctx, "F", func(ctx context.Context) 
				go runFunc(ctx, "G", func(ctx context.Context) )
			)
		)
	)

	go func() 
		time.Sleep(3 * time.Second)
		cancel()
	()

	time.Sleep(10 * time.Second)

执行,可以看到如下,任务执行是按照协程任务链顺序,但是退出是无序的,因为他们都等待同一个ctx.Done通道关系消息,响应是无序的。

context_test.go:141: Task A start!
context_test.go:141: Task E start!
context_test.go:141: Task F start!
context_test.go:141: Task G start!
context_test.go:141: Task B start!
context_test.go:141: Task C start!
context_test.go:141: Task D start!
context_test.go:148: Task A  revice exit signal!
context_test.go:148: Task D  revice exit signal!
context_test.go:148: Task F  revice exit signal!
context_test.go:148: Task E  revice exit signal!
context_test.go:148: Task C  revice exit signal!
context_test.go:148: Task B  revice exit signal!
context_test.go:148: Task G  revice exit signal!

那么,如何准确控制目标2——“控制停止顺序,先停EFG 再停BCD 最后停A”的退出执行呢,如下操作

func TestContextMixCancelControl2(t *testing.T) 
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	type FUNC func(ctx context.Context)
	runFunc := func(ctx context.Context, fname string, f FUNC) 

		t.Logf("Task %s start!\\n", fname)
		f(ctx)

		for 
			select 

			case <-ctx.Done():
				t.Logf("Task %s  revice exit signal!\\n", fname)
				return
			
		

	

	ctxb, cancelb := context.WithCancel(context.Background())
	ctxe, cancele := context.WithCancel(context.Background())

	go runFunc(ctx, "A", func(ctx context.Context) 

		go runFunc(ctxb, "B", func(ctx context.Context) 
			go runFunc(ctx, "C", func(ctx context.Context) 
				go runFunc(ctx, "D", func(ctx context.Context) )
			)
		)

		go runFunc(ctxe, "E", func(ctx context.Context) 
			go runFunc(ctx, "F", func(ctx context.Context) 
				go runFunc(ctx, "G", func(ctx context.Context) )
			)
		)
	)

	go func() 
		time.Sleep(3 * time.Second)
		cancele()
		time.Sleep(3 * time.Second)
		cancelb()
	()

	time.Sleep(10 * time.Second)

然后执行

context_test.go:184: Task A start!
context_test.go:184: Task E start!
context_test.go:184: Task F start!
context_test.go:184: Task G start!
context_test.go:184: Task B start!
context_test.go:184: Task C start!
context_test.go:184: Task D start!
context_test.go:191: Task E  revice exit signal!
context_test.go:191: Task G  revice exit signal!
context_test.go:191: Task F  revice exit signal!
context_test.go:191: Task B  revice exit signal!
context_test.go:191: Task D  revice exit signal!
context_test.go:191: Task C  revice exit signal!

可以看到,通过增加Cancel点,我们可以精准的控制任务的退出,这就是context的复杂任务控制能力。

context原理简述

所以可以看到,引入context的意义在于
1.统一的任务执行/取消/超时控制模型
2.增强的任务取消/停止控制
除此之外,context还支持传入一些简单kv,用于任务参数定义,如下,不赘述

func TestContextValueControl(t *testing.T) 

	ctx, cancel := context.WithCancel(context.WithValue(context.Background(), "testkey", "testvalue"))

	taskNum := 1
	wg := sync.WaitGroup
	wg.Add(taskNum)

	go func(ctx context.Context) 
		defer wg.Done()

		for 
			select 
			case <-ctx.Done():
				t.Logf("Task revice exit signal, ctx value:%s!\\n", ctx.Value("testkey"))
				return
			
		

	(ctx)

	go func() 
		time.Sleep(3 * time.Second)
		cancel()
	()

	wg.Wait()

其实,写到这里,对比channel实现任务和context任务控制,我们也能自然看到context的基础原理,如下

简单来说,就如下几句话
1.创建context时创建一个退出通知通道,同时维持一个协程任务的关系树,如下示意图

树的根节点是backgroud和todo节点,也就是emptyCtx

	background = new(emptyCtx)
	todo       = new(emptyCtx)

树的子节点是cancelCtx,每个子节点包括父节点指向Context和子节点map-children

type cancelCtx struct 
	Context

	mu       sync.Mutex            // protects following fields
	done     atomic.Value          // of chan struct, created lazily, closed by first cancel call
	children map[canceler]struct // set to nil by the first cancel call
	err      error                 // set to non-nil by the first cancel call

2.执行WithCancel/WithTimeout时,更新协程任务的关系树