golang学习随便记12

Posted sjg20010414

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了golang学习随便记12相关的知识,希望对你有一定的参考价值。

goroutine 和 channel (4)

并发的退出

golang没有提供在一个goroutine去执行终止另一个goroutine的方法。在前面的火箭发射中,通过 abort channel 实现了通知方式的 goroutine 终止,但这个办法不能实现一个通知两个或多个goroutine 退出。试图去准确计算 goroutine 数目并向 abort channel 发送相应数量的事件信号并不能天遂人愿,因为运行中可能一些 goroutine 自己退出了,从而 channel 中事件数目比 goroutine 多,发送事件一方会阻塞。另外,goroutine 又可能新生成 它的“儿子”, channel 中的事件就比goroutine数目少了,造成一些goroutine无法接收到退出消息。

我们需要一种事件消息的广播机制,通过一个 channel 把消息广播出去,而 goroutine 们都可以看到这条事件消息

实现的关键是 golang 关闭一个 channel 的行为特性:一个 channel 被关闭,它的队列中的值会被接收(一个个被消费),等到队列为空,接收过程仍然可以进行,只是接受到零值,并且该接收过程是非阻塞的 (接收 channel 值后面的代码会立刻被执行)。所以,广播机制就是:不要向 channel 发送值,而是用 关闭一个 channel 进行广播——这是唯一的让接收方不再阻塞获得值的方式。

细品以下 du 代码 (功能和 linux du 命令相同):

package main

import (
	"flag"
	"fmt"
	"io/ioutil"
	"os"
	"path/filepath"
	"sync"
	"time"
)

var verbose = flag.Bool("v", false, "show verbose progress messages")
var done = make(chan struct{})

func cancelled() bool { // 接收到 done事件,返回 true,否则总是返回 false
	select {
	case <-done: // done channel 被关闭时,就“接收”到0,这是唯一的得到值
		return true
	default:
		return false
	}
}

func main() { // 用法  du [-v]  path1  path2 ...
	flag.Parse()
	roots := flag.Args()
	if len(roots) == 0 { // 没有设定目标path,默认当前目录
		roots = []string{"."}
	}
	go func() { // 在独立 goroutine中检测用户是否按键
		os.Stdin.Read(make([]byte, 1))
		close(done) // 用户按键了,就关闭 done 这个 channel,把事件广播出去
	}()
	fileSizes := make(chan int64)
	var n sync.WaitGroup
	for _, root := range roots {
		n.Add(1)                        // 启动一个 遍历的goroutine,计数加1
		go walkDir(root, &n, fileSizes) // 每个目标path一个goroutine进行并行遍历
	}
	go func() { // 等待遍历完的过程不阻塞后续 tick 生成和发送等
		n.Wait() // 一直等到 遍历的goroutine计数 为0,关闭 fileSizes 这个 channel
		close(fileSizes)
	}()
	var tick <-chan time.Time // 无 -v, tick 保持nil
	if *verbose {             // 否则, tick 每半秒发送事件
		tick = time.Tick(500 * time.Millisecond)
	}
	var nfiles, nbytes int64
loop:
	for {
		select { // multiplex:收到的是半秒 tick,就输出一下大小,收到的是文件大小,就累加
		case <-done:
			for range fileSizes { // 清空 channel fileSizes 让已运行 goroutine结束
			}
			return
		case size, ok := <-fileSizes: // 事件1:接收 channel 中每个文件大小值
			if !ok { // fileSizes 这个channel被关闭时 ok才会 false
				break loop
			}
			nfiles++
			nbytes += size
		case <-tick: // 事件2:接收半秒tick
			//fmt.Printf("get tick, nfiles = %d, nbytes = %d\\n", nfiles, nbytes)
			printDiskUsage(nfiles, nbytes)
		}
	}
	printDiskUsage(nfiles, nbytes)
}

func printDiskUsage(nfiles, nbytes int64) {
	fmt.Printf("%d files  %.1f KB\\n", nfiles, float64(nbytes)/1e3)
}

func walkDir(dir string, n *sync.WaitGroup, fileSizes chan<- int64) {
	defer n.Done()   // 延迟执行 goroutine计数减1
	if cancelled() { // 用户是否中途取消
		return
	}
	for _, entry := range dirents(dir) { // 处理 dir下 每个文件(夹)
		if entry.IsDir() { // 是 子文件夹,在独立的goroutine递归遍历
			n.Add(1)
			subdir := filepath.Join(dir, entry.Name())
			go walkDir(subdir, n, fileSizes)
		} else {
			fileSizes <- entry.Size() // 是 文件,发送它的大小到 fileSizes
		}
	}
}

// 每个目标path一个goroutine,并且是递归的,所以,需要限制避免打开太多文件(描述符)
var sema = make(chan struct{}, 20) // 限制并发数的 semaphore(信号量),队列容量20

func dirents(dir string) []os.FileInfo { // 获得 dir目录下 所有文件(夹)
	select {
	case sema <- struct{}{}: // 发送token,表示一个目标目录已经在循环处理
	case <-done:
		return nil // 用户已经取消
	}

	defer func() { // 延迟(到处理完)释放token
		<-sema
	}()
	entries, err := ioutil.ReadDir(dir)
	if err != nil {
		fmt.Fprintf(os.Stderr, "du: %v\\n", err)
		return nil
	}
	return entries
}

以上是关于golang学习随便记12的主要内容,如果未能解决你的问题,请参考以下文章

golang学习随便记14

golang学习随便记14

golang学习随便记3

golang学习随便记9

golang学习随便记7

golang学习随便记1