golang 中的 pubsub 替代方案
Posted
技术标签:
【中文标题】golang 中的 pubsub 替代方案【英文标题】:pubsub alternative in golang 【发布时间】:2017-10-15 11:05:04 【问题描述】:我有一个使用 pubsub 在 javascript 中完成的简单任务,任务如下:
我有一个对象,比如说A
和另外两个对某些元素感兴趣的对象(在这种情况下是字符串),比如说Foo
对元素m, n
感兴趣,Bar
对元素n, o, p
感兴趣。兴趣可以相交。
A 对象具有添加/删除元素的方法,并且当该对象包含 Foo
感兴趣的 m, n
元素时,则该对象存储在 Foo
这是使用 pubsub 的 javascript 中的伪代码
var A = ;
var Foo =
interests: ['m', 'n'],
storedObj: ,
tempObj:
;
// Bar same as Foo with different interest ['n', 'o', 'p']
// somewhere in Foo and Bar constructor
// Foo and Bar subscribe too each interests element
// for each interests when add
subscribe('add'+interest, function(obj)
// store this obj in tempObj and increment until satisfy all
// interest
tempObj[obj]++;
// if this obj satisfy all interest then store it in array of obj
if(tempObj[obj] === len(interests))
storedObj[obj] = true;
);
// for each interests when remove
subscribe('remove'+interest, function(obj)
// remove from storedObj
delete storedObj[obj];
// decrement tempObj so it can be used for later if the interest
// is adding again
tempObj[obj]--;
);
// inside A prototype
prototype.add = function(interest)
publish('add'+interest, this);
return this;
prototype.remove = function(interest)
publish('remove'+interest, this);
return this;
// implementation
A.add('m')
.add('n')
.add('o')
// then A is stored inside Foo but not in Bar because A doesn't have
// `p`, but it still stored Bar.tempObj and have value 2 and waiting
// for `p` to be add
A.remove('m')
.add('p')
// then A is removed from Foo and stored in Bar
我想将此任务移植到 golang 中,但我不想使用 pubsub,我想要更惯用的 golang 方式。注意:我也已经在 golang 中使用过 pubsub。
你能告诉我如何在 golang 中做到这一点吗?我正在使用频道,但找不到解决方案。
【问题讨论】:
您可以使用工作队列。 nesv.github.io/golang/2014/02/25/worker-queues-in-go.html 【参考方案1】:只是给你一个想法,不一定是你的真实用例。
package main
import (
"fmt"
"time"
)
type Publisher struct
subscription map[string]chan string
func (p *Publisher)Subscribe(interest string) chan string
if p.subscription == nil
p.subscription = make(map[string]chan string)
p.subscription[interest] = make(chan string)
return p.subscription[interest]
func (p *Publisher) Add(val string)
if p.subscription[val] != nil
fmt.Println("Adding " + val)
p.subscription[val] <- "added " + val
func (p *Publisher) Remove(val string)
if p.subscription[val] != nil
p.subscription[val] <- "removed " + val
type Subscriber struct
subscriptions [] chan string
publisher *Publisher
func (s *Subscriber) RegisterInterest(interest string)
s.subscriptions = append(s.subscriptions, s.publisher.Subscribe(interest))
func (s *Subscriber) run(channel chan string)
for
fmt.Println("Waiting for message")
m := <- channel
fmt.Println("Got message : " + m)
func (s *Subscriber) Listen()
for _, elem := range s.subscriptions
go s.run(elem)
func main()
pub := Publisher
sub := &Subscriberpublisher: &pub
sub.RegisterInterest("m")
sub.RegisterInterest("n")
sub.Listen()
pub.Add("m")
pub.Add("n")
pub.Remove("m")
pub.Remove("n")
time.Sleep(time.Second * 10)
【讨论】:
以上是关于golang 中的 pubsub 替代方案的主要内容,如果未能解决你的问题,请参考以下文章
apache beam PubSub 在普通 pubsub 客户端库中读取 withIdAttribute 的替代方案