Golang搭建即时通信系统IM-System
Posted 蔚蓝的蓝
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Golang搭建即时通信系统IM-System相关的知识,希望对你有一定的参考价值。
Golang搭建即时通信系统
1、基本功能
主要是包括用户上线,用户私聊,用户公聊,超时强踢,查询在线用户,修改用户名等基本socket通信功能。
2、简要介绍
2.1系统结构如下
主要包括两个部分:
- Client:负责客户端命令解析,请求与服务器的连接,发送消息等
- Server:监听,连接创建,主要业务逻辑的处理等。
2.2目录结构
IM-System
——client.go # 客户端相关逻辑代码
client.exe # 客户端编译的可执行文件
main.go # 服务端主程序入口
server.go # 服务器相关逻辑代码
user.go # 用户相关业务逻辑
server.exe # 服务器代码编译的可执行文件
2.3其他说明
内部对象简要说明:
Server内部创建和维护server和user对象,其中每次有客户端尝试与服务器建立连接,都会创建一个新的user对象。
server中除了一些基本的属性包含两个主要的属性,OnlineMap和message_channel,OnlineMap是一个对象的字典,保存当前与服务器建立连接之后创建的对象。channel是一个通信管道,主要将客户端发送到服务端的message发送给其他的所有用户的管道来进行广播。
user除了一些基本的属性外包含两个主要属性,Conn和message_channel,Conn为客户端与服务器建立的TCP链接,channel表示通信管道,每次channel管道中有message都会回显给客户端达到客户端与服务器通信的效果。
协程的使用:
系统会创建多个go程。
user对象内部会创建go程来阻塞监听user.message_channel,一旦有消息则会回显客户端。
server对象内部也会创建go程阻塞监听server.message_channel,一旦管道有消息则会发送给每个在线用户的管道。
server在启动之后会监听连接,一旦有新的连接,为了防止阻塞主go程,就会生成新的go程处理该连接而不影响其他连接的处理。
系统是基于读写分离模型的,因此分别使用不同的go程去接收客户端写入的字节流和往客户端写入,保证了用户在写入消息的时候同时能够接收到其他消息。
3、代码
3.1 server.go
点击查看代码
package main
import (
"fmt"
"io"
"net"
"sync"
"time"
)
type Server struct {
Ip string
Port int
//用户在线列表
OnlineMap map[string]*User
//给全局变量map加同步锁
mapLock sync.RWMutex
// 消息广播的channel
Message chan string
}
//创建一个普通的server接口(函数)
//开头大写表示对外开放
func NewServer(ip string, port int) *Server {
server := &Server{
Ip: ip,
Port: port,
OnlineMap: make(map[string]*User),
Message: make(chan string),
}
return server
}
//监听Message广播消息channel的goroutine,一旦有消息就发送给全部在线的user
func (this *Server) ListenMessage() {
//服务器端仍旧需要时刻监听
for {
msg := <-this.Message
this.mapLock.Lock()
//将消息发给所有用户
for _, u := range this.OnlineMap {
u.C <- msg
}
this.mapLock.Unlock()
}
}
//广播消息
func (this *Server) BroadCast(user *User, msg string) {
sendMessage := "[" + user.Addr + "]" + user.Name + ":" + msg
this.Message <- sendMessage
}
func (this *Server) Handler(conn net.Conn) {
//当前连接的业务
user := NewUser(conn, this)
fmt.Printf("new connection %s:%s established...\\n", user.Addr, user.Name)
//用户在线消息封装
user.Online()
//监听用户是否活跃的channel
isLive := make(chan bool)
//接受客户端发送的消息
//每个客户端都有一个go程处理客户端读的业务
go func() {
// 4K大小
buf := make([]byte, 4096)
for {
n, err := conn.Read(buf)
//fmt.Println("n=", n, ",buf=", string(buf[0:n]))
//n为成功读取到的字节数
if n == 0{
user.Offline()
return
}
if err != nil && err != io.EOF{
fmt.Println("Conn Read err:", err)
return
}
//正常获取消息
//用户输入消息是以\\n结尾的,需要去掉最后一个字节
msg := string(buf[:n-1])
//fmt.Println("handler msg: ", msg)
//接收到的消息进行广播
user.DoMessage(msg)
//用户的任意消息,代表用户是一个活跃的用户,激活管道
isLive <- true
}
}()
for {
//当前handler阻塞监听管道的消息,一旦两个管道有一个有值,就会执行select
select {
//这里一旦当isLive为True,那么就会进入select,执行完case<-isLive之后
//会接着更新After管道,但是因为还没到时间不会进入case<-After之后的语句
case <- isLive:
// 当前用户被激活,
// 这里为了重置定时器,把case<-isLive 放到了上边
//设置定时器,如果定时触发,则强踢,如果发消息,则重新激活定时器
//After本身是一个channel,如果发生超时,那么该channel中就能读取到数据
case <- time.After(time.Second * 60):
//进入case表示超时,重置定时器
//将当前的User强制关闭
//发出下线消息
user.SendMessage("you are forced offline")
//销毁管道资源
close(user.C)
//关闭用户连接
conn.Close()
//可能是因为管道资源以及conn连接关闭之后
//OnlineMap中值被回收之后,自动删除键值对???
//退出handler
return // 或者runtime.Goexit()
}
}
}
//启动服务器的接口
func (this *Server) Start() {
//socket listen
fmt.Println("server is starting...")
listener, err := net.Listen("tcp", fmt.Sprintf("%s:%d", this.Ip, this.Port))
if err != nil{
fmt.Println("listener err:", err)
return
}
//为了防止遗忘关闭连接,加上defer保证在接口结束之后close
//close listen socket
defer listener.Close()
//启动监听Message的goroutine
go this.ListenMessage()
//listener一直监听连接
for {
//accept
conn, err := listener.Accept()
if err != nil{
fmt.Println("listener accept err:", err)
continue
}
//go程处理连接
go this.Handler(conn)
}
}
3.2 user.go
点击查看代码
package main
import (
"net"
"strings"
)
type User struct {
Name string
Addr string
C chan string // channel数据为string
conn net.Conn
server *Server // 当前用户属于的服务器
}
//创建用户的接口
func NewUser(conn net.Conn, server *Server) *User {
userAddr := conn.RemoteAddr().String()
user := &User{
Name: userAddr,
Addr: userAddr,
C: make(chan string),
conn: conn,
server: server,
}
//每一个新用户都绑定一个go程监听当前用户的channel消息
go user.ListenMessage()
return user
}
//处理用户上线
func (this *User) Online() {
//用户上线,将用户加入onlinemap中
this.server.mapLock.Lock()
this.server.OnlineMap[this.Name] = this
this.server.mapLock.Unlock()
//广播用户上线的消息
this.server.BroadCast(this, "user is online")
}
//处理用户下线
func (this *User) Offline() {
//用户下线,将用户从online map删除
this.server.mapLock.Lock()
delete(this.server.OnlineMap, this.Name)
this.server.mapLock.Unlock()
//广播用户下线
this.server.BroadCast(this, "user offline")
}
//给当前user对应的客户端发送消息
func (this *User) SendMessage(msg string) {
this.conn.Write([]byte(msg))
}
//处理消息
func (this *User) DoMessage(msg string) {
if msg == "who"{
//查询当前都有哪些用户在线
this.server.mapLock.Lock()
for _, u := range this.server.OnlineMap{
onlineMsg := "[" + u.Addr + "]" + u.Name + ":" + "online...\\n"
//只发送给当前发送查询指令的用户
this.SendMessage(onlineMsg)
}
this.server.mapLock.Unlock()
} else if len(msg) > 7 && msg[:7] == "rename|" {
//新的用户名
newName := strings.Split(msg, "|")[1]
//判断新的用户名是否存在
_, ok := this.server.OnlineMap[newName]
if ok{
//服务器中已经存在该用户名
this.SendMessage(newName + "has existed...")
} else {
//修改OnlineMap
this.server.mapLock.Lock()
delete(this.server.OnlineMap, this.Name)
this.server.OnlineMap[newName] = this
this.server.mapLock.Unlock()
this.Name = newName
this.SendMessage("you have updated user name: " + this.Name + "\\n")
}
} else if len(msg) > 4 && msg[:3] == "to|" {
// 消息格式 ”to|alex|hello“
//1 获取用户名
remoteName := strings.Split(msg, "|")[1]
if remoteName == ""{
//用户名无效
this.SendMessage("invalid, user \\"to|alex|hello\\"")
return
}
//2 查询对象
remoteUser, ok := this.server.OnlineMap[remoteName]
if !ok {
//用户不存在
this.SendMessage("username not exist")
return
}
//3 获取通信消息
content := strings.Split(msg, "|")[2]
if content == ""{
this.SendMessage("invalid message")
return
}
//4 发送消息
remoteUser.SendMessage(this.Name + " say: " + content)
} else {
this.server.BroadCast(this, msg)
}
}
//监听当前User channel的方法,一旦有消息,就直接发送给对端客户端
func (this *User) ListenMessage() {
for {
msg := <-this.C
//连接写回msg,转换为二进制
//fmt.Println("user listen msg:", msg)
this.conn.Write([]byte(msg + "\\n"))
}
}
3.3 main.go
点击查看代码
package main
func main() {
server := NewServer("127.0.0.1", 8888)
server.Start()
}
3.4 client.go
点击查看代码
package main
import (
"flag"
"fmt"
"io"
"net"
"os"
)
type Client struct {
ServerIp string
ServerPort int
Name string
conn net.Conn
flag int //判断当前client的模式
}
func newClient(serverIp string, serverPort int) *Client {
client := &Client{
ServerIp: serverIp,
ServerPort: serverPort,
flag: 999, // 设置flay默认值,否则flag默认为int整型
}
//创建链接
conn, err := net.Dial("tcp", fmt.Sprintf("%s:%d", serverIp, serverPort))
if err != nil {
fmt.Println("net.Dial error:", err)
return nil
}
client.conn = conn
//返回客户端
return client
}
//client菜单栏的输出,并获取flag输入
func (client *Client) menu() bool {
var flag int
fmt.Println("input 1 into public chat")
fmt.Println("input 2 into private chat")
fmt.Println("input 3 into rename")
fmt.Println("input 0 into exit")
fmt.Scanln(&flag)
if flag >= 0 && flag <=3{
client.flag = flag
return true
} else {
fmt.Println("invalid input integer")
return false
}
}
// 监听server回应的消息,直接显示到标准输出
func (client *Client) DealResponse() {
io.Copy(os.Stdout, client.conn) // 永久阻塞监听
/*
上面一句相当于如下for循环一直从conn中读取,然后输出到终端
//for {
// buf := make([]byte, 4096)
// client.conn.Read(buf)
// fmt.Println(string(buf))
//}
*/
}
//查询在线用户
func (client *Client) QueryUsers() {
sendMsg := "who\\n" // 直接查询
_, err := client.conn.Write([]byte(sendMsg))
if err != nil{
fmt.Println("conn write err: ", err)
return
}
}
//私聊模式
func (client *Client) PrivateChat() {
var remoteName string
var chatMsg string
client.QueryUsers()
fmt.Println("please chat username, exit for stop")
fmt.Scanln(&remoteName)
for remoteName != "exit"{
fmt.Println("please input private chat content, exit for stop")
fmt.Scanln(&chatMsg)
for chatMsg != "exit"{
//发送给服务器
if len(chatMsg) != 0{
//消息不为空
sendMsg := "to|" + remoteName + "|" + chatMsg + "\\n\\n"
_, err := client.conn.Write([]byte(sendMsg))
if err != nil{
fmt.Println("conn write err:", err)
break
}
}
//写入下一条消息
chatMsg = ""
fmt.Println("please input private chat content, exit for stop")
fmt.Scanln(&chatMsg)
}
//给一个用户发送消息之后,可能还会给其他用户发送
client.QueryUsers()
fmt.Println("please chat username, exit for stop")
fmt.Scanln(&remoteName)
}
}
func (client *Client) PublicChat() {
//公聊模式
var chatMsg string
fmt.Println("please input public chat content, exit for stop")
fmt.Scanln(&chatMsg)
for chatMsg != "exit"{
//发送给服务器
if len(chatMsg) != 0{
sendMsg := chatMsg + "\\n"
_, err := client.conn.Write([]byte(sendMsg))
if err != nil{
fmt.Println("conn write err:", err)
break
}
}
//重新接受下一条消息
chatMsg = ""
fmt.Println("please input public chat content, exit for stop")
fmt.Scanln(&chatMsg)
}
}
func (client *Client) UpdateName() bool {
fmt.Println("please input username")
//接收输入的用户名
fmt.Scanln(&client.Name)
sendMsg := "rename|" + client.Name + "\\n"
//按照格式写入连接
_, err := client.conn.Write([]byte(sendMsg))
if err != nil{
fmt.Println("conn write error:", err)
return false
}
return true
}
func (client *Client) Run() {
for client.flag != 0{
for client.menu() != true {
}
//根据不同的模式处理不同的业务
switch client.flag {
case 1:
//公聊模式
fmt.Println("under public chat mode...")
client.PublicChat()
break
case 2:
//私聊模式
fmt.Println("under private chat mode...")
client.PrivateChat()
break
case 3:
// 改名
fmt.Println("under rename mode...")
client.UpdateName()
break
case 0:
//退出
fmt.Println("ready to exit")
break
}
}
}
//尝试从终端命令行解析IP和Port创建客户端
var serverIp string
var serverPort int
//文件的初始化函数
//命令的格式 ./client.exe -ip 127.0.0.1 -port 8888
func init() {
//属于初始化工作,一般放在init中
flag.StringVar(&serverIp, "ip", "127.0.0.1", "set server ip(default:127.0.0.1)")
flag.IntVar(&serverPort, "port", 8888, "set server port(default:8888)")
}
func main() {
//通过命令行解析
flag.Parse()
client := newClient(serverIp, serverPort)
//client := newClient("127.0.0.1", 8888)
if client == nil{
fmt.Println("------- connect server error------")
return
}
fmt.Println("-------- connect server success ------")
//按理说启动client.Run()方法之后,服务器返回相应的处理结果,
//主go程会阻塞在Run方法,如果使用主go程中的Run方法接受返回消息,就会变成串行执行
//无法同一时刻满足其他的业务,而run应该跟dealResponse应该是并行的
//所以提供一个新的go程只处理server回应的信息
go client.DealResponse()
// 启动客户端业务,主go程阻塞在Run方法
fmt.Println("ready to process transactions......")
client.Run()
}
4、效果演示
4.1 源码编译
cd IM-System # 进入目录
在当前文件目录下生成可执行文件
go build -o server.exe .\\server.go .\\user.go .\\main.go # 编译服务端代码生成server.exe
go build -o client.exe .\\client.go # 编译客户端代码生成可执行文件client.exe
4.2 启动
先启动server.exe,再启动client.exe,进入可执行文件存放目录
.\\server.exe
.\\client.exe
或者直接打开
server:
client1:
client2:
4.3 超时强踢
当一个用户一直在线,无响应则会被超时重踢。这里主要使用一个定时器来记录时间,一个select来阻塞监听两个channel。
对应接口
func (this *Server) Handler(conn net.Conn)
4.4 公聊
输入1进入公聊模式,输入who查询在线用户
client发送公聊信息
client2接收到公聊消息
4.5 改名
输入3进入改名模式
4.6 私聊
输入2进入私聊模式,查看到在线用户的姓名
输入要私聊的用户和私聊的内容
对应用户收到私聊信息
5、其他
5.1 踩坑
原来用python用的多,刚接触go,有一些要踩的坑。
- 跨平台:go属于编译型语言,linux下,go build -o server,但是在window下需要编译成server.exe文件
- 包管理:刚开始建议直接把项目放在GOPATH下的src目录下,暂时避免go mode的配置
- netcat与telnet:在没写client.go客户端的时候,使用cmd终端模拟客户端测试,telnet可以短暂测试服务器是否存在,建立链接,但是无法大量数据传输,想要模拟通信还要使用netcat进行网络的读写数据,但是windows下没有nc命令需要重新安装。
- 乱码:默认的cmd下运行go程序可能会出现乱码,因为Go编码是UTF-8,而CMD默认的是GBK。可以在cmd窗口中使用
chcp 65001
改一下活动页,或者格式化直接使用英文更方便。
5.2 说明
本文仅作为项目学习笔记,项目为刘丹冰Aceld编写,b站Golang学习视频中的P37到P52节,讲的很好,很受用。
用Go快速搭建IM即时通讯系统
WebSocket的目标是在一个单独的持久连接上提供全双工、双向通信。在Javascript创建了Web Socket之后,会有一个HTTP请求发送到浏览器以发起连接。在取得服务器响应后,建立的连接会将HTTP升级从HTTP协议交换为WebSocket协议。
由于WebSocket使用自定义的协议,所以URL模式也略有不同。未加密的连接不再是http://,而是ws://;加密的连接也不是https://,而是wss://。在使用WebSocket URL时,必须带着这个模式,因为将来还有可能支持其他的模式。
使用自定义协议而非HTTP协议的好处是,能够在客户端和服务器之间发送非常少量的数据,而不必担心HTTP那样字节级的开销。由于传递的数据包很小,所以WebSocket非常适合移动应用。
接下来的篇幅会对Web Sockets的细节实现进行深入的探索,本文接下来的四个小节不会涉及到大量的代码片段,但是会对相关的API和技术原理进行分析,相信大家读完下文之后再来看这段描述,会有一种豁然开朗的感觉。
“握手通道”是HTTP协议中客户端和服务端通过"TCP三次握手"建立的通信通道。客户端和服务端使用HTTP协议进行的每次交互都需要先建立这样一条“通道”,然后通过这条通道进行通信。我们熟悉的ajax交互就是在这样一个通道上完成数据传输的,只不过ajax交互是短连接,在一次 request->response 之后,“通道”连接就断开了。
在Javascript创建了WebSocket之后,会有一个HTTP请求发送到浏览器以发起连接,然后服务端响应,这就是“握手“的过程。
在这个握手的过程当中,客户端和服务端主要做了两件事情:
1)建立了一条连接“握手通道”用于通信(这点和HTTP协议相同,不同的是HTTP协议完成数据交互后就释放了这条握手通道,这就是所谓的“短连接”,它的生命周期是一次数据交互的时间,通常是毫秒级别的);2)将HTTP协议升级到WebSocket协议,并复用HTTP协议的握手通道,从而建立一条持久连接。
说到这里可能有人会问:HTTP协议为什么不复用自己的“握手通道”,而非要在每次进行数据交互的时候都通过TCP三次握手重新建立“握手通道”呢?
答案是这样的:虽然“长连接”在客户端和服务端交互的过程中省去了每次都建立“握手通道”的麻烦步骤,但是维持这样一条“长连接”是需要消耗服务器资源的,而在大多数情况下,这种资源的消耗又是不必要的,可以说HTTP标准的制定经过了深思熟虑的考量。到我们后边说到WebSocket协议数据帧时,大家可能就会明白,维持一条“长连接”服务端和客户端需要做的事情太多了。
说完了握手通道,我们再来看HTTP协议如何升级到WebSocket协议的。即时通讯聊天软件app开发可以加小蓝豆的v:weikeyun24咨询即可
升级协议需要客户端和服务端交流,服务端怎么知道要将HTTP协议升级到WebSocket协议呢?它一定是接收到了客户端发送过来的某种信号。下面是我从谷歌浏览器中截取的“客户端发起协议升级请求的报文”,通过分析这段报文,我们能够得到有关WebSocket中协议升级的更多细节。
首先,客户端发起协议升级请求。采用的是标准的HTTP报文格式,且只支持GET方法。
下面是重点请求的首部的意义:
1)Connection:Upgrade:表示要升级的协议2)Upgrade: websocket:表示要升级到websocket协议3)Sec-WebSocket-Version: 13:表示websocket的版本4)Sec-WebSocket-Key:UdTUf90CC561cQXn4n5XRg== :与Response Header中的响应首部Sec-WebSocket-Accept: GZk41FJZSYY0CmsrZPGpUGRQzkY=是配套的,提供基本的防护,比如恶意的连接或者无意的连接。
其中Connection就是我们前边提到的,客户端发送给服务端的信号,服务端接受到信号之后,才会对HTTP协议进行升级。
那么服务端怎样确认客户端发送过来的请求是否是合法的呢?
在客户端每次发起协议升级请求的时候都会产生一个唯一码:Sec-WebSocket-Key。服务端拿到这个码后,通过一个算法进行校验,然后通过Sec-WebSocket-Accept响应给客户端,客户端再对Sec-WebSocket-Accept进行校验来完成验证。
这个算法很简单:
1)将Sec-WebSocket-Key跟全局唯一的(GUID)标识:258EAFA5-E914-47DA-95CA-C5AB0DC85B11拼接;2)通过SHA1计算出摘要,并转成base64字符串。
258EAFA5-E914-47DA-95CA-C5AB0DC85B11 这个字符串又叫“魔串",至于为什么要使用它作为Websocket握手计算中使用的字符串,这点我们无需关心,只需要知道它是RFC标准规定就可以了,官方的解析也只是简单的说此值不大可能被不明白WebSocket协议的网络终端使用。
服务端响应客户端的头部信息和HTTP协议的格式是相同的,HTTP1.1协议是以换行符(\\r\\n)分割的,我们可以通过正则匹配解析出Sec-WebSocket-Accept的值,这和我们使用curl工具模拟get请求是一个道理。这样展示结果似乎不太直观,我们使用命令行CLI来根据上图中的Sec-WebSocket-Key和握手算法来计算一下服务端返回的Sec-WebSocket-Accept是否正确。
以上是关于Golang搭建即时通信系统IM-System的主要内容,如果未能解决你的问题,请参考以下文章
毕设作品基于TCP协议的简单即时通信软件的设计与实现(源代码+论文)免费下载
论文参考ASP.NET基于TCP协议的简单即时通信软件的设计与实现(源代码+论文)免费下载