[Go]:使用并发性逐行读取文件

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了[Go]:使用并发性逐行读取文件相关的知识,希望对你有一定的参考价值。

我想做的事

GetLine中,我试图使用bufio.Scanner逐行解析文件并尝试并发尝试。在获取每行中的文本后,我通过string的通道将其发送给调用者(main函数)。随着价值,我也发送错误和完成标志(通过done频道)。因此,这应该能够在处理当前行的同时获取新行以在单独的goroutine中处理。

我实际上做了什么

var READCOMPLETE = errors.New("Completed Reading")

func main() {

    filename := flag.String("filename", "", "The file to parse")
    flag.Parse()

    if *filename == "" {
        log.Fatal("Provide a file to parse")
    }

    fmt.Println("Getting file")

    names := make(chan string)
    readerr := make(chan error)
    done := make(chan bool)

    go GetLine(*filename, names, readerr, done)

    for {
        select {
        case name := <-names:
            // Process each line
            fmt.Println(name)

        case err := <-readerr:
            log.Fatal(err)

        case <-done:
            // close(names)
            // close(readerr)
            break
        }
    }

    fmt.Println("Processing Complete")
}

func GetLine(filename string, names chan string, readerr chan error, done chan bool) {
    file, err := os.Open(filename)
    if err != nil {
        log.Fatal(err)
    }
    defer file.Close()

    scanner := bufio.NewScanner(file)
    for scanner.Scan() {
        names <- scanner.Text()
        //fmt.Println(scanner.Text())
    }

    if err := scanner.Err(); err != nil {
        readerr <- err
    }

    done <- true
}

我跑步了

运行时错误:fatal error: all goroutines are asleep - deadlock!

我试图修复什么?

在阅读this关于错误消息的回答之后,我尝试在names语句的最后一个句子中关闭通道readerrselect,如注释中所示。但是,程序仍然会崩溃并显示日志消息。我无法进一步解决它,并希望得到任何帮助。 欢迎学习资源。

P.S:我是GoLang的新手,还在学习如何使用Go中的CSP并发模型。事实上,这是我第一次尝试编写同步并发程序。

答案

select中的break语句会突破select。完成后,应用程序必须突破for循环。使用标签打破for循环:

loop:
    for {
        select {
        case name := <-names:
            // Process each line
            fmt.Println(name)

        case err := <-readerr:
            log.Fatal(err)

        case <-done:
            // close(names)
            // close(readerr)
            break loop
        }
    }

通过消除完成的通道可以简化代码。

func main() {

    filename := flag.String("filename", "", "The file to parse")
    flag.Parse()

    if *filename == "" {
        log.Fatal("Provide a file to parse")
    }

    fmt.Println("Getting file")

    names := make(chan string)
    readerr := make(chan error)

    go GetLine(*filename, names, readerr)

loop:
    for {
        select {
        case name := <-names:
            // Process each line
            fmt.Println(name)

        case err := <-readerr:
            if err != nil {
                log.Fatal(err)
            }
            break loop
        }
    }

    fmt.Println("Processing Complete")
}

func GetLine(filename string, names chan string, readerr chan error) {
    file, err := os.Open(filename)
    if err != nil {
        log.Fatal(err)
    }
    defer file.Close()

    scanner := bufio.NewScanner(file)
    for scanner.Scan() {
        names <- scanner.Text()
    }
    readerr <- scanner.Err()
}

在该具体示例中,可以重构代码以将接收名称与接收错误分开。

func main() {
    filename := flag.String("filename", "", "The file to parse")
    flag.Parse()

    if *filename == "" {
        log.Fatal("Provide a file to parse")
    }

    fmt.Println("Getting file")

    names := make(chan string)
    readerr := make(chan error)

    go GetLine(*filename, names, readerr)

    for name := range names {
        fmt.Println(name)
    }
    if err := <-readerr; err != nil {
        log.Fatal(err)
    }

    fmt.Println("Processing Complete")
}

func GetLine(filename string, names chan string, readerr chan error) {
    file, err := os.Open(filename)
    if err != nil {
        log.Fatal(err)
    }
    defer file.Close()

    scanner := bufio.NewScanner(file)
    for scanner.Scan() {
        names <- scanner.Text()
    }
    close(names) // close causes range on channel to break out of loop
    readerr <- scanner.Err()
}

以上是关于[Go]:使用并发性逐行读取文件的主要内容,如果未能解决你的问题,请参考以下文章

Go 入门很简单:Go 读取文本文件

[Go] 通过 17 个简短代码片段,切底弄懂 channel 基础

Jmeter - 如何通过多个线程逐行读取 CSV 数据

使用 Bash 逐行读取文件

Go基础之文件操作命令行参数序列化并发编程

Go基础之文件操作命令行参数序列化并发编程