如何使用频道广播消息
Posted
技术标签:
【中文标题】如何使用频道广播消息【英文标题】:How to broadcast message using channel 【发布时间】:2016-07-24 20:28:43 【问题描述】:我是新手,我正在尝试创建一个简单的聊天服务器,客户端可以在其中向所有连接的客户端广播消息。
在我的服务器中,我有一个接受连接的 goroutine(无限循环),并且所有连接都由通道接收。
go func()
for
conn, _ := listener.Accept()
ch <- conn
()
然后,我为每个连接的客户端启动一个处理程序(goroutine)。在处理程序内部,我尝试通过迭代通道来广播到所有连接。
for c := range ch
conn.Write(msg)
但是,我无法广播,因为(我认为通过阅读文档)频道需要在迭代之前关闭。我不确定何时应该关闭频道,因为我想不断接受新的连接,而关闭频道不会让我这样做。如果有人可以帮助我,或者提供一种更好的方式向所有连接的客户端广播消息,将不胜感激。
【问题讨论】:
为什么你认为“广播”需要一个封闭的渠道?for c:= range ch
循环在 ch 关闭之前不会终止,仅此而已。
【参考方案1】:
你正在做的是一个扇出模式,也就是说,多个端点正在监听一个输入源。这种模式的结果是,只要输入源中有消息,这些侦听器中只有一个能够获取消息。唯一的例外是频道的close
。此close
将被所有听众识别,因此是“广播”。
但是你想做的是广播一条从连接中读取的消息,所以我们可以这样做:
当听众人数已知时
让每个worker监听专用的广播频道,并将消息从主频道分发到每个专用的广播频道。
type worker struct
source chan interface
quit chan struct
func (w *worker) Start()
w.source = make(chan interface, 10) // some buffer size to avoid blocking
go func()
for
select
case msg := <-w.source
// do something with msg
case <-quit: // will explain this in the last section
return
()
然后我们可以有一堆工人:
workers := []*worker&worker, &worker
for _, worker := range workers worker.Start()
然后启动我们的监听器:
go func()
for
conn, _ := listener.Accept()
ch <- conn
()
还有一个调度员:
go func()
for
msg := <- ch
for _, worker := workers
worker.source <- msg
()
当听众人数未知时
在这种情况下,上面给出的解决方案仍然有效。唯一不同的是,当你需要一个新的worker时,你需要创建一个新的worker,启动它,然后将它推入workers
slice。但是这种方法需要一个线程安全的切片,它需要一个锁。其中一种实现可能如下所示:
type threadSafeSlice struct
sync.Mutex
workers []*worker
func (slice *threadSafeSlice) Push(w *worker)
slice.Lock()
defer slice.Unlock()
workers = append(workers, w)
func (slice *threadSafeSlice) Iter(routine func(*worker))
slice.Lock()
defer slice.Unlock()
for _, worker := range workers
routine(worker)
无论何时你想启动一个工人:
w := &worker
w.Start()
threadSafeSlice.Push(w)
您的调度员将更改为:
go func()
for
msg := <- ch
threadSafeSlice.Iter(func(w *worker) w.source <- msg )
()
遗言:永远不要离开一个悬空的 goroutine
其中一个好的做法是:永远不要离开悬空的 goroutine。所以当你听完之后,你需要关闭所有你触发的 goroutine。这将通过worker
中的quit
频道完成:
首先我们需要创建一个全局的quit
信令通道:
globalQuit := make(chan struct)
每当我们创建一个worker时,我们都会将globalQuit
通道分配给它作为它的退出信号:
worker.quit = globalQuit
然后,当我们想要关闭所有工作人员时,我们只需这样做:
close(globalQuit)
由于close
会被所有监听的goroutine识别(这是你理解的一点),所有的goroutine都会被返回。记得关闭你的调度程序例程,但我会把它留给你:)
【讨论】:
谢谢,但是如果您事先不知道端点(工作人员)的数量怎么办?您将如何遍历每个 goroutine 中的工作人员? 最简单的方法是启动一个worker
,然后将其推入workers
slice,然后在调度器中的下一次for循环运行时广播。切记在进行切片推送时使用锁 :)
我从“当不知道听众数量时”示例中提出了一个要点,因为代码不完整并且有错误。这是一个简单的服务器,它广播从一个 goroutine 生成的整数,然后使用广播向每个连接发送数据:gist.github.com/speps/ce645a5ca2d2cb9a81e52c7311f38677
这段代码很活泼。想想当 select 语句中的每个 case 都准备好时会发生什么。
为什么我们需要频道ch
?为什么调度器必须在一个新的 goroutine 上运行?我们可以重写调度程序以接受msg
作为参数,通过每个w.source
通道发送。【参考方案2】:
由于 Go 通道遵循通信顺序过程 (CSP) 模式,因此通道是点对点通信实体。每次交流总是有一位作者和一位读者参与。
然而,每个频道 end 可以在多个 goroutine 之间共享。这样做是安全的 - 没有危险的竞争条件。
因此可以有多个作家共享写作端。和/或可以有多个阅读器共享阅读端。我在different answer 中写了更多关于此的内容,其中包括示例。
如果你真的需要广播,你不能直接这样做,但不难实现一个中间 goroutine,将一个值复制到一组输出通道中的每一个。
【讨论】:
【参考方案3】:广播到频道片段并使用 sync.Mutex 管理频道添加和删除可能是您情况下最简单的方法。
您可以在 golang 中对broadcast
执行以下操作:
【讨论】:
【参考方案4】:更优雅的解决方案是“代理”,客户端可以订阅和取消订阅消息。
为了优雅地处理订阅和取消订阅,我们可以利用通道,因此接收和分发消息的代理的主循环可以使用单个 select
语句合并所有这些,并且从解决方案的自然。
另一个技巧是将订阅者存储在映射中,映射来自我们用来向他们分发消息的通道。所以在地图中使用频道作为键,然后添加和删除客户端是“死”的简单。这之所以成为可能,是因为通道值为comparable,并且它们的比较非常有效,因为通道值是指向通道描述符的简单指针。
事不宜迟,下面是一个简单的代理实现:
type Broker struct
stopCh chan struct
publishCh chan interface
subCh chan chan interface
unsubCh chan chan interface
func NewBroker() *Broker
return &Broker
stopCh: make(chan struct),
publishCh: make(chan interface, 1),
subCh: make(chan chan interface, 1),
unsubCh: make(chan chan interface, 1),
func (b *Broker) Start()
subs := map[chan interface]struct
for
select
case <-b.stopCh:
return
case msgCh := <-b.subCh:
subs[msgCh] = struct
case msgCh := <-b.unsubCh:
delete(subs, msgCh)
case msg := <-b.publishCh:
for msgCh := range subs
// msgCh is buffered, use non-blocking send to protect the broker:
select
case msgCh <- msg:
default:
func (b *Broker) Stop()
close(b.stopCh)
func (b *Broker) Subscribe() chan interface
msgCh := make(chan interface, 5)
b.subCh <- msgCh
return msgCh
func (b *Broker) Unsubscribe(msgCh chan interface)
b.unsubCh <- msgCh
func (b *Broker) Publish(msg interface)
b.publishCh <- msg
使用示例:
func main()
// Create and start a broker:
b := NewBroker()
go b.Start()
// Create and subscribe 3 clients:
clientFunc := func(id int)
msgCh := b.Subscribe()
for
fmt.Printf("Client %d got message: %v\n", id, <-msgCh)
for i := 0; i < 3; i++
go clientFunc(i)
// Start publishing messages:
go func()
for msgId := 0; ; msgId++
b.Publish(fmt.Sprintf("msg#%d", msgId))
time.Sleep(300 * time.Millisecond)
()
time.Sleep(time.Second)
上面的输出将是(在Go Playground上试试):
Client 2 got message: msg#0
Client 0 got message: msg#0
Client 1 got message: msg#0
Client 2 got message: msg#1
Client 0 got message: msg#1
Client 1 got message: msg#1
Client 1 got message: msg#2
Client 2 got message: msg#2
Client 0 got message: msg#2
Client 2 got message: msg#3
Client 0 got message: msg#3
Client 1 got message: msg#3
改进
您可以考虑以下改进。这些可能有用也可能没用,具体取决于您使用代理的方式/用途。
Broker.Unsubscribe()
可能会关闭消息通道,表示不会再在其上发送消息:
func (b *Broker) Unsubscribe(msgCh chan interface)
b.unsubCh <- msgCh
close(msgCh)
这将允许客户端通过消息通道range
,如下所示:
msgCh := b.Subscribe()
for msg := range msgCh
fmt.Printf("Client %d got message: %v\n", id, msg)
如果有人像这样取消订阅msgCh
:
b.Unsubscribe(msgCh)
上述范围循环将在处理完调用Unsubscribe()
之前发送的所有消息后终止。
如果您希望您的客户端依赖于正在关闭的消息通道,并且代理的生命周期比您的应用程序的生命周期更窄,那么您也可以在代理停止时关闭所有订阅的客户端,在 Start()
这样的方法中:
case <-b.stopCh:
for msgCh := range subs
close(msgCh)
return
【讨论】:
我认为 map[chan]strinct 的使用在这里非常有趣。我以前从未见过这样使用的地图。它使获取和删除变得更加容易。【参考方案5】:这是一个较晚的答案,但我认为它可能会安抚一些好奇的读者。
Go 通道在并发方面受到广泛欢迎。
Go 社区严格遵循这句话:
不要通过共享内存进行通信;相反,通过通信共享内存。
我对此完全中立,我认为在广播方面应该考虑其他选项,而不是明确定义的channels
。
这是我的看法:来自同步包的 Cond 是 widely overlooked。 Bronze man 建议在相同的上下文中实施 braodcaster 值得注意。
我很高兴女巫 icza 建议使用频道并通过它们广播消息。我遵循相同的方法并使用同步的条件变量:
// Broadcaster is the struct which encompasses broadcasting
type Broadcaster struct
cond *sync.Cond
subscribers map[interface]func(interface)
message interface
running bool
这是我们整个广播概念所依赖的主要结构。
下面,我为这个结构定义了一些行为。简而言之,订阅者应该可以添加、删除,并且整个过程应该是可撤销的。
// SetupBroadcaster gives the broadcaster object to be used further in messaging
func SetupBroadcaster() *Broadcaster
return &Broadcaster
cond: sync.NewCond(&sync.RWMutex),
subscribers: map[interface]func(interface),
// Subscribe let others enroll in broadcast event!
func (b *Broadcaster) Subscribe(id interface, f func(input interface))
b.subscribers[id] = f
// Unsubscribe stop receiving broadcasting
func (b *Broadcaster) Unsubscribe(id interface)
b.cond.L.Lock()
delete(b.subscribers, id)
b.cond.L.Unlock()
// Publish publishes the message
func (b *Broadcaster) Publish(message interface)
go func()
b.cond.L.Lock()
b.message = message
b.cond.Broadcast()
b.cond.L.Unlock()
()
// Start the main broadcasting event
func (b *Broadcaster) Start()
b.running = true
for b.running
b.cond.L.Lock()
b.cond.Wait()
go func()
for _, f := range b.subscribers
f(b.message) // publishes the message
()
b.cond.L.Unlock()
// Stop broadcasting event
func (b *Broadcaster) Stop()
b.running = false
接下来,我可以很轻松地使用它了:
messageToaster := func(message interface)
fmt.Printf("[New Message]: %v\n", message)
unwillingReceiver := func(message interface)
fmt.Println("Do not disturb!")
broadcaster := SetupBroadcaster()
broadcaster.Subscribe(1, messageToaster)
broadcaster.Subscribe(2, messageToaster)
broadcaster.Subscribe(3, unwillingReceiver)
go broadcaster.Start()
broadcaster.Publish("Hello!")
time.Sleep(time.Second)
broadcaster.Unsubscribe(3)
broadcaster.Publish("Goodbye!")
它应该以任何顺序打印这样的东西:
[New Message]: Hello!
Do not disturb!
[New Message]: Hello!
[New Message]: Goodbye!
[New Message]: Goodbye!
在go playground看到这个
【讨论】:
【参考方案6】:另一个简单的例子: https://play.golang.org
type Broadcaster struct
mu sync.Mutex
clients map[int64]chan struct
func NewBroadcaster() *Broadcaster
return &Broadcaster
clients: make(map[int64]chan struct),
func (b *Broadcaster) Subscribe(id int64) (<-chan struct, error)
defer b.mu.Unlock()
b.mu.Lock()
s := make(chan struct, 1)
if _, ok := b.clients[id]; ok
return nil, fmt.Errorf("signal %d already exist", id)
b.clients[id] = s
return b.clients[id], nil
func (b *Broadcaster) Unsubscribe(id int64)
defer b.mu.Unlock()
b.mu.Lock()
if _, ok := b.clients[id]; ok
close(b.clients[id])
delete(b.clients, id)
func (b *Broadcaster) broadcast()
defer b.mu.Unlock()
b.mu.Lock()
for k := range b.clients
if len(b.clients[k]) == 0
b.clients[k] <- struct
type testClient struct
name string
signal <-chan struct
signalID int64
brd *Broadcaster
func (c *testClient) doWork()
i := 0
for range c.signal
fmt.Println(c.name, "do work", i)
if i > 2
c.brd.Unsubscribe(c.signalID)
fmt.Println(c.name, "unsubscribed")
i++
fmt.Println(c.name, "done")
func main()
var err error
brd := NewBroadcaster()
clients := make([]*testClient, 0)
for i := 0; i < 3; i++
c := &testClient
name: fmt.Sprint("client:", i),
signalID: time.Now().UnixNano()+int64(i), // +int64(i) for play.golang.org
brd: brd,
c.signal, err = brd.Subscribe(c.signalID)
if err != nil
log.Fatal(err)
clients = append(clients, c)
for i := 0; i < len(clients); i++
go clients[i].doWork()
for i := 0; i < 6; i++
brd.broadcast()
time.Sleep(time.Second)
输出:
client:0 do work 0
client:2 do work 0
client:1 do work 0
client:2 do work 1
client:0 do work 1
client:1 do work 1
client:2 do work 2
client:0 do work 2
client:1 do work 2
client:2 do work 3
client:2 unsubscribed
client:2 done
client:0 do work 3
client:0 unsubscribed
client:0 done
client:1 do work 3
client:1 unsubscribed
client:1 done
【讨论】:
以上是关于如何使用频道广播消息的主要内容,如果未能解决你的问题,请参考以下文章
如何从 Phoenix 控制器向 Channel 广播消息?
如何使用 Flask 和 Socket.io 加入和离开房间的简明示例?