Why is data being pushed into the channel but never read from the receiver goroutine?

Posted: 2021-12-05 17:48:20

Question:

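I'm building a daemon with two services that send data to each other. Service A produces the data, and Service B is a data buffer service, something like a queue. From the main.go file, Service B is instantiated and started. Its Start() method runs the buffer() function as a goroutine, because that function waits for data to arrive on a channel and I don't want the main process to block waiting for buffer to complete. Then Service A is instantiated and started, and it is also "registered" with Service B.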
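A stripped-down sketch of the startup pattern described here: Start launches the receive loop in its own goroutine so the caller is not blocked, and Stop closes a quit channel and waits on a sync.WaitGroup. This is an illustration under simplifying assumptions, not the author's code; bufferService, loop and got are hypothetical names, and the loop just records strings instead of decoding and buffering data.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
)

// bufferService is a hypothetical, stripped-down stand-in for Service B.
type bufferService struct {
	running int32 // 0 or 1, accessed atomically
	in      chan string
	quit    chan struct{}
	wg      sync.WaitGroup
	got     []string // written only by the loop goroutine
}

func newBufferService() *bufferService {
	return &bufferService{
		in:   make(chan string),
		quit: make(chan struct{}),
	}
}

// Start launches the receive loop in its own goroutine and returns at once,
// so the caller (main) is not blocked waiting for the loop to finish.
func (b *bufferService) Start() error {
	if !atomic.CompareAndSwapInt32(&b.running, 0, 1) {
		return errors.New("buffer service already running")
	}
	b.wg.Add(1)
	go b.loop()
	return nil
}

// loop receives values until quit is closed.
func (b *bufferService) loop() {
	defer b.wg.Done()
	for {
		select {
		case v := <-b.in:
			b.got = append(b.got, v)
		case <-b.quit:
			return
		}
	}
}

// Stop signals the loop to exit and waits for it, so reading b.got after
// Stop returns is free of data races.
func (b *bufferService) Stop() error {
	if !atomic.CompareAndSwapInt32(&b.running, 1, 0) {
		return errors.New("buffer service not running")
	}
	close(b.quit)
	b.wg.Wait()
	return nil
}

func main() {
	b := newBufferService()
	if err := b.Start(); err != nil {
		panic(err)
	}
	// b.in is unbuffered, so each send returns only after loop has received it.
	b.in <- "hello"
	b.in <- "world"
	if err := b.Stop(); err != nil {
		panic(err)
	}
	fmt.Println(b.got) // [hello world]
}
```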

I created a method on Service A called RegisterWithBufferService that creates two new channels. It stores those channels as its own attributes and also hands them to Service B.

func (s *ServiceA) RegisterWithBufferService(bufService *data.DataBuffer) error {
    newIncomingChan := make(chan *data.DataFrame, 1)
    newOutgoingChan := make(chan []byte, 1)
    s.IncomingBuffChan = newIncomingChan
    s.OutgoingDataChannels = append(s.OutgoingDataChannels, newOutgoingChan)
    bufService.DataProviders[s.ServiceName()] = data.DataProviderInfo{
        IncomingChan: newOutgoingChan, //our outGoing channel is their incoming
        OutgoingChan: newIncomingChan, // our incoming channel is their outgoing
    }
    s.DataBufferService = bufService
    bufService.NewProvider <- s.ServiceName() //The DataBuffer service listens for new services and creates a new goroutine for buffering
    s.Logger.Info().Msg("Registeration completed.")
    return nil
}
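buffer mainly listens for incoming data from Service A, decodes it using Decode(), and then appends it to a slice called buf. If the length of the slice is greater than bufferPeriod, it sends the first item in the slice back to Service A on the outgoing channel.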

func (b* DataBuffer) buffer(bufferPeriod int) {
    for {
        select {
        case newProvider := <- b.NewProvider:
            b.wg.Add(1)
            /*
            newProvider is a string
            DataProviders is a map the value it returns is a struct containing the Incoming and 
            Outgoing channels for this service
            */
            p := b.DataProviders[newProvider]
            go func(prov string, in chan []byte, out chan *DataFrame) {
                defer b.wg.Done()
                var buf []*DataFrame
                for {
                    select {
                    case rawData := <-in:
                        tmp := Decode(rawData) //custom decoding function. Returns a *DataFrame
                        buf = append(buf, tmp)
                        if len(buf) < bufferPeriod {
                            b.Logger.Info().Msg("Sending decoded data out.")
                            out <- buf[0]
                            buf = buf[1:] //pop
                        }
                    case <- b.Quit:
                        return
                    }
                }
            }(newProvider, p.IncomingChan, p.OutgoingChan)
        case <- b.Quit:
            return
        }
    }
}
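The buffering rule on its own is easy to check in isolation. A minimal sketch, assuming string items in place of *DataFrame and a hypothetical helper named pushAndMaybePop. It uses the "greater than" condition described in the paragraph above; the snippet as posted tests len(buf) < bufferPeriod instead, which is the discrepancy raised in the comments.

```go
package main

import "fmt"

// pushAndMaybePop appends item to buf and, once buf holds more than
// bufferPeriod items, removes and returns the oldest one (ok == true).
// It is a hypothetical helper, not part of the original code.
func pushAndMaybePop(buf []string, item string, bufferPeriod int) ([]string, string, bool) {
	buf = append(buf, item)
	if len(buf) > bufferPeriod {
		oldest := buf[0]
		return buf[1:], oldest, true // pop the front element
	}
	return buf, "", false
}

func main() {
	var buf []string
	for _, item := range []string{"a", "b", "c", "d", "e"} {
		var out string
		var ok bool
		buf, out, ok = pushAndMaybePop(buf, item, 3)
		if ok {
			fmt.Println("released", out)
		}
	}
	fmt.Println("still buffered:", buf)
}
```

With bufferPeriod = 3 and inputs a through e, a and b are released and [c d e] stays buffered.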
Service A has a method called record that periodically pushes data to every channel in its OutgoingDataChannels attribute.

func (s *ServiceA) record() error {
    ...
    if atomic.LoadInt32(&s.Listeners) != 0 {
        s.Logger.Info().Msg("Sending raw data to data buffer")
        for _, outChan := range s.OutgoingDataChannels {
            outChan <- dataBytes // the receiver (Service B) is already listening and this doesn't hang
        }
        s.Logger.Info().Msg("Raw data sent and received") // The logger will output this so I know it's not hanging 
    }
}
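As an aside, the fact that this loop does not hang proves less than the code comment suggests. The channels created in RegisterWithBufferService have a capacity of 1, so one send completes even if nothing is receiving yet, and if OutgoingDataChannels is empty the loop body never runs at all. In both cases the "sent" log line is printed. A small sketch of both cases, using a hypothetical fanOut helper over chan string:

```go
package main

import "fmt"

// fanOut sends v on every channel in outs and returns how many sends were made.
// If outs is empty (or nil) the loop body never runs: it returns 0 immediately,
// so a log line printed after it does not prove anything was delivered.
func fanOut(outs []chan string, v string) int {
	sent := 0
	for _, ch := range outs {
		ch <- v
		sent++
	}
	return sent
}

func main() {
	var none []chan string
	fmt.Println("sends with no channels:", fanOut(none, "x")) // 0

	// A channel with capacity 1 accepts one send even though nothing is
	// receiving yet, so "did not hang" does not mean "was received".
	ch := make(chan string, 1)
	fmt.Println("sends with one channel:", fanOut([]chan string{ch}, "y")) // 1
	fmt.Println("later received:", <-ch)                                    // y
}
```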
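The problem is that Service A seems to push data successfully using record, but Service B never enters the case rawData := <-in: branch in the buffer sub-goroutine. Is this because I have nested goroutines? In case it's not clear: when Service B starts, it calls buffer, but because that call would block, I run buffer as a goroutine. Then, when Service A calls RegisterWithBufferService, the buffer goroutine creates another goroutine that listens for new data from Service A and pushes it back to Service A once the buffer fills up. I hope I explained this clearly.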

Edit 1: I made a minimal, reproducible example.

package main

import (
    "fmt"
    "sync"
    "sync/atomic"
    "time"
)

var (
    defaultBufferingPeriod int = 3
    DefaultPollingInterval int64 = 10
)

type DataObject struct {
    Data    string
}

type DataProvider interface {
    RegisterWithBufferService(*DataBuffer) error
    ServiceName() string
}

type DataProviderInfo struct {
    IncomingChan    chan *DataObject
    OutgoingChan    chan *DataObject
}

type DataBuffer struct {
    Running         int32 //used atomically
    DataProviders   map[string]DataProviderInfo
    Quit            chan struct{}
    NewProvider     chan string
    wg              sync.WaitGroup
}

func NewDataBuffer() *DataBuffer {
    var (
        wg sync.WaitGroup
    )
    return &DataBuffer{
        DataProviders: make(map[string]DataProviderInfo),
        Quit: make(chan struct{}),
        NewProvider: make(chan string),
        wg: wg,
    }
}

func (b *DataBuffer) Start() error {
    if ok := atomic.CompareAndSwapInt32(&b.Running, 0, 1); !ok {
        return fmt.Errorf("Could not start Data Buffer Service.")
    }
    go b.buffer(defaultBufferingPeriod)
    return nil
}

func (b *DataBuffer) Stop() error {
    if ok := atomic.CompareAndSwapInt32(&b.Running, 1, 0); !ok {
        return fmt.Errorf("Could not stop Data Buffer Service.")
    }
    for _, p := range b.DataProviders {
        close(p.IncomingChan)
        close(p.OutgoingChan)
    }
    close(b.Quit)
    b.wg.Wait()
    return nil
}

// buffer creates goroutines for each incoming, outgoing data pair and decodes the incoming bytes into outgoing DataFrames
func (b *DataBuffer) buffer(bufferPeriod int) {
    for {
        select {
        case newProvider := <- b.NewProvider:
            fmt.Println("Received new Data provider.")
            if _, ok := b.DataProviders[newProvider]; ok {
                b.wg.Add(1)
                p := b.DataProviders[newProvider]
                go func(prov string, in chan *DataObject, out chan *DataObject) {
                    defer b.wg.Done()
                    var (
                        buf []*DataObject
                    )
                    fmt.Printf("Waiting for data from: %s\n", prov)
                    for {
                        select {
                        case inData := <-in:
                            fmt.Printf("Received data from: %s\n", prov)
                            buf = append(buf, inData)
                            if len(buf) > bufferPeriod {
                                fmt.Printf("Queue is filled, sending data back to %s\n", prov)
                                out <- buf[0]
                                fmt.Println("Data Sent")
                                buf = buf[1:] //pop
                            }
                        case <- b.Quit:
                            return
                        }
                    }
                }(newProvider, p.IncomingChan, p.OutgoingChan)
            }
        case <- b.Quit:
            return
        }
    }
}

type ServiceA struct {
    Active                  int32 // atomic
    Stopping                int32 // atomic
    Recording               int32 // atomic
    Listeners               int32 // atomic
    name                    string
    QuitChan                chan struct{}
    IncomingBuffChan        chan *DataObject
    OutgoingBuffChans       []chan *DataObject
    DataBufferService       *DataBuffer
}

// A compile time check to ensure ServiceA fully implements the DataProvider interface
var _ DataProvider = (*ServiceA)(nil)

func NewServiceA() (*ServiceA, error) {
    var newSliceOutChans []chan *DataObject
    return &ServiceA{
        QuitChan:  make(chan struct{}),
        OutgoingBuffChans: newSliceOutChans,
        name:   "SERVICEA",
    }, nil
}

// Start starts the service. Returns an error if any issues occur
func (s *ServiceA) Start() error {
    atomic.StoreInt32(&s.Active, 1)
    return nil
}

// Stop stops the service. Returns an error if any issues occur
func (s *ServiceA) Stop() error {
    atomic.StoreInt32(&s.Stopping, 1)
    close(s.QuitChan)
    return nil
}

func (s *ServiceA) StartRecording(pol_int int64) error {
    if ok := atomic.CompareAndSwapInt32(&s.Recording, 0, 1); !ok {
        return fmt.Errorf("Could not start recording. Data recording already started")
    }
    ticker := time.NewTicker(time.Duration(pol_int) * time.Second)
    go func() {
        for {
            select {
            case <-ticker.C:
                fmt.Println("Time to record...")
                err := s.record()
                if err != nil {
                    return
                }
            case <-s.QuitChan:
                ticker.Stop()
                return
            }
        }
    }()
    return nil
}

func (s *ServiceA) record() error {
    current_time := time.Now()
    ct := fmt.Sprintf("%02d-%02d-%d", current_time.Day(), current_time.Month(), current_time.Year())
    dataObject := &DataObject{
        Data: ct,
    }
    if atomic.LoadInt32(&s.Listeners) != 0 {
        fmt.Println("Sending data to Data buffer...")
        for _, outChan := range s.OutgoingBuffChans {
            outChan <- dataObject // the receivers should already be listening
        }
        fmt.Println("Data sent.")
    }
    return nil
}

// RegisterWithBufferService satisfies the DataProvider interface. It provides the bufService with new incoming and outgoing channels along with a polling interval
func (s ServiceA) RegisterWithBufferService(bufService *DataBuffer) error {
    if _, ok := bufService.DataProviders[s.ServiceName()]; ok {
        return fmt.Errorf("%v data provider already registered with Data Buffer.", s.ServiceName())
    }
    newIncomingChan := make(chan *DataObject, 1)
    newOutgoingChan := make(chan *DataObject, 1)
    s.IncomingBuffChan = newIncomingChan
    s.OutgoingBuffChans = append(s.OutgoingBuffChans, newOutgoingChan)
    bufService.DataProviders[s.ServiceName()] = DataProviderInfo{
        IncomingChan: newOutgoingChan, //our outGoing channel is their incoming
        OutgoingChan: newIncomingChan, // our incoming channel is their outgoing
    }
    s.DataBufferService = bufService
    bufService.NewProvider <- s.ServiceName() //The DataBuffer service listens for new services and creates a new goroutine for buffering
    return nil
}

// ServiceName satisfies the DataProvider interface. It returns the name of the service.
func (s ServiceA) ServiceName() string {
    return s.name
}

func main() {
    var BufferedServices []DataProvider
    fmt.Println("Instantiating and Starting Data Buffer Service...")
    bufService := NewDataBuffer()
    err := bufService.Start()
    if err != nil {
        panic(fmt.Sprintf("%v", err))
    }
    defer bufService.Stop()
    fmt.Println("Data Buffer Service successfully started.")

    fmt.Println("Instantiating and Starting Service A...")
    serviceA, err := NewServiceA()
    if err != nil {
        panic(fmt.Sprintf("%v", err))
    }
    BufferedServices = append(BufferedServices, *serviceA)
    err = serviceA.Start()
    if err != nil {
        panic(fmt.Sprintf("%v", err))
    }
    defer serviceA.Stop()
    fmt.Println("Service A successfully started.")

    fmt.Println("Registering services with Data Buffer...")
    for _, s := range BufferedServices {
        _ = s.RegisterWithBufferService(bufService) // ignoring error msgs for base case
    }
    fmt.Println("Registration complete.")

    fmt.Println("Beginning recording...")
    _ = atomic.AddInt32(&serviceA.Listeners, 1)
    err = serviceA.StartRecording(DefaultPollingInterval)
    if err != nil {
        panic(fmt.Sprintf("%v", err))
    }
    for {
        select {
        case RTD := <-serviceA.IncomingBuffChan:
            fmt.Println(RTD)
        case <-serviceA.QuitChan:
            atomic.StoreInt32(&serviceA.Listeners, 0)
            bufService.Quit <- struct{}{}
        }
    }
}

Running on Go 1.17. When the example runs, it should print the following every 10 seconds:

Time to record...
Sending data to Data buffer...
Data sent.

But the data buffer never enters the inData := <-in case.

Comments:

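Can you please provide a minimal, reproducible example? I can see some potential race conditions in what you have provided (e.g. bufService.DataProviders), and your buffering algorithm looks suspect (why len(buf) < bufferPeriod? I'd expect that to trigger on every iteration, or never if bufferPeriod==0), but without a complete example it's difficult to comment further.

Sure, I'll do that and mark it as an edit in the OP.

@Brits I made a minimal, reproducible example. By the way, you're right about len(buf) < bufferPeriod: it should be len(buf) > bufferPeriod, so that it passes the first item in the slice to the outgoing channel. Thanks for pointing that out.

Answer 1: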

To diagnose the issue, I changed fmt.Println("Sending data to Data buffer...") to fmt.Println("Sending data to Data buffer...", s.OutgoingBuffChans), and the output was:

Time to record...
Sending data to Data buffer... []

So you are not actually sending the data to any channel. The reason for this is:

func (s ServiceA) RegisterWithBufferService(bufService *DataBuffer) error {

Because the receiver is not a pointer, when you do s.OutgoingBuffChans = append(s.OutgoingBuffChans, newOutgoingChan) you are changing s.OutgoingBuffChans in a copy of the ServiceA, and that copy is discarded when the function exits. To fix this, change

func (s ServiceA) RegisterWithBufferService(bufService *DataBuffer) error {

to

func (s *ServiceA) RegisterWithBufferService(bufService *DataBuffer) error {

and change

BufferedServices = append(BufferedServices, *serviceA)

to

BufferedServices = append(BufferedServices, serviceA)
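The effect of the receiver type can be reproduced in a few lines. In this minimal sketch, svc is a hypothetical stand-in for ServiceA: the value-receiver method appends to a copy that is thrown away, while the pointer-receiver method updates the caller's value. It also shows why the second change is needed: once the method has a pointer receiver, only *svc (not svc) satisfies the interface, so the slice of interface values has to hold the pointer.

```go
package main

import "fmt"

// svc is a hypothetical stand-in for ServiceA that only tracks its outgoing channels.
type svc struct {
	outs []chan int
}

// registerByValue has a value receiver: s is a copy, so the append is lost
// when the method returns.
func (s svc) registerByValue() {
	s.outs = append(s.outs, make(chan int, 1))
}

// registerByPointer has a pointer receiver: the append updates the caller's svc.
func (s *svc) registerByPointer() {
	s.outs = append(s.outs, make(chan int, 1))
}

// registrar is satisfied by *svc but not by svc, because registerByPointer
// has a pointer receiver. A []registrar must therefore hold pointers.
type registrar interface {
	registerByPointer()
}

func registerAll(rs []registrar) {
	for _, r := range rs {
		r.registerByPointer()
	}
}

func main() {
	var a svc
	a.registerByValue()
	fmt.Println("value receiver:", len(a.outs)) // 0

	b := &svc{}
	registerAll([]registrar{b})
	fmt.Println("pointer receiver:", len(b.outs)) // 1
}
```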

The modified version outputs:

Time to record...
Sending data to Data buffer... [0xc0000d8060]
Data sent.
Received data from: SERVICEA
Time to record...
Sending data to Data buffer... [0xc0000d8060]
Data sent.
Received data from: SERVICEA

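So this fixes the reported issue (I would not be surprised if there are other issues, but hopefully this points you in the right direction). I did notice that the code you originally posted used a pointer receiver, so you may have been hitting a different issue there (but it's hard to comment on code fragments in a case like this).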
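One candidate for the "other issues" is the potential race on bufService.DataProviders raised in the comments. In the example, RegisterWithBufferService reads and writes that map from main's goroutine while the buffer goroutine reads it after each NewProvider message, so with more than one provider a registration can overlap a read. A minimal sketch of one common remedy, guarding the map with a sync.Mutex; registry, Add and Get are hypothetical names, and the value type is simplified to chan string.

```go
package main

import (
	"fmt"
	"sync"
)

// registry is a hypothetical mutex-guarded replacement for the bare
// DataProviders map, safe to use from several goroutines at once.
type registry struct {
	mu        sync.Mutex
	providers map[string]chan string
}

func newRegistry() *registry {
	return &registry{providers: make(map[string]chan string)}
}

// Add stores ch under name and reports false if name was already registered.
// The check and the insert happen under one lock, so two concurrent callers
// cannot both register the same name.
func (r *registry) Add(name string, ch chan string) bool {
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, exists := r.providers[name]; exists {
		return false
	}
	r.providers[name] = ch
	return true
}

// Get returns the channel registered under name, if any.
func (r *registry) Get(name string) (chan string, bool) {
	r.mu.Lock()
	defer r.mu.Unlock()
	ch, ok := r.providers[name]
	return ch, ok
}

func main() {
	r := newRegistry()
	var wg sync.WaitGroup
	// Concurrent registrations; without the mutex, the race detector
	// (go run -race) would report these map writes.
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			r.Add(fmt.Sprintf("svc%d", i), make(chan string))
		}(i)
	}
	wg.Wait()
	_, ok := r.Get("svc3")
	fmt.Println("svc3 registered:", ok) // true
}
```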
