如何使用通道在 go 例程之间传递字节片
Posted
技术标签:
【中文标题】如何使用通道在 go 例程之间传递字节片【英文标题】:How to pass byte slice between go routines using channels 【发布时间】:2020-04-24 07:09:39 【问题描述】:我有一个从source
读取数据并将它们发送到destination
的函数。源和目标可以是任何东西,假设这个示例源是数据库(任何mysql
、PostgreSQL
...),目标是distributed Q
(任何...ActiveMQ
、Kafka
)。消息以字节存储。
这是主要功能。想法是它将旋转一个新的 go 例程并等待返回消息以供将来处理。
type Message []byte
func (p *ProcessorService) Continue(dictId int)
level.Info(p.logger).Log("process", "message", "dictId", dictId)
retrieved := make(chan Message)
go func()
err := p.src.Read(retrieved, strconv.Itoa(p.dictId))
if err != nil
level.Error(p.logger).Log("process", "read", "message", "err", err)
()
for r := range retrieved
go func(message Message)
level.Info(p.logger).Log("message", message)
if len(message) > 0
if err := p.dst.sendToQ(message); err != nil
level.Error(p.logger).Log("failed", "during", "persist", "err", err)
else
level.Error(p.logger).Log("failed")
(r)
这是读取函数本身
func (s *Storage) Read(out chan<- Message, opt ...string) error
// I just skip some basic database read operations here
// but idea is simple, read data from the table / file row by row and
//
for _, value := range dataFromDB
message, err := value.row
if err == nil
out <- message
else
errorf("Unable to get data %v", err)
out <- make([]byte, 0)
)
close(out)
if err != nil
return err
return nil
如您所见,通信是通过 out chan
for r := range retrieved
go func(message Message)
// basically here message and r are pointing to the same underlying array
当接收到数据时var r
是一种切片字节。然后它传递给 go func(message Message)
在 go 中通过值传递的所有内容,在这种情况下,var r
将作为副本传递给匿名函数,但是它仍然有一个指向底层切片数据的指针。我很好奇在p.dst.sendToQ(message);
执行期间是否会出现问题,同时读取函数会向out channel
发送一些内容,从而导致切片数据结构被新信息覆盖。我是否应该在传递给匿名函数之前将字节片 r
复制到新的字节片中,所以底层数组会有所不同?我对其进行了测试,但并不能真正导致这种行为。不确定我是偏执狂还是不得不担心。
【问题讨论】:
【参考方案1】:当您从数据库中获取数据时,p.dst.sendToQ(message)
中的 message
与 value.row
相同。所以,只要每个value.row
有不同的底层数组,你就应该很好。所以,我建议你检查源代码,确保它没有使用公共字节数组并不断重写。
【讨论】:
我试了两个对象,value.row 总是有不同的地址 Slice ADDRESS value.row [0xc0002b4980] , [0xc0001f6700] 但 var r 总是相同的 ADDRESS r [0xc0001f6140] [0xc0001f6140],我用的是不安全的.指针(&r) 。这是有道理的。但这仍然意味着我必须复制 r?因为即使 value.row 返回不同的切片 var r 也会被覆盖并用于网络函数 Persistr
是栈上的一个变量,所以它的地址不会改变。每个新切片都会覆盖r
,然后将其作为副本传递给goroutine。下次r
获取新值时,goroutine 已经有了自己的副本。
嗯,我不确定我是否跟随,这是否意味着我必须复制?在我的理解中,goroutine 有它自己的 r 切片副本(消息),但具有相同的指向底层数组的指针。因此,如果 r 发生更改,它将更改 goroutine 中的副本,如果没有完成,可能会向 Persist 发送错误的数据?
r
是一个切片,即指向数组、len 和 cap 的指针。你将r
传递给一个goroutine,goroutine 会得到一个ptr,len,cap 的副本。然后将r
分配给具有不同指针len、cap 的另一个切片。只要底层数组不同,就不需要复制。根据您之前的评论,底层数组不同,因此您不需要复制。
~哦,明白了。因此,每次 r 都会获得一个指向一个新数组的指针,该数组具有指向底层数组的 diff 地址。但是在 go 例程中 r 的本地副本(命名消息)将有一个指向具有前一个数组的先前地址的数组的指针(已复制)。 . 因此,即使 r 更改本地副本仍然包含差异地址,因为它是一个副本.. 认为得到它以上是关于如何使用通道在 go 例程之间传递字节片的主要内容,如果未能解决你的问题,请参考以下文章