使用Go的sync.ErrGroup类快速并行搜索文件

Posted henreash

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了使用Go的sync.ErrGroup类快速并行搜索文件相关的知识,希望对你有一定的参考价值。

Go的主要特性之一是其强大的并发性能,如通道和goroutines。但是对于新手来说,goroutines是一个陌生的概念,新手在掌握并发概念过程中,经常遇到挫折。

Go团队发布的第一个帮助管理goroutines复杂性的工具是sync.WaitGroup,创建一个WaitGroup,该WaitGroup将阻塞,直到指定数量的goroutines完成执行为止。以下是文档中的一个例子:

   var wg sync.WaitGroup
    var urls = []string
            "http://www.golang.org/",
            "http://www.google.com/",
            "http://www.somestupidname.com/",
    
    for _, url := range urls 
            // Increment the WaitGroup counter.
            wg.Add(1)
            // Launch a goroutine to fetch the URL.
            go func(url string) 
                    // Decrement the counter when the goroutine completes.
                    defer wg.Done()
                    // Fetch the URL.
                    http.Get(url)
            (url)
    
    // Wait for all HTTP fetches to complete.
    wg.Wait()

WaitGroups大大简化了在Go中处理并发的过程,减少了启动goroutines时必须进行的计算。每次启动goroutine时,调用Add()来增加WaitGroup计数。完成后,调用wg.Done()。调用wg.Wait(),会阻塞直到全部任务完成。但有一个缺点,在goroutines中报错,很难找出错误是什么。

sync.WaitGroup功能扩展

最近,Go提供了一个sync.ErrGroup包。sync.ErrGroup扩展了sync.WaitGroup,添加了错误传播和在发生不可恢复的错误或超时时取消整个goroutines集的能力。下面是使用ErrGroup重写的相同示例:

var g errgroup.Group
var urls = []string
    "http://www.golang.org/",
    "http://www.google.com/",
    "http://www.somestupidname.com/",

for _, url := range urls 
    // Launch a goroutine to fetch the URL.
    url := url // https://golang.org/doc/faq#closures_and_goroutines
    g.Go(func() error 
        // Fetch the URL.
        resp, err := http.Get(url)
        if err == nil 
            resp.Body.Close()
        
        return err
    )

// Wait for all HTTP fetches to complete.
if err := g.Wait(); err == nil 
    fmt.Println("Successfully fetched all URLs.")

上面的g.Go()函数是一个包装器,传递一个匿名函数,但仍可以捕获它可能返回的错误,避免使用额外的错误处理代码。提高使用goroutines的开发效率。

为测试sync.ErrGroup的功能,下面提供了一个小程序,递归地搜索一个目录,查找具有指定模式的Go文件。这个功能可以用于在使用包的项目中,查找go源码目录树中过期或更新过的文件。为测试其他特性,还为应用程序添加了时间限制。如果达到时间限制,所有的搜索和处理goroutines将被取消,程序将退出。

在范例程序目录中运行代码,产生以下结果:

$ gogrep -timeout 1000ms . fmt                                                                                                 
gogrep.go
1 hits

如果输入错误的参数,将会提示正确的用法:

gogrep by Brian Ketelsen
Flags:
  -timeout duration
        timeout in milliseconds (default 500ms)
Usage:
    gogrep [flags] path pattern

sync.ErrGroup简化代码

下面看一下sync.ErrGroup是如何简化代码的。为易于理解编写main()函数,其他函数在main中被调用。

package main

import (
    "bytes"
    "flag"
    "fmt"
    "io/ioutil"
    "log"
    "os"
    "path/filepath"
    "strings"
    "time"

    "golang.org/x/net/context"
    "golang.org/x/sync/errgroup"
)

func main() 
    duration := flag.Duration("timeout", 500*time.Millisecond, "timeout in milliseconds")
    flag.Usage = func() 
        fmt.Printf("%s by Brian Ketelsen\\n", os.Args[0])
        fmt.Println("Usage:")
        fmt.Printf("    gogrep [flags] path pattern \\n")
        fmt.Println("Flags:")
        flag.PrintDefaults()

    
    flag.Parse()
    if flag.NArg() != 2 
        flag.Usage()
        os.Exit(-1)
    
    path := flag.Arg(0)
    pattern := flag.Arg(1)
    ctx, _ := context.WithTimeout(context.Background(), *duration)
    m, err := search(ctx, path, pattern)
    if err != nil 
        log.Fatal(err)
    
    for _, name := range m 
        fmt.Println(name)
    
    fmt.Println(len(m), "hits")

前15行配置程序期望参数及提示信息,在参数传递错误时,打印友好的提示信息。有用的代码从第16行开始:

ctx, _ := context.WithTimeout(context.Background(), *duration)

这里,创建了新的context.Context实例,指定超时时间。duration变量即为超时时长。当运行超时时,ctx和从其继承的所有上下文将在通道上收到一条消息,提醒它们超时。WithTimeout还返回一个我们不需要的cancel函数,将其赋值为" _ "忽略它。

下一行调用search()函数,传递了ctx实例、搜索路径和搜索模式参数。最后,将结果打印到终端,然后打印搜索结果的计数。

分析search() 函数

search()函数比main()稍长一些,将对它进行分别解释。

首先创建一个新的errgroup实例,其中包含了传递过来的上下文变量,并发执行后续的所有过程。

func search(ctx context.Context, root string, pattern string) ([]string, error) 
    g, ctx := errgroup.WithContext(ctx)

接下来,创建了一个通道来保存搜索到的文件。稍后把搜索候选对象发送到此通道进行进一步处理,以确定它们是否与提供的模式匹配。这个通道有一个100的缓冲区,因此可以在文件搜索goroutines完成之前开始处理goroutines。

paths := make(chan string, 100)

errgroup类型有两个方法:Wait和Go。Go方法启动子任务,Wait方法阻塞例程直至所有子任务完成。这里给Go传递一个可返回错误信息的匿名函数。

g.Go(func() error 

接下来,延迟关闭“paths”通道,以表示所有目录搜索已经完成。这样可以使用Go的“range”语句来启动多个goroutines处理候选文件。

defer close(paths)

最后,使用filepath包的Walk()函数递归遍历命令行参数中指定的目录中的所有文件。它检查文件是可读的,如果文件不是.go为后缀则不进行处理。在非Go源码文件中搜索Go源代码是没有意义的。

return filepath.Walk(root, func(path string, info os.FileInfo, err error) error 
            if err != nil 
                return err
            
            if !info.Mode().IsRegular() 
                return nil
            
            if !info.IsDir() && !strings.HasSuffix(info.Name(), ".go") 
                return nil
            

见证sync.Errgroup的威力

符合上面三个条件的文件被抛弃,它不是我们要搜索的候选对象。其他都是我们要检查的Go源文件。sync.Errgroup的威力开始显现。在两个可能的情况下使用select语句。第一种情况是将文件名发送到另一个goroutine将搜索其内容的“paths”通道。第二种情况是等待上下文超时。只要未超时,就会继续发送当前文件进行处理。当计时器过期时,上下文的Done通道将发送一条捕获到的消息,导致goroutine返回,从而停止文件搜索。

        select 
            case paths <- path:
            case <-ctx.Done():
                return ctx.Err()
            
            return nil
        )

    )

接下来,我创建了一个通道来存储与搜索模式匹配的所有文件。

c := make(chan string,100)

现在可以遍历paths通道,并搜索其中的内容。

for path := range paths 

需要指出的是:因为goroutine是一个闭包,它会在执行的时候捕获周围变量的值。因为多个goroutine并行执行,所以需要首先获取path变量的当前值,否则所有goroutine都将对path变量的值进行操作,该值可能在for循环的下一次迭代中更改。这是必须避免的。

p := path

现在对每个候选文件启动一个匿名函数。函数读取go源码文件,检查是否含有与搜索模式匹配的内容。

        g.Go(func() error 
            data, err := ioutil.ReadFile(p)
            if err != nil 
                return err
            
            if !bytes.Contains(data, []byte(pattern)) 
                return nil
            

同样,再次使用select语句在处理完成之前监视是否超时。

            select 
            case c <- p:
            case <-ctx.Done():
                return ctx.Err()
            
            return nil
        )
    

这个函数将等待errgroup上的所有goroutines完成,然后关闭结果通道,发出所有处理完成的信号并终止上面的range语句。

    go func() 
        g.Wait()
        close(c)
    ()

现在在通道中获取结果,存入切片,返回给main函数中。

    var m []string
    for r := range c 
        m = append(m, r)
    

最后,检查errgroup中的错误。如果上面的任何goroutines返回错误,都会将其它返回给main()。

    return m, g.Wait()

这个小程序远非最优。例如,在测试模式匹配的内容之前,我将源代码文件的全部内容读入内存。使用流来读取会更有效率。任意通道缓冲区大小为100意味着最多可以有100个不同的goroutine将文件读入内存。如果您有一个包含大量Go源文件的大目录,它可能会显著降低您的内存消耗。我能够在几秒钟内搜索我的整个Go源代码树来查找“fmt”包,而没有任何CPU或内存使用量的峰值,所以暂不做调整。

以上是关于使用Go的sync.ErrGroup类快速并行搜索文件的主要内容,如果未能解决你的问题,请参考以下文章

go并发小工具errgroup

go:远程包安装问题

Go语言简介

[GO]并行和并发的区别

Go语言开发Go语言并发编程

GO并发详解