如何显式清空频道?

Posted

技术标签:

【中文标题】如何显式清空频道?【英文标题】:How can I explicitly empty a channel? 【发布时间】:2014-11-26 09:29:32 【问题描述】:

短版: 有没有办法在不重新创建或循环的情况下清空 go 通道?

原因: 我正在使用两个通道来发送和接收数据,并且我有一个额外的通道来表示需要重新连接。

现在,当传输已重置/重新连接时,我想“清空”额外的通道,以确保没有任何挥之不去的其他重置请求会导致事物再次重新连接。

【问题讨论】:

periodically flushing channel in golang的可能重复 您能否更具体地了解您的用例?特别是关于您要重新连接的内容以及原因。如果没有所有细节,我觉得需要解决的真正问题是将垃圾数据放入您的频道,而不是事后将其删除。 @Floegipoky:2 个通道是通过套接字发送和接收数据,其中数据在处理协议的代码的不同部分进行解析。如果协议发现错误,我希望能够向传输层发出需要重新连接的信号。这就是我需要额外频道的原因 @dewy broto:我认为如果数据流的每一步都比下一步高一点会很好。最低的是传输,然后是链中的下一个是实际的协议,更进一步的可能是对协议起作用的东西。这样您就可以在其他位不知道的情况下交换部分(例如,删除传输并发送已知字节序列进行测试) @Toad 你的同事会感谢你分离出不相关的逻辑。 Socket IO 不属于特定协议的消息解析。 【参考方案1】:

没有循环就无法清空通道。如果你没有任何并发​​接收者,那么你可以使用这个简单的循环:

for len(ch) > 0 
  <-ch

如果确实有并发接收者,则使用循环:

L:
for 
    select 
    case <-c:
    default:
       break L
    

【讨论】:

我认为这样做的风险是,如果其他人也在阅读该频道,它可能会阻塞。 (就我而言,没有额外的阅读器,但值得注意的是)【参考方案2】:

您所描述的内容本质上是不雅的,因为可能存在重新连接频道的合法请求。我建议不要试图耗尽频道,而是跟踪时间。

在您的重新连接频道上,发布时间。完成重新连接后,记下时间。在使用重新连接通道时,丢弃所有比上次重新连接更早的消息。

实现此目的的另一个更同步的解决方案是将重新连接通道设为布尔值。发布“true”以重新连接。重新连接完成后,发布“false”。然后消费频道,直到发现“假”为止。

【讨论】:

我喜欢时间戳方法。它仍在频道中循环,但在这种情况下,您是对的,至少我不会意外删除任何较新的请求【参考方案3】:

另一种方法是使用sync.Condatomic,类似于:

type Server struct 
    s     chan int
    r     chan int
    c     *sync.Cond
    state uint32


const (
    sNormal       = 0
    sQuitting     = 1
    sReconnecting = 2
)

func New() *Server 
    s := &Server
        s: make(chan int),
        r: make(chan int),
        c: sync.NewCond(&sync.Mutex),
    
    go s.sender()
    // go s.receiver()
    return s

func (s *Server) sender() 
    //
    for 
        select 
        case data := <-s.s:
        //do stuff with data
        default:
            s.c.L.Lock()
        L:
            for 
                switch atomic.LoadUint32(&s.state) 
                case sNormal:
                    break L
                case sReconnecting:
                case sQuitting:
                    s.c.L.Unlock()
                    return
                
                s.c.Wait()
            
            s.c.L.Unlock()
        
    


//repeat for receiver

func (s *Server) Reconnect() 
    var cannotReconnect bool
    atomic.StoreUint32(&s.state, sReconnecting)
    //keep trying to reconnect
    if cannotReconnect 
        atomic.StoreUint32(&s.state, sQuitting)
     else 
        atomic.StoreUint32(&s.state, sNormal)
    
    s.c.Broadcast()

playground

【讨论】:

@Toad 是的,但你真的没有那么多选择,在处理并发时你必须使用某种锁定/原子机制¸这是我能想到的唯一方法,而不添加建议像 Floegipoky 这样的额外频道。频道也使用自己的锁,因此您不会获得太多收益。【参考方案4】:

听起来你想要一个重置 goroutine 而不是重置通道。它将具有来自发送复位信号的一侧的输入,以及到接收器的输出。当这个 goroutine 收到重新连接的请求时,它会将其传递给接收者。然后它等待在第三个通道上接收来自接收器的确认,同时丢弃它收到的任何重新连接请求。所以总共 3 个通道,1 个输入,1 个输出,1 个 ack。

【讨论】:

以上是关于如何显式清空频道?的主要内容,如果未能解决你的问题,请参考以下文章

元素操作

如何在“流水线表函数”、视图和显式游标之间进行选择

如何将空值显式插入参数化查询?

如何找到没有显式访问修饰符的方法?

如何使用链表设置显式值构造函数?

如何在chocolatey上禁用显式代理?