Why is data being pushed into the channel but never read from the receiver goroutine?
【Posted】: 2021-12-05 17:48:20
【Question】: I'm building a daemon with two services that send data to each other. Service A produces the data, and Service B is a data-buffer service, something like a queue. From the main.go file, Service B is instantiated and started. Its Start() method runs the buffer() function as a goroutine, because buffer() waits for data to arrive on a channel and I don't want the main process to block waiting for buffer to finish. Then Service A is instantiated and started, and it is also "registered" with Service B.
I created a method named RegisterWithBufferService on Service A that creates two new channels. It stores those channels as its own fields and also hands them to Service B.
func (s *ServiceA) RegisterWithBufferService(bufService *data.DataBuffer) error {
	newIncomingChan := make(chan *data.DataFrame, 1)
	newOutgoingChan := make(chan []byte, 1)
	s.IncomingBuffChan = newIncomingChan
	s.OutgoingDataChannels = append(s.OutgoingDataChannels, newOutgoingChan)
	bufService.DataProviders[s.ServiceName()] = data.DataProviderInfo{
		IncomingChan: newOutgoingChan, // our outgoing channel is their incoming
		OutgoingChan: newIncomingChan, // our incoming channel is their outgoing
	}
	s.DataBufferService = bufService
	bufService.NewProvider <- s.ServiceName() // the DataBuffer service listens for new services and creates a new goroutine for buffering
	s.Logger.Info().Msg("Registration completed.")
	return nil
}
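The buffer mainly listens for incoming data from Service A, decodes it with Decode(), and appends it to a slice named buf. If the length of the slice is greater than bufferPeriod, it sends the first item in the slice back to Service A on the Outgoing channel.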
func (b *DataBuffer) buffer(bufferPeriod int) {
	for {
		select {
		case newProvider := <-b.NewProvider:
			b.wg.Add(1)
			/*
			   newProvider is a string.
			   DataProviders is a map; the value it returns is a struct containing the Incoming and
			   Outgoing channels for this service.
			*/
			p := b.DataProviders[newProvider]
			go func(prov string, in chan []byte, out chan *DataFrame) {
				defer b.wg.Done()
				var buf []*DataFrame
				for {
					select {
					case rawData := <-in:
						tmp := Decode(rawData) // custom decoding function; returns a *DataFrame
						buf = append(buf, tmp)
						if len(buf) < bufferPeriod { // as noted in the comments, this should be len(buf) > bufferPeriod
							b.Logger.Info().Msg("Sending decoded data out.")
							out <- buf[0]
							buf = buf[1:] // pop
						}
					case <-b.Quit:
						return
					}
				}
			}(newProvider, p.IncomingChan, p.OutgoingChan)
		case <-b.Quit:
			return
		}
	}
}
Service A has a method named record that periodically pushes data to every channel in its OutgoingDataChannels field.
func (s *ServiceA) record() error {
	...
	if atomic.LoadInt32(&s.Listeners) != 0 {
		s.Logger.Info().Msg("Sending raw data to data buffer")
		for _, outChan := range s.OutgoingDataChannels {
			outChan <- dataBytes // the receiver (Service B) is already listening and this doesn't hang
		}
		s.Logger.Info().Msg("Raw data sent and received") // the logger outputs this, so I know it's not hanging
	}
}
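The problem is that Service A appears to push data successfully with record, but Service B never enters the case rawData := <-in: branch in the buffer sub-goroutine. Is this because I have nested goroutines? In case it isn't clear: when Service B starts, it calls buffer, but because that call would block, I run buffer as a goroutine. So when Service A calls RegisterWithBufferService, the buffer goroutine creates another goroutine that listens for new data from Service A and pushes it back to Service A once the buffer has filled. I hope I explained that clearly.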
Edit 1: I made a minimal, reproducible example.
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

var (
	defaultBufferingPeriod int   = 3
	DefaultPollingInterval int64 = 10
)

type DataObject struct {
	Data string
}

type DataProvider interface {
	RegisterWithBufferService(*DataBuffer) error
	ServiceName() string
}

type DataProviderInfo struct {
	IncomingChan chan *DataObject
	OutgoingChan chan *DataObject
}

type DataBuffer struct {
	Running       int32 // used atomically
	DataProviders map[string]DataProviderInfo
	Quit          chan struct{}
	NewProvider   chan string
	wg            sync.WaitGroup
}

func NewDataBuffer() *DataBuffer {
	var (
		wg sync.WaitGroup
	)
	return &DataBuffer{
		DataProviders: make(map[string]DataProviderInfo),
		Quit:          make(chan struct{}),
		NewProvider:   make(chan string),
		wg:            wg,
	}
}

func (b *DataBuffer) Start() error {
	if ok := atomic.CompareAndSwapInt32(&b.Running, 0, 1); !ok {
		return fmt.Errorf("Could not start Data Buffer Service.")
	}
	go b.buffer(defaultBufferingPeriod)
	return nil
}

func (b *DataBuffer) Stop() error {
	if ok := atomic.CompareAndSwapInt32(&b.Running, 1, 0); !ok {
		return fmt.Errorf("Could not stop Data Buffer Service.")
	}
	for _, p := range b.DataProviders {
		close(p.IncomingChan)
		close(p.OutgoingChan)
	}
	close(b.Quit)
	b.wg.Wait()
	return nil
}

// buffer creates goroutines for each incoming, outgoing data pair and decodes the incoming bytes into outgoing DataFrames
func (b *DataBuffer) buffer(bufferPeriod int) {
	for {
		select {
		case newProvider := <-b.NewProvider:
			fmt.Println("Received new Data provider.")
			if _, ok := b.DataProviders[newProvider]; ok {
				b.wg.Add(1)
				p := b.DataProviders[newProvider]
				go func(prov string, in chan *DataObject, out chan *DataObject) {
					defer b.wg.Done()
					var (
						buf []*DataObject
					)
					fmt.Printf("Waiting for data from: %s\n", prov)
					for {
						select {
						case inData := <-in:
							fmt.Printf("Received data from: %s\n", prov)
							buf = append(buf, inData)
							if len(buf) > bufferPeriod {
								fmt.Printf("Queue is filled, sending data back to %s\n", prov)
								out <- buf[0]
								fmt.Println("Data Sent")
								buf = buf[1:] // pop
							}
						case <-b.Quit:
							return
						}
					}
				}(newProvider, p.IncomingChan, p.OutgoingChan)
			}
		case <-b.Quit:
			return
		}
	}
}
type ServiceA struct {
	Active            int32 // atomic
	Stopping          int32 // atomic
	Recording         int32 // atomic
	Listeners         int32 // atomic
	name              string
	QuitChan          chan struct{}
	IncomingBuffChan  chan *DataObject
	OutgoingBuffChans []chan *DataObject
	DataBufferService *DataBuffer
}

// A compile time check to ensure ServiceA fully implements the DataProvider interface
var _ DataProvider = (*ServiceA)(nil)

func NewServiceA() (*ServiceA, error) {
	var newSliceOutChans []chan *DataObject
	return &ServiceA{
		QuitChan:          make(chan struct{}),
		OutgoingBuffChans: newSliceOutChans,
		name:              "SERVICEA",
	}, nil
}

// Start starts the service. Returns an error if any issues occur
func (s *ServiceA) Start() error {
	atomic.StoreInt32(&s.Active, 1)
	return nil
}

// Stop stops the service. Returns an error if any issues occur
func (s *ServiceA) Stop() error {
	atomic.StoreInt32(&s.Stopping, 1)
	close(s.QuitChan)
	return nil
}

func (s *ServiceA) StartRecording(pol_int int64) error {
	if ok := atomic.CompareAndSwapInt32(&s.Recording, 0, 1); !ok {
		return fmt.Errorf("Could not start recording. Data recording already started")
	}
	ticker := time.NewTicker(time.Duration(pol_int) * time.Second)
	go func() {
		for {
			select {
			case <-ticker.C:
				fmt.Println("Time to record...")
				err := s.record()
				if err != nil {
					return
				}
			case <-s.QuitChan:
				ticker.Stop()
				return
			}
		}
	}()
	return nil
}

func (s *ServiceA) record() error {
	current_time := time.Now()
	ct := fmt.Sprintf("%02d-%02d-%d", current_time.Day(), current_time.Month(), current_time.Year())
	dataObject := &DataObject{
		Data: ct,
	}
	if atomic.LoadInt32(&s.Listeners) != 0 {
		fmt.Println("Sending data to Data buffer...")
		for _, outChan := range s.OutgoingBuffChans {
			outChan <- dataObject // the receivers should already be listening
		}
		fmt.Println("Data sent.")
	}
	return nil
}

// RegisterWithBufferService satisfies the DataProvider interface. It provides the bufService with new incoming and outgoing channels along with a polling interval
func (s ServiceA) RegisterWithBufferService(bufService *DataBuffer) error {
	if _, ok := bufService.DataProviders[s.ServiceName()]; ok {
		return fmt.Errorf("%v data provider already registered with Data Buffer.", s.ServiceName())
	}
	newIncomingChan := make(chan *DataObject, 1)
	newOutgoingChan := make(chan *DataObject, 1)
	s.IncomingBuffChan = newIncomingChan
	s.OutgoingBuffChans = append(s.OutgoingBuffChans, newOutgoingChan)
	bufService.DataProviders[s.ServiceName()] = DataProviderInfo{
		IncomingChan: newOutgoingChan, // our outgoing channel is their incoming
		OutgoingChan: newIncomingChan, // our incoming channel is their outgoing
	}
	s.DataBufferService = bufService
	bufService.NewProvider <- s.ServiceName() // the DataBuffer service listens for new services and creates a new goroutine for buffering
	return nil
}

// ServiceName satisfies the DataProvider interface. It returns the name of the service.
func (s ServiceA) ServiceName() string {
	return s.name
}
func main() {
	var BufferedServices []DataProvider
	fmt.Println("Instantiating and Starting Data Buffer Service...")
	bufService := NewDataBuffer()
	err := bufService.Start()
	if err != nil {
		panic(fmt.Sprintf("%v", err))
	}
	defer bufService.Stop()
	fmt.Println("Data Buffer Service successfully started.")

	fmt.Println("Instantiating and Starting Service A...")
	serviceA, err := NewServiceA()
	if err != nil {
		panic(fmt.Sprintf("%v", err))
	}
	BufferedServices = append(BufferedServices, *serviceA)
	err = serviceA.Start()
	if err != nil {
		panic(fmt.Sprintf("%v", err))
	}
	defer serviceA.Stop()
	fmt.Println("Service A successfully started.")

	fmt.Println("Registering services with Data Buffer...")
	for _, s := range BufferedServices {
		_ = s.RegisterWithBufferService(bufService) // ignoring error msgs for base case
	}
	fmt.Println("Registration complete.")

	fmt.Println("Beginning recording...")
	_ = atomic.AddInt32(&serviceA.Listeners, 1)
	err = serviceA.StartRecording(DefaultPollingInterval)
	if err != nil {
		panic(fmt.Sprintf("%v", err))
	}
	for {
		select {
		case RTD := <-serviceA.IncomingBuffChan:
			fmt.Println(RTD)
		case <-serviceA.QuitChan:
			atomic.StoreInt32(&serviceA.Listeners, 0)
			bufService.Quit <- struct{}{}
		}
	}
}
Running on Go 1.17. When the example runs, it prints the following every 10 seconds:
Time to record...
Sending data to Data buffer...
Data sent.
But the data buffer never enters the inData := <-in case.
【Comments】:
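Can you provide a minimal, reproducible example? I can see some potential race conditions in what you've provided (e.g. bufService.DataProviders), and your buffering algorithm looks suspect (why len(buf) < bufferPeriod? I'd expect that to fire on every iteration, or never when bufferPeriod==0). Without a complete example, though, it's hard to comment further.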
Sure, I'll do that and mark it as an edit in the OP.
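@Brits I made a minimal, reproducible example. By the way, you were right about len(buf) < bufferPeriod: it should be len(buf) > bufferPeriod, so that it passes the first item in the slice to the outgoing channel. Thanks for pointing that out.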
【Answer 1】:
To diagnose this, I changed fmt.Println("Sending data to Data buffer...") to fmt.Println("Sending data to Data buffer...", s.OutgoingBuffChans), and the output was:
Time to record...
Sending data to Data buffer... []
So you are not actually sending the data to any channel. The reason is this:
func (s ServiceA) RegisterWithBufferService(bufService *DataBuffer) error
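Because the receiver is not a pointer, when you do s.OutgoingBuffChans = append(s.OutgoingBuffChans, newOutgoingChan) you are changing s.OutgoingBuffChans in a copy of ServiceA, and that copy is discarded when the function returns. To fix this, change: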
func (s ServiceA) RegisterWithBufferService(bufService *DataBuffer) error
to
func (s *ServiceA) RegisterWithBufferService(bufService *DataBuffer) error
and
BufferedServices = append(BufferedServices, *serviceA)
to
BufferedServices = append(BufferedServices, serviceA)
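The receiver behaviour can be reproduced without any channels at all. In this minimal sketch (the `service` type and its methods are hypothetical), the value-receiver method appends to a copy, while the pointer-receiver method updates the caller's struct:

```go
package main

import "fmt"

type service struct {
	chans []chan int
}

// registerByValue has a value receiver: s is a copy, so the append is lost on return.
func (s service) registerByValue() {
	s.chans = append(s.chans, make(chan int, 1))
}

// registerByPointer has a pointer receiver: the append updates the caller's struct.
func (s *service) registerByPointer() {
	s.chans = append(s.chans, make(chan int, 1))
}

func main() {
	var a, b service
	a.registerByValue()
	b.registerByPointer()
	fmt.Println(len(a.chans), len(b.chans)) // 0 1
}
```

The second change follows from the first: once RegisterWithBufferService has a pointer receiver, only *ServiceA has it in its method set, so a ServiceA value (*serviceA) no longer satisfies DataProvider and the pointer itself must be stored in BufferedServices.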
With these changes, the example from the question outputs:
Time to record...
Sending data to Data buffer... [0xc0000d8060]
Data sent.
Received data from: SERVICEA
Time to record...
Sending data to Data buffer... [0xc0000d8060]
Data sent.
Received data from: SERVICEA
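So that fixes the reported issue (I would not be surprised if there are other problems, but hopefully this points you in the right direction). I did notice that the code you originally posted used a pointer receiver, so that version may have had a different issue (but it's hard to comment on code fragments in a case like this).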
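This also explains why the "Data sent." log line was misleading. A log after the send loop proves nothing about delivery for two reasons: ranging over an empty slice never executes the send, and a send into a buffered channel with spare capacity (the example creates its channels with make(chan *DataObject, 1)) completes even when no goroutine ever receives. The following is a minimal sketch of both cases; `fanOut` is a hypothetical helper that counts the sends it performs.

```go
package main

import "fmt"

// fanOut sends v to every channel and reports how many sends happened.
func fanOut(chans []chan string, v string) int {
	sent := 0
	for _, c := range chans {
		c <- v
		sent++
	}
	return sent
}

func main() {
	// 1. Empty slice: the loop body never runs, yet the caller carries on as if data was sent.
	fmt.Println(fanOut(nil, "x")) // 0

	// 2. Buffered channel with spare capacity: the send completes although nobody receives.
	c := make(chan string, 1)
	fmt.Println(fanOut([]chan string{c}, "y"), len(c)) // 1 1
}
```

Logging the number of channels (as in the diagnostic above) or the count returned by a helper like this makes an empty registration visible immediately.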