如何使用通道在 go 例程之间传递字节片

Posted

技术标签:

【中文标题】如何使用通道在 go 例程之间传递字节片【英文标题】:How to pass byte slice between go routines using channels 【发布时间】:2020-04-24 07:09:39 【问题描述】:

我有一个从source 读取数据并将它们发送到destination 的函数。源和目标可以是任何东西,假设这个示例源是数据库(任何mysqlPostgreSQL...),目标是distributed Q(任何...ActiveMQKafka)。消息以字节存储。

这是主要功能。想法是它将旋转一个新的 go 例程并等待返回消息以供将来处理。

type Message []byte

func (p *ProcessorService) Continue(dictId int) 
    level.Info(p.logger).Log("process", "message", "dictId", dictId)
    retrieved := make(chan Message)

    go func() 
        err := p.src.Read(retrieved, strconv.Itoa(p.dictId))
        if err != nil 
            level.Error(p.logger).Log("process", "read", "message", "err", err)
        
    ()

    for r := range retrieved 
        go func(message Message) 
            level.Info(p.logger).Log("message", message)
            if len(message) > 0 
                if err := p.dst.sendToQ(message); err != nil 
                    level.Error(p.logger).Log("failed", "during", "persist", "err", err)
                
             else 
                level.Error(p.logger).Log("failed")
            
        (r)
    

这是读取函数本身

func (s *Storage) Read(out chan<- Message, opt ...string) error 

    // I just skip some basic database read operations here
    // but idea is simple, read data from the table / file row by row and 
    // 
    for _, value := range dataFromDB 
            message, err := value.row 
            if err == nil 
                out <- message
             else 
                errorf("Unable to get data %v", err)
                out <- make([]byte, 0)
            
        
    )

    close(out)

    if err != nil 
        return err
    

    return nil

如您所见,通信是通过 out chan

for r := range retrieved  
   go func(message Message) 
       // basically here message and r are pointing to the same underlying array
   

当接收到数据时var r 是一种切片字节。然后它传递给 go func(message Message) 在 go 中通过值传递的所有内容,在这种情况下,var r 将作为副本传递给匿名函数,但是它仍然有一个指向底层切片数据的指针。我很好奇在p.dst.sendToQ(message); 执行期间是否会出现问题,同时读取函数会向out channel 发送一些内容,从而导致切片数据结构被新信息覆盖。我是否应该在传递给匿名函数之前将字节片 r 复制到新的字节片中,所以底层数组会有所不同?我对其进行了测试,但并不能真正导致这种行为。不确定我是偏执狂还是不得不担心。

【问题讨论】:

【参考方案1】:

当您从数据库中获取数据时,p.dst.sendToQ(message) 中的 messagevalue.row 相同。所以,只要每个value.row 有不同的底层数组,你就应该很好。所以,我建议你检查源代码,确保它没有使用公共字节数组并不断重写。

【讨论】:

我试了两个对象,value.row 总是有不同的地址 Slice ADDRESS value.row [0xc0002b4980] , [0xc0001f6700] 但 var r 总是相同的 ADDRESS r [0xc0001f6140] [0xc0001f6140],我用的是不安全的.指针(&r) 。这是有道理的。但这仍然意味着我必须复制 r?因为即使 value.row 返回不同的切片 var r 也会被覆盖并用于网络函数 Persist r 是栈上的一个变量,所以它的地址不会改变。每个新切片都会覆盖r,然后将其作为副本传递给goroutine。下次r 获取新值时,goroutine 已经有了自己的副本。 嗯,我不确定我是否跟随,这是否意味着我必须复制?在我的理解中,goroutine 有它自己的 r 切片副本(消息),但具有相同的指向底层数组的指针。因此,如果 r 发生更改,它将更改 goroutine 中的副本,如果没有完成,可能会向 Persist 发送错误的数据? r 是一个切片,即指向数组、len 和 cap 的指针。你将r 传递给一个goroutine,goroutine 会得到一个ptr,len,cap 的副本。然后将r 分配给具有不同指针len、cap 的另一个切片。只要底层数组不同,就不需要复制。根据您之前的评论,底层数组不同,因此您不需要复制。 ~哦,明白了。因此,每次 r 都会获得一个指向一个新数组的指针,该数组具有指向底层数组的 diff 地址。但是在 go 例程中 r 的本地副本(命名消息)将有一个指向具有前一个数组的先前地址的数组的指针(已复制)。 . 因此,即使 r 更改本地副本仍然包含差异地址,因为它是一个副本.. 认为得到它

以上是关于如何使用通道在 go 例程之间传递字节片的主要内容,如果未能解决你的问题,请参考以下文章

如何正确使用通道来控制并发?

如何检查 go 通道是不是真的在等待数据?

go-通道

如何在处理结果时正确关闭 Goroutines 中的共享通道

需要同步Go例程

GO 使用由 for 循环创建的通道