如何处理共享同一通道的多个 goroutine

Posted

技术标签:

【中文标题】如何处理共享同一通道的多个 goroutine【英文标题】:How to handle multiple goroutines that share the same channel 【发布时间】:2021-09-27 22:16:00 【问题描述】:

我已经搜索了很多,但还没有找到我的问题的答案。

我需要多次调用外部 API,但同时使用不同的参数。 然后对于每个调用,我需要为每个数据集初始化一个结构并处理我从 API 调用接收到的数据。请记住,我读取了传入请求的每一行并立即开始将其发送到通道。

我遇到的第一个问题一开始并不明显,因为我接收的数据量很大,是每个 goroutine 并没有接收到所有通过通道的数据。 (我通过我所做的研究了解到)。所以我需要一种将数据重新排队/重定向到正确 goroutine 的方法。

从单个数据集发送流式响应的函数。 (我已经剪掉了无用的代码部分)

func (api *API) RequestData(ctx context.Context, c chan DWeatherResponse, dataset string, wg *sync.WaitGroup) error 
 for 
        line, err := reader.ReadBytes('\n')
        s := string(line)
        if err != nil 
            log.Println("End of %s", dataset)
            return err
        
    
        data, err := extractDataFromStreamLine(s, dataset)
        if err != nil 
            continue
        

        c <- *data
    

处理传入数据的函数

func (s *StrikeStruct) Process(ch, requeue chan dweather.DWeatherResponse) 
    for 
        data, more := <-ch
        if !more 
            break
        

       // data contains dataset string, value float64, date time.Time
      // The s.Parameter needs to match the dataset
        
         // IMPORTANT PART, checks if the received data is part of this struct dataset
          // If not I want to send it to another go routine until it gets to the correct 
          one. There will be a max of 4 datasets but still this could not be the best approach to have 
        if !api.GetDataset(s.Parameter, data.Dataset) 
            requeue <- data
            continue
        
        // Do stuff with the data from this point
    

现在在我自己的 API 端点上,我有以下内容:

ch := make(chan dweather.DWeatherResponse, 2)
requeue := make(chan dweather.DWeatherResponse)
final := make(chan strike.StrikePerYearResponse)

    var wg sync.WaitGroup


    for _, s := range args.Parameters.Strikes 
        strike := strike.StrikePerYear
            Parameter:       strike.Parameter(s.Dataset),
            StrikeValue: s.Value,
        

        // I receive and process the data in here
        go strike.ProcessStrikePerYear(ch, requeue, final, string(s.Dataset))
    


    go func() 
        for 
            data, _ := <-requeue
            ch <- data
        
    ()

   // Creates a goroutine for each dataset
    for _, dataset := range api.Params.Dataset 
        wg.Add(1)
        go api.RequestData(ctx, ch, dataset, &wg)
    

    wg.Wait()
    close(ch)

    //Once the data is all processed it is all appended
    var strikes []strike.StrikePerYearResponse
    for range args.Fetch.Datasets 
        strikes = append(strikes, <-final)
    

 return strikes

这段代码的问题是,一旦我开始从多个端点接收数据,requeue 就会阻塞,不会再发生任何事情。如果我删除了 requeue 逻辑数据,如果它没有落在正确的 goroutine 上,它将丢失。

我的两个问题是:

    如果有一个总是准备好接收的 goroutine,为什么 requeue 会阻塞? 我是否应该采用不同的方法来处理传入的数据?

【问题讨论】:

听起来你应该让每个 goroutine 有一个单独的通道。尝试在单个共享通道上重新排队将是困难且低效的。 将您提取的所有结果放入一个负责将数据分配给其相应数据集的例程中。或者,预先确保正确的例程执行预期的请求以处理预期的结果。无论如何,重新排队听起来不对。 好的,非常感谢。我第一次使用并发并开始以这种方式构建代码,认为所有例程都会从通道接收相同的数据。我会试试你的建议 【参考方案1】:

这不是解决问题的好方法。你应该改变你的解决方案。我建议如下实现:

import (
"fmt"
"sync"
)

// answer for https://***.com/questions/68454226/how-to-handle-multiple-goroutines-that-share-the-same-channel

var (
    finalResult = make(chan string)
)

// IData use for message dispatcher that all struct must implement its method
type IData interface 
    IsThisForMe() bool
    Process(*sync.WaitGroup)


//MainData can be your main struct like StrikePerYear
type MainData struct 
    // add any props
    Id   int
    Name string


type DataTyp1 struct 
    MainData *MainData


func (d DataTyp1) IsThisForMe() bool 
    // you can check your condition here to checking incoming data
    if d.MainData.Id == 2 
        return true
    
    return false


func (d DataTyp1) Process(wg *sync.WaitGroup) 
    d.MainData.Name = "processed by DataTyp1"
    // send result to final channel, you can change it as you want
    finalResult <- d.MainData.Name
    wg.Done()


type DataTyp2 struct 
    MainData *MainData


func (d DataTyp2) IsThisForMe() bool 
    // you can check your condition here to checking incoming data
    if d.MainData.Id == 3 
         return true
    
    return false


func (d DataTyp2) Process(wg *sync.WaitGroup) 
     d.MainData.Name = "processed by DataTyp2"
    // send result to final channel, you can change it as you want
    finalResult <- d.MainData.Name
    wg.Done()


//dispatcher will run new go routine for each request.
//you can implement a worker pool to preventing running too many go routines.
func dispatcher(incomingData *MainData, wg *sync.WaitGroup) 
     // based on your requirements you can remove this go routing or not
    go func() 
        var p IData
        p = DataTyp1incomingData
        if p.IsThisForMe() 
            go p.Process(wg)
            return
        
        p = DataTyp2incomingData
        if p.IsThisForMe() 
            go p.Process(wg)
            return
        
    ()

func main() 
    dummyDataArray := []MainData
        MainDataId: 2, Name: "this data #2",
        MainDataId: 3, Name: "this data #3",
    
    wg := sync.WaitGroup
    for i := range dummyDataArray 
        wg.Add(1)
        dispatcher(&dummyDataArray[i], &wg)
    
    result := make([]string, 0)
    done := make(chan struct)
    // data collector
    go func() 
        loop:for 
            select 
            case <-done:
                break loop
            case r := <-finalResult:
                result = append(result, r)
            
        
    ()
    wg.Wait()
    done<- struct
    for _, s := range result 
        fmt.Println(s)
    

注意:这只是为了让您敞开心扉寻找更好的解决方案,并且确定这不是生产就绪的代码。

【讨论】:

嘿,我对你的解决方案有一些分歧,如果你想看,这里是我的。 play.golang.org/p/h_yzV8UFwuv

以上是关于如何处理共享同一通道的多个 goroutine的主要内容,如果未能解决你的问题,请参考以下文章

如何处理 Django 中不共享同一个域的前端

在 monorepo 中的多个应用程序之间共享组件时如何处理共享依赖项

composer 如何处理同一个包的多个版本?

服务定位器模式 - 如何处理同一个服务接口的多个实现

Aerospike如何处理通过多个连接创建同一记录?

XCode 如何处理带有多个目标的#import 标头语句?