golang:发布订阅系统

Posted IGuoSJ

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了golang:发布订阅系统相关的知识,希望对你有一定的参考价值。

发布订阅系统:

package pubsub

import (
	"sync"
	"time"
)

/*
 Author: Guo
 Date: 8/15/20 2:53 PM
 Description:
 Updated: 姓名@时间@版本 变更说明
*/

type (
	//订阅者
	Subscriber chan interface
	//主题
	TopicFunc func(v interface) bool
)

type Publisher struct 
	//消息缓存大小
	buffer int
	//消息发送超时时间
	timeout time.Duration
	//订阅者信息
	subscribers *sync.Map


//新建发布器
func NewPublisher(publishTimeout time.Duration, buffer int) *Publisher 
	return &Publisher
		buffer:      buffer,
		timeout:     publishTimeout,
		subscribers: &sync.Map,
	


//订阅某个主题
func (p *Publisher) SubscribeTopic(topic TopicFunc) Subscriber 
	ch := make(Subscriber, p.buffer)
	p.subscribers.Store(ch, topic)
	return ch


//订阅全部
func (p *Publisher) Subscribe() Subscriber 
	return p.SubscribeTopic(nil)


//退订
func (p *Publisher) ExitSub(ch Subscriber) 
	p.subscribers.Delete(ch)
	close(ch)


//关闭发送器
func (p *Publisher) Close() 
	p.subscribers.Range(func(key, value interface) bool 
		ch, ok := key.(Subscriber)
		if ok 
			close(ch)
			return true
		 else 
			return false
		
	)
	p.subscribers = nil


//按主题发送
func (p *Publisher) sendTopic(sub Subscriber, topic TopicFunc, v interface, wg *sync.WaitGroup) 
	defer wg.Done()
	if topic != nil && !topic(v) 
		return
	
	select 
	case sub <- v:
	case <-time.After(p.timeout):
	


//发布消息
func (p *Publisher) Publish(v interface) 
	var wg sync.WaitGroup
	p.subscribers.Range(func(key, value interface) bool 
		ch, ok1 := key.(Subscriber)
		topic, ok2 := value.(TopicFunc)
		if ok1 && ok2 
			wg.Add(1)
			go p.sendTopic(ch, topic, v, &wg)
			return true
		 else 
			return false
		
	)
	wg.Wait()



使用范例:

package main

import (
	"fmt"
	"guo/GoProProgram/my_test/pubsub"
	"strings"
	"time"
)
func pubSubTest() 
	var timer = time.NewTimer(time.Second * 10)
	var timerUnsubscribe = time.NewTimer(time.Second * 5)
	var stop = make(chan struct)
	timeout := time.Second * 5
	buffer := 64
	var ps = pubsub.NewPublisher(timeout, buffer)
	defer ps.Close()
	sub := ps.Subscribe()
	topic := ps.SubscribeTopic(func(v interface) bool 
		s, ok := v.(string)
		if ok 
			return strings.Contains(s, "golang")
		 else 
			return false
		
	)
	go func() 
		for ch := range sub 
			fmt.Println("Subscriber 1: ", ch)
		
	()

	go func() 
		for ch := range topic 
			fmt.Println("Subscriber 2: ", ch)
		
	()
	go func() 
		for 
			select 
			case <-time.After(time.Second * 2):
				ps.Publish("Guo")
				ps.Publish("golang")
			case <-timerUnsubscribe.C:
				ps.ExitSub(sub)
			case <-timer.C:
				stop <- struct
			
		
	()
	<-stop


func main() 
	pubSubTest()

结果输出:

//第一次
Subscriber 1:  Guo
Subscriber 1:  golang
Subscriber 2:  golang
//第二次
Subscriber 1:  Guo
Subscriber 1:  golang
Subscriber 2:  golang
//第三次,退订了第一个订阅者
Subscriber 2:  golang
//第四次
Subscriber 2:  golang
//定时器到时,主程序退出

改编自《Go语言高级编程》

以上是关于golang:发布订阅系统的主要内容,如果未能解决你的问题,请参考以下文章

golang使用rabbitmq发布/订阅

golang使用rabbitmq发布/订阅

Golang实现事件系统

Golang 连接Kafka

Google pubsub golang 订阅者在空闲几个小时后停止接收新发布的消息

Google pubsub 死字在 golang 中不起作用