如何处理共享同一通道的多个 goroutine
Posted
技术标签:
【中文标题】如何处理共享同一通道的多个 goroutine【英文标题】:How to handle multiple goroutines that share the same channel 【发布时间】:2021-09-27 22:16:00 【问题描述】:我已经搜索了很多,但还没有找到我的问题的答案。
我需要多次调用外部 API,但同时使用不同的参数。 然后对于每个调用,我需要为每个数据集初始化一个结构并处理我从 API 调用接收到的数据。请记住,我读取了传入请求的每一行并立即开始将其发送到通道。
我遇到的第一个问题一开始并不明显,因为我接收的数据量很大,是每个 goroutine 并没有接收到所有通过通道的数据。 (我通过我所做的研究了解到)。所以我需要一种将数据重新排队/重定向到正确 goroutine 的方法。
从单个数据集发送流式响应的函数。 (我已经剪掉了无用的代码部分)
func (api *API) RequestData(ctx context.Context, c chan DWeatherResponse, dataset string, wg *sync.WaitGroup) error
for
line, err := reader.ReadBytes('\n')
s := string(line)
if err != nil
log.Println("End of %s", dataset)
return err
data, err := extractDataFromStreamLine(s, dataset)
if err != nil
continue
c <- *data
处理传入数据的函数
func (s *StrikeStruct) Process(ch, requeue chan dweather.DWeatherResponse)
for
data, more := <-ch
if !more
break
// data contains dataset string, value float64, date time.Time
// The s.Parameter needs to match the dataset
// IMPORTANT PART, checks if the received data is part of this struct dataset
// If not I want to send it to another go routine until it gets to the correct
one. There will be a max of 4 datasets but still this could not be the best approach to have
if !api.GetDataset(s.Parameter, data.Dataset)
requeue <- data
continue
// Do stuff with the data from this point
现在在我自己的 API 端点上,我有以下内容:
ch := make(chan dweather.DWeatherResponse, 2)
requeue := make(chan dweather.DWeatherResponse)
final := make(chan strike.StrikePerYearResponse)
var wg sync.WaitGroup
for _, s := range args.Parameters.Strikes
strike := strike.StrikePerYear
Parameter: strike.Parameter(s.Dataset),
StrikeValue: s.Value,
// I receive and process the data in here
go strike.ProcessStrikePerYear(ch, requeue, final, string(s.Dataset))
go func()
for
data, _ := <-requeue
ch <- data
()
// Creates a goroutine for each dataset
for _, dataset := range api.Params.Dataset
wg.Add(1)
go api.RequestData(ctx, ch, dataset, &wg)
wg.Wait()
close(ch)
//Once the data is all processed it is all appended
var strikes []strike.StrikePerYearResponse
for range args.Fetch.Datasets
strikes = append(strikes, <-final)
return strikes
这段代码的问题是,一旦我开始从多个端点接收数据,requeue
就会阻塞,不会再发生任何事情。如果我删除了 requeue
逻辑数据,如果它没有落在正确的 goroutine 上,它将丢失。
我的两个问题是:
-
如果有一个总是准备好接收的 goroutine,为什么 requeue 会阻塞?
我是否应该采用不同的方法来处理传入的数据?
【问题讨论】:
听起来你应该让每个 goroutine 有一个单独的通道。尝试在单个共享通道上重新排队将是困难且低效的。 将您提取的所有结果放入一个负责将数据分配给其相应数据集的例程中。或者,预先确保正确的例程执行预期的请求以处理预期的结果。无论如何,重新排队听起来不对。 好的,非常感谢。我第一次使用并发并开始以这种方式构建代码,认为所有例程都会从通道接收相同的数据。我会试试你的建议 【参考方案1】:这不是解决问题的好方法。你应该改变你的解决方案。我建议如下实现:
import (
"fmt"
"sync"
)
// answer for https://***.com/questions/68454226/how-to-handle-multiple-goroutines-that-share-the-same-channel
var (
finalResult = make(chan string)
)
// IData use for message dispatcher that all struct must implement its method
type IData interface
IsThisForMe() bool
Process(*sync.WaitGroup)
//MainData can be your main struct like StrikePerYear
type MainData struct
// add any props
Id int
Name string
type DataTyp1 struct
MainData *MainData
func (d DataTyp1) IsThisForMe() bool
// you can check your condition here to checking incoming data
if d.MainData.Id == 2
return true
return false
func (d DataTyp1) Process(wg *sync.WaitGroup)
d.MainData.Name = "processed by DataTyp1"
// send result to final channel, you can change it as you want
finalResult <- d.MainData.Name
wg.Done()
type DataTyp2 struct
MainData *MainData
func (d DataTyp2) IsThisForMe() bool
// you can check your condition here to checking incoming data
if d.MainData.Id == 3
return true
return false
func (d DataTyp2) Process(wg *sync.WaitGroup)
d.MainData.Name = "processed by DataTyp2"
// send result to final channel, you can change it as you want
finalResult <- d.MainData.Name
wg.Done()
//dispatcher will run new go routine for each request.
//you can implement a worker pool to preventing running too many go routines.
func dispatcher(incomingData *MainData, wg *sync.WaitGroup)
// based on your requirements you can remove this go routing or not
go func()
var p IData
p = DataTyp1incomingData
if p.IsThisForMe()
go p.Process(wg)
return
p = DataTyp2incomingData
if p.IsThisForMe()
go p.Process(wg)
return
()
func main()
dummyDataArray := []MainData
MainDataId: 2, Name: "this data #2",
MainDataId: 3, Name: "this data #3",
wg := sync.WaitGroup
for i := range dummyDataArray
wg.Add(1)
dispatcher(&dummyDataArray[i], &wg)
result := make([]string, 0)
done := make(chan struct)
// data collector
go func()
loop:for
select
case <-done:
break loop
case r := <-finalResult:
result = append(result, r)
()
wg.Wait()
done<- struct
for _, s := range result
fmt.Println(s)
注意:这只是为了让您敞开心扉寻找更好的解决方案,并且确定这不是生产就绪的代码。
【讨论】:
嘿,我对你的解决方案有一些分歧,如果你想看,这里是我的。 play.golang.org/p/h_yzV8UFwuv以上是关于如何处理共享同一通道的多个 goroutine的主要内容,如果未能解决你的问题,请参考以下文章