基于Go的websocket消息服务
Posted 健 の 随笔
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了基于Go的websocket消息服务相关的知识,希望对你有一定的参考价值。
3个月没写php了,这是我的第一个中小型go的websocket微服务。那么问题来了,github上那么多轮子,我为什么要自己造轮子呢?
Why 造轮子?
因为这样不仅能锻炼自己的技术能力,而且能帮助深入了解其中的实现原理。
直接上流程图:
其实其中有些难点并没有反映出来,比如历史消息数据的存储结构、病发时遇到的一些坑等。
历史消息的存储结构 :
即广播、组播可拆解成单播,那么代码就可以变得简单。
但是,但是,但是,有看到 "ref"? ref表示,用户的历史消息,是否是一个引用, 类似于c/cpp的指针、地址。想一想,如果广播给1w用户,那么是不是要把一个msg push到每一个用户呢?
答案至少有2:
其一:push msg给everyone,优点:读取数据时很方便, 缺点:数据大量冗余,且push一瞬间io量过大,效率低;
其二:push msg时,分别存储:广播表、组播表、单播表, 优点:分别查询性能高,无冗余 , 缺点:综合查询用户的所有历史消息时,性能差,而且redis的网络io次数较多,还有时间等排序的问题。
综合考虑,选用第1种方案。
问题又来了, 这个项目开发顺利不,遇到坑没?
废话,技术的活,哪有不带坑的!
坑1:panic中断既出 ,真tmd不是我想要的, 解决方式是:recovery ( : P
坑2:环境变量向内包的传递,试了几种办法不行,最后用一个包作代理,封装工厂和单例, 最好的解决了。
var instance *env func NewEnv()*env { env := env{} env.init() env.parameters = make(map[string]interface{}) return &env } func SingleEnv()*env{ if nil == instance { instance = NewEnv() } return instance } //...
坑3:websocket跨域问题,解决方法至少有2:可以修改默认设定
// 临时忽略websocket跨域 ws := websocket.Upgrader{ } if model.SingleConfig().Http.EnableCrossSite { ws.CheckOrigin = func(r *http.Request) bool { //mock and stub return true } }
或者是在nginx上加这些,相当于在同一个域,推荐用这:
nginx conf: upstream push { ip_hash; server 127.0.0.1:9999 ; keepalive 60; } map $http_upgrade $connection_upgrade { default upgrade; \'\' close; } server { listen 80; server_name dev.push.pub.sina.com.cn; location /push { proxy_http_version 1.1; proxy_redirect off; proxy_set_header Upgrade $http_upgrade; proxy_set_header Connection "upgrade"; proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; client_max_body_size 10m; client_body_buffer_size 128k; proxy_connect_timeout 300; proxy_send_timeout 300; proxy_read_timeout 300; proxy_pass http://push; fastcgi_keep_conn on; include fastcgi_params; } }
坑4:go map不内建支持并发安全,这是最大的问题。解决稍有点麻烦,需要用到RWMutex锁。 我参考beego写的:
package lib import "sync" type RWLocker struct { mtx sync.RWMutex } func NewRWLock()*RWLocker{ return &RWLocker{} } func (l *RWLocker)RLockDo(callback func()){ l.mtx.RLock() defer l.mtx.RUnlock() callback() } func (l *RWLocker)RWLockDo(callback func()){ l.mtx.Lock() defer l.mtx.Unlock() callback() } type Locker struct { mtx sync.Mutex } func NewLock()*Locker{ return &Locker{} } func (l *Locker)LockDo(callback func()){ l.mtx.Lock() defer l.mtx.Unlock() callback() } type MutexMap struct{ m map[interface{}]interface{} lock *sync.RWMutex } func NewMutexMap() *MutexMap { return &MutexMap{ lock: new(sync.RWMutex), m: make(map[interface{}]interface{}), } } func (m *MutexMap) Size() int{ return len(m.m) } func (m *MutexMap) Raw() map[interface{}]interface{} { return m.m } //Get from maps return the k\'s value func (m *MutexMap) Get(k interface{}) interface{} { m.lock.RLock() defer m.lock.RUnlock() if val, ok := m.m[k]; ok { return val } return nil } // Maps the given key and value. Returns false // if the key is already in the map and changes nothing. func (m *MutexMap) Set(k interface{}, v interface{}) bool { m.lock.Lock() defer m.lock.Unlock() if val, ok := m.m[k]; !ok { m.m[k] = v } else if val != v { m.m[k] = v } else { return false } return true } // Returns true if k is exist in the map. func (m *MutexMap) Check(k interface{}) bool { m.lock.RLock() defer m.lock.RUnlock() if _, ok := m.m[k]; !ok { return false } return true } func (m *MutexMap) Keys(ignoreNil bool, keys ...interface{}) []interface{}{ m.lock.RLock() defer m.lock.RUnlock() vals := []interface{}{} for _,k := range keys { if v,ok := m.m[k]; ok { vals = append(vals, v) }else{ if !ignoreNil { vals = append(vals, nil) } } } return vals } func (m *MutexMap) Delete(k interface{}) { m.lock.Lock() defer m.lock.Unlock() delete(m.m, k) }
基本的坑就是这些了,上线部署当然是jenkins+salt+rsync:
最后,谈下,维护性、调试性。
首先维护性:目前只遇到几次go会异常崩溃的情况,一般都是不细心或并发安全没做好,这个根据日志、race tool、strace/gdb可以搞定。
另外,调试性的话,介于php, cpp之间,和java类似,一般能检查出问题,并打出日志,包括数组下标越界等,另外 还有pprof/strace/gdb等神器能用上,还是不错的。
哈哈,今天就写这么多了, 要哄妹子了-----------我闺女。
:P
以上是关于基于Go的websocket消息服务的主要内容,如果未能解决你的问题,请参考以下文章
基于go-gin框架的web服务框架之websocket(二)