在golang中,如何编写一个为下一个阶段引入延迟的管道阶段?

Posted

技术标签:

【中文标题】在golang中,如何编写一个为下一个阶段引入延迟的管道阶段?【英文标题】:In golang, how to write a pipeline stage that introduces a delay for the next stage? 【发布时间】:2018-04-01 23:12:32 【问题描述】:

我正在关注https://blog.golang.org/pipelines 文章来实现几个阶段。

我需要一个阶段在事件传递到管道的下一个阶段之前引入几秒钟的延迟。

我对下面代码的担忧是,在传递事件之前,time.Sleep() 会产生无限数量的 go 例程。有没有更好的方法来做到这一点?

谢谢!

func fooStage(inChan <- chan *Bar) (<- chan *Bar) 
    out := make(chan *Bar, 10000)
    go func() 
        defer close(out)
        wg := sync.WaitGroup
        for 
            select 
            case event, ok := <-inChan:
                if !ok 
                    // inChan closed
                    break
                
                wg.Add(1)
                go func() 
                    time.Sleep(5 * time.Second)
                    out <- event
                    wg.Done()
                ()
            
        
        wg.Wait()
    ()
    return out

【问题讨论】:

【参考方案1】:

我已经用我的pipeline library 解决了这样的问题,就像这样:

    import "github.com/nazar256/parapipe"
    //...
    pipeline := parapipe.NewPipeline(10).
    Pipe(func(msg interface) interface 
        //some code
    ).
    Pipe(func(msg interface) interface 
        time.Sleep(3*time.Second)
        return msg
    ).
    Pipe(func(msg interface) interface 
        //some other code
    )

【讨论】:

【参考方案2】:

这是您应该用于管道应用程序的内容。上下文允许更快的拆卸。

负责管理您的in 频道必须在拆除期间关闭它。 始终关闭您的频道。

// Delay delays each `interface` coming in through `in` by `duration`.
// If the context is canceled, `in` will be flushed until it closes.
// Delay is very useful for throtteling back CPU usage in your pipelines.
func Delay(ctx context.Context, duration time.Duration, in <-chan interface) <-chan interface 
    out := make(chan interface)
    go func() 
        // Correct memory management
        defer close(out)

        // Keep reading from in until its closed
        for i := range in 
            // Take one element from in and pass it to out
            out <- i

            select 
            // Wait duration before reading from in again
            case <-time.After(duration):

            // Don't wait if the context is canceled
            case <-ctx.Done():
            
        
    ()
    return out

【讨论】:

【参考方案3】:

您可以手动修复 goroutine 的数量 - 仅从您需要的数量开始。

func sleepStage(in <-chan *Bar) (out <-chan *Bar) 
     out = make(<-chan *Bar)
     wg := sync.WaitGroup
     for i:=0; i < N; i++   // Number of goroutines in parallel
           wg.Add(1)
           go func()
                defer wg.Done()
                for e := range in 
                    time.Sleep(5*time.Seconds)
                    out <- e
                
            ()
      
      go func()
           wg.Wait()
           close(out)
       ()
       return out
  

【讨论】:

【参考方案4】:

你可以使用time.Ticker:

func fooStage(inChan <- chan *Bar) (<- chan *Bar) 
    //... some code
    ticker := time.NewTicker(5 * time.Second)
    <-ticker // the delay, probably need to call twice
    ticker.Stop()
    close(ticker.C)
    //... rest code

【讨论】:

你能解释一下它是如何与后续事件一起工作的吗?如果两个事件通过 inChan 进来,第二个事件不会等待 10 秒吗? &lt;-ticker 的返回频率不应超过指定的时间段。试试吧【参考方案5】:

您可以使用另一个通道来限制您的循环能够创建的活动 goroutine 的数量。

const numRoutines = 10

func fooStage(inChan <-chan *Bar) <-chan *Bar 
    out := make(chan *Bar, 10000)
    routines := make(chan struct, numRoutines)
    go func() 
        defer close(out)
        wg := sync.WaitGroup
        for 
            select 
            case event, ok := <-inChan:
                if !ok 
                    // inChan closed
                    break
                
                wg.Add(1)
                routines <- struct
                go func() 
                    time.Sleep(5 * time.Second)
                    out <- event
                    wg.Done()
                    <-routines
                ()
            
        
        wg.Wait()
    ()
    return out

【讨论】:

谢谢,这似乎是个好主意。我能看到的唯一缺点是如果routines频道被阻塞,事件会延迟5秒以上。我想在事件中没有时间戳的情况下没有解决这个问题的好方法。 @ultimoo 由于 5 秒的等待,您可以轻松运行数百或数千个 goroutine,这将减少实际的事件等待时间。但是,仅通过阅读代码很难确定这样的事情。需要进行测试和基准测试才能真正确定事情的实际运行方式。 当然,这更像是一个实验。我的直觉是,运行几千个这样的 goroutine 应该没问题——因为它们所做的只是运行 time.Sleep(),所以它们的大部分生命周期都不会被安排在处理器上。 您现在在代码中的唯一保证是事件将至少延迟 5 秒。如果您需要它们在特定时间范围内执行,您需要一种不同的方法 - 您还需要在生产系统上随时间对它进行基准测试,使其处于与您的应用程序一起运行的状态(或至少接近它的状态)。

以上是关于在golang中,如何编写一个为下一个阶段引入延迟的管道阶段?的主要内容,如果未能解决你的问题,请参考以下文章

编写了Java GUI的一个登录框,如何实现跳转,错误时出现提示框,登陆成功切换为下一界面

如何使用通道对 golang 管道阶段中的项目进行批处理?

golang 包结构

如何在golang中动态编写http.HandleFunc()?

『每周译Go』golang 垃圾回收器如何标记内存?

Spring Batch 为下一次插入获取自动生成的 ID