golang:发布订阅系统
Posted IGuoSJ
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了golang:发布订阅系统相关的知识,希望对你有一定的参考价值。
发布订阅系统:
package pubsub
import (
"sync"
"time"
)
/*
Author: Guo
Date: 8/15/20 2:53 PM
Description:
Updated: 姓名@时间@版本 变更说明
*/
type (
//订阅者
Subscriber chan interface
//主题
TopicFunc func(v interface) bool
)
type Publisher struct
//消息缓存大小
buffer int
//消息发送超时时间
timeout time.Duration
//订阅者信息
subscribers *sync.Map
//新建发布器
func NewPublisher(publishTimeout time.Duration, buffer int) *Publisher
return &Publisher
buffer: buffer,
timeout: publishTimeout,
subscribers: &sync.Map,
//订阅某个主题
func (p *Publisher) SubscribeTopic(topic TopicFunc) Subscriber
ch := make(Subscriber, p.buffer)
p.subscribers.Store(ch, topic)
return ch
//订阅全部
func (p *Publisher) Subscribe() Subscriber
return p.SubscribeTopic(nil)
//退订
func (p *Publisher) ExitSub(ch Subscriber)
p.subscribers.Delete(ch)
close(ch)
//关闭发送器
func (p *Publisher) Close()
p.subscribers.Range(func(key, value interface) bool
ch, ok := key.(Subscriber)
if ok
close(ch)
return true
else
return false
)
p.subscribers = nil
//按主题发送
func (p *Publisher) sendTopic(sub Subscriber, topic TopicFunc, v interface, wg *sync.WaitGroup)
defer wg.Done()
if topic != nil && !topic(v)
return
select
case sub <- v:
case <-time.After(p.timeout):
//发布消息
func (p *Publisher) Publish(v interface)
var wg sync.WaitGroup
p.subscribers.Range(func(key, value interface) bool
ch, ok1 := key.(Subscriber)
topic, ok2 := value.(TopicFunc)
if ok1 && ok2
wg.Add(1)
go p.sendTopic(ch, topic, v, &wg)
return true
else
return false
)
wg.Wait()
使用范例:
package main
import (
"fmt"
"guo/GoProProgram/my_test/pubsub"
"strings"
"time"
)
func pubSubTest()
var timer = time.NewTimer(time.Second * 10)
var timerUnsubscribe = time.NewTimer(time.Second * 5)
var stop = make(chan struct)
timeout := time.Second * 5
buffer := 64
var ps = pubsub.NewPublisher(timeout, buffer)
defer ps.Close()
sub := ps.Subscribe()
topic := ps.SubscribeTopic(func(v interface) bool
s, ok := v.(string)
if ok
return strings.Contains(s, "golang")
else
return false
)
go func()
for ch := range sub
fmt.Println("Subscriber 1: ", ch)
()
go func()
for ch := range topic
fmt.Println("Subscriber 2: ", ch)
()
go func()
for
select
case <-time.After(time.Second * 2):
ps.Publish("Guo")
ps.Publish("golang")
case <-timerUnsubscribe.C:
ps.ExitSub(sub)
case <-timer.C:
stop <- struct
()
<-stop
func main()
pubSubTest()
结果输出:
//第一次
Subscriber 1: Guo
Subscriber 1: golang
Subscriber 2: golang
//第二次
Subscriber 1: Guo
Subscriber 1: golang
Subscriber 2: golang
//第三次,退订了第一个订阅者
Subscriber 2: golang
//第四次
Subscriber 2: golang
//定时器到时,主程序退出
改编自《Go语言高级编程》
以上是关于golang:发布订阅系统的主要内容,如果未能解决你的问题,请参考以下文章