如何使用通道对 golang 管道阶段中的项目进行批处理?

Posted

技术标签:

【中文标题】如何使用通道对 golang 管道阶段中的项目进行批处理?【英文标题】:How to batch items in golang pipeline stages using channels? 【发布时间】:2018-02-02 23:24:17 【问题描述】:

我正在在线阅读管道教程并尝试构建一个像这样运行的阶段--

    将传入事件以每批 10 个为一组,然后将它们发送到输出通道 如果我们在 5 秒内没有看到 10 个事件,则合并我们收到的所有事件并发送它们,关闭 out chan 并返回。

但是,我不知道第一个选择案例会是什么样子。尝试了多种方法,但无法通过这个。 任何指针都非常感谢!

func BatchEvents(inChan <- chan *Event) <- chan *Event 
    batchSize := 10
    comboEvent := Event
    go func() 
        defer close(out)
        i = 0
        for event := range inChan 
            select 
            case -WHAT GOES HERE?-:
                if i < batchSize 
                    comboEvent.data = append(comboEvent.data, event.data)
                    i++;
                 else 
                    out <- &comboEvent
                    // reset for next batch
                    comboEvent = Event
                    i=0;
                
            case <-time.After(5 * time.Second):
                // process whatever we have seen so far if the batch size isn't filled in 5 secs
                out <- &comboEvent
                // stop after
                return
            
        
    ()
    return out

【问题讨论】:

【参考方案1】:

您的第一个选择案例应该来自该通道,而不是在通道上做一个范围,整个事情都在一个无限循环中。

func BatchEvents(inChan <-chan *Event) <-chan *Event 
    batchSize := 10
    comboEvent := Event
    go func() 
        defer close(out)
        i = 0
        for 
            select 
            case event, ok := <-inChan:
                if !ok 
                    return
                
                comboEvent.data = append(comboEvent.data, event.data)
                i++
                if i == batchSize 
                    out <- &comboEvent
                    // reset for next batch
                    comboEvent = Event
                    i = 0
                
            case <-time.After(5 * time.Second):
                // process whatever we have seen so far if the batch size isn't filled in 5 secs
                if i > 0 
                    out <- &comboEvent
                
                // stop after
                return
            
        
    ()
    return out

【讨论】:

请注意selects NOT 按优先级排序。你必须人为地产生一组优先的selects。见***.com/questions/11117382/…

以上是关于如何使用通道对 golang 管道阶段中的项目进行批处理?的主要内容,如果未能解决你的问题,请参考以下文章

Golang入门到项目实战 golang并发变成之通道channel

管道中的通道关闭异常

golang管道

Golang并发(Go程管道)

如何在mongodb聚合框架中的管道阶段后加入文档

如何在 Golang 中正确处理缓冲通道?