Golang搭建即时通信系统IM-System

Posted 蔚蓝的蓝

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Golang搭建即时通信系统IM-System相关的知识,希望对你有一定的参考价值。

Golang搭建即时通信系统

1、基本功能

主要是包括用户上线,用户私聊,用户公聊,超时强踢,查询在线用户,修改用户名等基本socket通信功能。

2、简要介绍

2.1系统结构如下


主要包括两个部分:

  • Client:负责客户端命令解析,请求与服务器的连接,发送消息等
  • Server:监听,连接创建,主要业务逻辑的处理等。

2.2目录结构

IM-System
——client.go   # 客户端相关逻辑代码  
    client.exe  # 客户端编译的可执行文件
    main.go     # 服务端主程序入口
    server.go   # 服务器相关逻辑代码
    user.go     # 用户相关业务逻辑
    server.exe  # 服务器代码编译的可执行文件

2.3其他说明

内部对象简要说明:
Server内部创建和维护server和user对象,其中每次有客户端尝试与服务器建立连接,都会创建一个新的user对象。
server中除了一些基本的属性包含两个主要的属性,OnlineMap和message_channel,OnlineMap是一个对象的字典,保存当前与服务器建立连接之后创建的对象。channel是一个通信管道,主要将客户端发送到服务端的message发送给其他的所有用户的管道来进行广播。
user除了一些基本的属性外包含两个主要属性,Conn和message_channel,Conn为客户端与服务器建立的TCP链接,channel表示通信管道,每次channel管道中有message都会回显给客户端达到客户端与服务器通信的效果。

协程的使用:
系统会创建多个go程。
user对象内部会创建go程来阻塞监听user.message_channel,一旦有消息则会回显客户端。
server对象内部也会创建go程阻塞监听server.message_channel,一旦管道有消息则会发送给每个在线用户的管道。
server在启动之后会监听连接,一旦有新的连接,为了防止阻塞主go程,就会生成新的go程处理该连接而不影响其他连接的处理。
系统是基于读写分离模型的,因此分别使用不同的go程去接收客户端写入的字节流和往客户端写入,保证了用户在写入消息的时候同时能够接收到其他消息。

3、代码

3.1 server.go

点击查看代码
package main

import (
	"fmt"
	"io"
	"net"
	"sync"
	"time"
)

type Server struct {
	Ip   string
	Port int

	//用户在线列表
	OnlineMap map[string]*User
	//给全局变量map加同步锁
	mapLock sync.RWMutex

	// 消息广播的channel
	Message chan string
}

//创建一个普通的server接口(函数)
//开头大写表示对外开放
func NewServer(ip string, port int) *Server {
	server := &Server{
		Ip: ip,
		Port: port,
		OnlineMap: make(map[string]*User),
		Message: make(chan string),
	}
	return server
}

//监听Message广播消息channel的goroutine,一旦有消息就发送给全部在线的user
func (this *Server) ListenMessage()  {
	//服务器端仍旧需要时刻监听
	for {
		msg := <-this.Message
		this.mapLock.Lock()
		//将消息发给所有用户
		for _, u := range this.OnlineMap {
			u.C <- msg
		}
		this.mapLock.Unlock()
	}
}

//广播消息
func (this *Server) BroadCast(user *User, msg string)  {
	sendMessage := "[" + user.Addr + "]" + user.Name + ":" + msg
	this.Message <- sendMessage
}

func (this *Server) Handler(conn net.Conn)  {
	//当前连接的业务

	user := NewUser(conn, this)
        fmt.Printf("new connection %s:%s established...\\n", user.Addr, user.Name)

	//用户在线消息封装
	user.Online()

	//监听用户是否活跃的channel
	isLive := make(chan bool)

	//接受客户端发送的消息
	//每个客户端都有一个go程处理客户端读的业务
	go func() {
		// 4K大小
		buf := make([]byte, 4096)
		for  {
			n, err := conn.Read(buf)
			//fmt.Println("n=", n, ",buf=", string(buf[0:n]))
			//n为成功读取到的字节数
			if n == 0{
				user.Offline()
				return
			}
			if err != nil && err != io.EOF{
				fmt.Println("Conn Read err:", err)
				return
			}

			//正常获取消息
			//用户输入消息是以\\n结尾的,需要去掉最后一个字节
			msg := string(buf[:n-1])

			//fmt.Println("handler msg: ", msg)
			//接收到的消息进行广播
			user.DoMessage(msg)

			//用户的任意消息,代表用户是一个活跃的用户,激活管道
			isLive <- true
		}
	}()

	for {
		//当前handler阻塞监听管道的消息,一旦两个管道有一个有值,就会执行select
		select {
			//这里一旦当isLive为True,那么就会进入select,执行完case<-isLive之后
			//会接着更新After管道,但是因为还没到时间不会进入case<-After之后的语句
			case <- isLive:
				//	当前用户被激活,
				//	这里为了重置定时器,把case<-isLive 放到了上边

			//设置定时器,如果定时触发,则强踢,如果发消息,则重新激活定时器
			//After本身是一个channel,如果发生超时,那么该channel中就能读取到数据
			case <- time.After(time.Second * 60):
				//进入case表示超时,重置定时器
				//将当前的User强制关闭

				//发出下线消息
				user.SendMessage("you are forced offline")

				//销毁管道资源
				close(user.C)

				//关闭用户连接
				conn.Close()

				//可能是因为管道资源以及conn连接关闭之后
				//OnlineMap中值被回收之后,自动删除键值对???

				//退出handler
				return  // 或者runtime.Goexit()
		}
	}

}

//启动服务器的接口
func (this *Server) Start()  {
	//socket listen
        fmt.Println("server is starting...")
	listener, err := net.Listen("tcp", fmt.Sprintf("%s:%d", this.Ip, this.Port))
	if err != nil{
		fmt.Println("listener err:", err)
		return
	}
	//为了防止遗忘关闭连接,加上defer保证在接口结束之后close
	//close listen socket
	defer listener.Close()

	//启动监听Message的goroutine
	go this.ListenMessage()

	//listener一直监听连接
	for {
		//accept
		conn, err := listener.Accept()
		if err != nil{
			fmt.Println("listener accept err:", err)
			continue
		}
		//go程处理连接
		go this.Handler(conn)
	}
}

3.2 user.go

点击查看代码
package main

import (
	"net"
	"strings"
)

type User struct {
	Name  string
	Addr  string
	C     chan string  // channel数据为string
	conn  net.Conn

	server *Server  // 当前用户属于的服务器
}

//创建用户的接口
func NewUser(conn net.Conn, server *Server) *User {
	userAddr := conn.RemoteAddr().String()
	user := &User{
		Name: userAddr,
		Addr: userAddr,
		C:    make(chan string),
		conn: conn,
		server: server,
	}

	//每一个新用户都绑定一个go程监听当前用户的channel消息
	go user.ListenMessage()

	return user
}

//处理用户上线
func (this *User) Online()  {

	//用户上线,将用户加入onlinemap中
	this.server.mapLock.Lock()
	this.server.OnlineMap[this.Name] = this
	this.server.mapLock.Unlock()

	//广播用户上线的消息
	this.server.BroadCast(this, "user is online")
}

//处理用户下线
func (this *User) Offline()  {

	//用户下线,将用户从online map删除
	this.server.mapLock.Lock()
	delete(this.server.OnlineMap, this.Name)
	this.server.mapLock.Unlock()

	//广播用户下线
	this.server.BroadCast(this, "user offline")

}

//给当前user对应的客户端发送消息
func (this *User) SendMessage(msg string)  {
	this.conn.Write([]byte(msg))
}

//处理消息
func (this *User) DoMessage(msg string)  {
	if msg == "who"{
		//查询当前都有哪些用户在线
		this.server.mapLock.Lock()
		for _, u := range this.server.OnlineMap{
			onlineMsg := "[" + u.Addr + "]" + u.Name + ":" + "online...\\n"
			//只发送给当前发送查询指令的用户
			this.SendMessage(onlineMsg)
		}
		this.server.mapLock.Unlock()

	} else if len(msg) > 7 && msg[:7] == "rename|" {
		//新的用户名
		newName := strings.Split(msg, "|")[1]
		//判断新的用户名是否存在
		_, ok := this.server.OnlineMap[newName]
		if ok{
			//服务器中已经存在该用户名
			this.SendMessage(newName + "has existed...")
		} else {
			//修改OnlineMap
			this.server.mapLock.Lock()
			delete(this.server.OnlineMap, this.Name)
			this.server.OnlineMap[newName] = this
			this.server.mapLock.Unlock()

			this.Name = newName
			this.SendMessage("you have updated user name: " + this.Name + "\\n")
		}
	} else if len(msg) > 4 && msg[:3] == "to|" {
		// 消息格式 ”to|alex|hello“

		//1 获取用户名
		remoteName := strings.Split(msg, "|")[1]
		if remoteName == ""{
			//用户名无效
			this.SendMessage("invalid, user \\"to|alex|hello\\"")
			return
		}

		//2 查询对象
		remoteUser, ok := this.server.OnlineMap[remoteName]
		if !ok {
			//用户不存在
			this.SendMessage("username not exist")
			return
		}

		//3 获取通信消息
		content := strings.Split(msg, "|")[2]
		if content == ""{
			this.SendMessage("invalid message")
			return
		}

		//4 发送消息
		remoteUser.SendMessage(this.Name + " say: " + content)

	} else {
		this.server.BroadCast(this, msg)
	}
}

//监听当前User channel的方法,一旦有消息,就直接发送给对端客户端
func (this *User) ListenMessage()  {
	for  {
		msg := <-this.C
		//连接写回msg,转换为二进制
		//fmt.Println("user listen msg:", msg)
		this.conn.Write([]byte(msg + "\\n"))
	}
}

3.3 main.go

点击查看代码
package main


func main()  {

	server := NewServer("127.0.0.1", 8888)
	server.Start()
}

3.4 client.go

点击查看代码
package main

import (
	"flag"
	"fmt"
	"io"
	"net"
	"os"
)

type Client struct {
	ServerIp string
	ServerPort  int
	Name     string
	conn   net.Conn
	flag        int //判断当前client的模式
}

func newClient(serverIp string, serverPort int) *Client {
	client := &Client{
		ServerIp: serverIp,
		ServerPort: serverPort,
		flag: 999,  // 设置flay默认值,否则flag默认为int整型
	}
	//创建链接
	conn, err := net.Dial("tcp", fmt.Sprintf("%s:%d", serverIp, serverPort))
	if err != nil {
		fmt.Println("net.Dial error:", err)
		return nil
	}
	client.conn = conn
	//返回客户端
	return client
}

//client菜单栏的输出,并获取flag输入
func (client *Client) menu() bool {
	var flag int

	fmt.Println("input 1 into public chat")
	fmt.Println("input 2 into private chat")
	fmt.Println("input 3 into rename")
	fmt.Println("input 0 into exit")

	fmt.Scanln(&flag)

	if flag >= 0 && flag <=3{
		client.flag = flag
		return true
	} else {
		fmt.Println("invalid input integer")
		return false
	}
}

// 监听server回应的消息,直接显示到标准输出
func (client *Client) DealResponse()  {
	io.Copy(os.Stdout, client.conn) // 永久阻塞监听
	/*
	上面一句相当于如下for循环一直从conn中读取,然后输出到终端
	//for {
	//	buf := make([]byte, 4096)
	//	client.conn.Read(buf)
	//	fmt.Println(string(buf))
	//}
	 */

}

//查询在线用户
func (client *Client) QueryUsers() {
	sendMsg := "who\\n"  // 直接查询
	_, err := client.conn.Write([]byte(sendMsg))
	if err != nil{
		fmt.Println("conn write err: ", err)
		return
	}
}

//私聊模式
func (client *Client) PrivateChat()  {
	var remoteName string
	var chatMsg string

	client.QueryUsers()
	fmt.Println("please chat username, exit for stop")
	fmt.Scanln(&remoteName)
	for remoteName != "exit"{
		fmt.Println("please input private chat content, exit for stop")
		fmt.Scanln(&chatMsg)
		for chatMsg != "exit"{
			//发送给服务器
			if len(chatMsg) != 0{
				//消息不为空
				sendMsg := "to|" + remoteName + "|" + chatMsg + "\\n\\n"
				_, err := client.conn.Write([]byte(sendMsg))
				if err != nil{
					fmt.Println("conn write err:", err)
					break
				}
			}
			//写入下一条消息
			chatMsg = ""
			fmt.Println("please input private chat content, exit for stop")
			fmt.Scanln(&chatMsg)
		}
		//给一个用户发送消息之后,可能还会给其他用户发送
		client.QueryUsers()
		fmt.Println("please chat username, exit for stop")
		fmt.Scanln(&remoteName)
	}

}

func (client *Client) PublicChat() {
	//公聊模式
	var chatMsg string

	fmt.Println("please input public chat content, exit for stop")
	fmt.Scanln(&chatMsg)

	for chatMsg != "exit"{
		//发送给服务器
		if len(chatMsg) != 0{
			sendMsg := chatMsg + "\\n"
			_, err := client.conn.Write([]byte(sendMsg))
			if err != nil{
				fmt.Println("conn write err:", err)
				break
			}
		}

		//重新接受下一条消息
		chatMsg = ""
		fmt.Println("please input public chat content, exit for stop")
		fmt.Scanln(&chatMsg)
	}
}

func (client *Client) UpdateName() bool {
	fmt.Println("please input username")
	//接收输入的用户名
	fmt.Scanln(&client.Name)

	sendMsg := "rename|" + client.Name + "\\n"
	//按照格式写入连接
	_, err := client.conn.Write([]byte(sendMsg))
	if err != nil{
		fmt.Println("conn write error:", err)
		return false
	}
	return true
}

func (client *Client) Run() {
	for client.flag != 0{
		for client.menu() != true {
		}

		//根据不同的模式处理不同的业务
		switch  client.flag {
			case 1:
				//公聊模式
				fmt.Println("under public chat mode...")
				client.PublicChat()
				break
			case 2:
				//私聊模式
				fmt.Println("under private chat mode...")
				client.PrivateChat()
				break
			case 3:
				//	改名
				fmt.Println("under rename mode...")
				client.UpdateName()
				break
			case 0:
				//退出
				fmt.Println("ready to exit")
				break
		}
	}
}

//尝试从终端命令行解析IP和Port创建客户端
var serverIp string
var serverPort int

//文件的初始化函数
//命令的格式  ./client.exe -ip 127.0.0.1 -port 8888
func init() {
	//属于初始化工作,一般放在init中
	flag.StringVar(&serverIp, "ip", "127.0.0.1", "set server ip(default:127.0.0.1)")
	flag.IntVar(&serverPort, "port", 8888, "set server port(default:8888)")
}

func main()  {
	//通过命令行解析
	flag.Parse()
	client := newClient(serverIp, serverPort)
	//client := newClient("127.0.0.1", 8888)
	if client == nil{
		fmt.Println("------- connect server error------")
		return
	}

	fmt.Println("-------- connect server success ------")

	//按理说启动client.Run()方法之后,服务器返回相应的处理结果,
	//主go程会阻塞在Run方法,如果使用主go程中的Run方法接受返回消息,就会变成串行执行
	//无法同一时刻满足其他的业务,而run应该跟dealResponse应该是并行的
	//所以提供一个新的go程只处理server回应的信息
	go client.DealResponse()

	// 启动客户端业务,主go程阻塞在Run方法
	fmt.Println("ready to process transactions......")
	client.Run()

}

4、效果演示

4.1 源码编译

cd IM-System  # 进入目录

在当前文件目录下生成可执行文件

go build -o server.exe .\\server.go .\\user.go .\\main.go  # 编译服务端代码生成server.exe
go build -o client.exe .\\client.go  # 编译客户端代码生成可执行文件client.exe

4.2 启动

先启动server.exe,再启动client.exe,进入可执行文件存放目录

.\\server.exe 
.\\client.exe

或者直接打开
server:

client1:

client2:

4.3 超时强踢

当一个用户一直在线,无响应则会被超时重踢。这里主要使用一个定时器来记录时间,一个select来阻塞监听两个channel。
对应接口

func (this *Server) Handler(conn net.Conn)

4.4 公聊

输入1进入公聊模式,输入who查询在线用户

client发送公聊信息

client2接收到公聊消息

4.5 改名

输入3进入改名模式

4.6 私聊

输入2进入私聊模式,查看到在线用户的姓名

输入要私聊的用户和私聊的内容

对应用户收到私聊信息

5、其他

5.1 踩坑

原来用python用的多,刚接触go,有一些要踩的坑。

  • 跨平台:go属于编译型语言,linux下,go build -o server,但是在window下需要编译成server.exe文件
  • 包管理:刚开始建议直接把项目放在GOPATH下的src目录下,暂时避免go mode的配置
  • netcat与telnet:在没写client.go客户端的时候,使用cmd终端模拟客户端测试,telnet可以短暂测试服务器是否存在,建立链接,但是无法大量数据传输,想要模拟通信还要使用netcat进行网络的读写数据,但是windows下没有nc命令需要重新安装。
  • 乱码:默认的cmd下运行go程序可能会出现乱码,因为Go编码是UTF-8,而CMD默认的是GBK。可以在cmd窗口中使用chcp 65001改一下活动页,或者格式化直接使用英文更方便。

5.2 说明

本文仅作为项目学习笔记,项目为刘丹冰Aceld编写,b站Golang学习视频中的P37到P52节,讲的很好,很受用。

用Go快速搭建IM即时通讯系统

WebSocket的目标是在一个单独的持久连接上提供全双工、双向通信。在Javascript创建了Web Socket之后,会有一个HTTP请求发送到浏览器以发起连接。在取得服务器响应后,建立的连接会将HTTP升级从HTTP协议交换为WebSocket协议。

由于WebSocket使用自定义的协议,所以URL模式也略有不同。未加密的连接不再是http://,而是ws://;加密的连接也不是https://,而是wss://。在使用WebSocket URL时,必须带着这个模式,因为将来还有可能支持其他的模式。

使用自定义协议而非HTTP协议的好处是,能够在客户端和服务器之间发送非常少量的数据,而不必担心HTTP那样字节级的开销。由于传递的数据包很小,所以WebSocket非常适合移动应用。

接下来的篇幅会对Web Sockets的细节实现进行深入的探索,本文接下来的四个小节不会涉及到大量的代码片段,但是会对相关的API和技术原理进行分析,相信大家读完下文之后再来看这段描述,会有一种豁然开朗的感觉。

“握手通道”是HTTP协议中客户端和服务端通过"TCP三次握手"建立的通信通道。客户端和服务端使用HTTP协议进行的每次交互都需要先建立这样一条“通道”,然后通过这条通道进行通信。我们熟悉的ajax交互就是在这样一个通道上完成数据传输的,只不过ajax交互是短连接,在一次 request->response 之后,“通道”连接就断开了。

在Javascript创建了WebSocket之后,会有一个HTTP请求发送到浏览器以发起连接,然后服务端响应,这就是“握手“的过程。

在这个握手的过程当中,客户端和服务端主要做了两件事情:

1)建立了一条连接“握手通道”用于通信(这点和HTTP协议相同,不同的是HTTP协议完成数据交互后就释放了这条握手通道,这就是所谓的“短连接”,它的生命周期是一次数据交互的时间,通常是毫秒级别的);2)将HTTP协议升级到WebSocket协议,并复用HTTP协议的握手通道,从而建立一条持久连接。

说到这里可能有人会问:HTTP协议为什么不复用自己的“握手通道”,而非要在每次进行数据交互的时候都通过TCP三次握手重新建立“握手通道”呢?

答案是这样的:虽然“长连接”在客户端和服务端交互的过程中省去了每次都建立“握手通道”的麻烦步骤,但是维持这样一条“长连接”是需要消耗服务器资源的,而在大多数情况下,这种资源的消耗又是不必要的,可以说HTTP标准的制定经过了深思熟虑的考量。到我们后边说到WebSocket协议数据帧时,大家可能就会明白,维持一条“长连接”服务端和客户端需要做的事情太多了。

说完了握手通道,我们再来看HTTP协议如何升级到WebSocket协议的。即时通讯聊天软件app开发可以加小蓝豆的v:weikeyun24咨询即可

升级协议需要客户端和服务端交流,服务端怎么知道要将HTTP协议升级到WebSocket协议呢?它一定是接收到了客户端发送过来的某种信号。下面是我从谷歌浏览器中截取的“客户端发起协议升级请求的报文”,通过分析这段报文,我们能够得到有关WebSocket中协议升级的更多细节。

首先,客户端发起协议升级请求。采用的是标准的HTTP报文格式,且只支持GET方法。

下面是重点请求的首部的意义:

1)Connection:Upgrade:表示要升级的协议2)Upgrade: websocket:表示要升级到websocket协议3)Sec-WebSocket-Version: 13:表示websocket的版本4)Sec-WebSocket-Key:UdTUf90CC561cQXn4n5XRg== :与Response Header中的响应首部Sec-WebSocket-Accept: GZk41FJZSYY0CmsrZPGpUGRQzkY=是配套的,提供基本的防护,比如恶意的连接或者无意的连接。

其中Connection就是我们前边提到的,客户端发送给服务端的信号,服务端接受到信号之后,才会对HTTP协议进行升级。

那么服务端怎样确认客户端发送过来的请求是否是合法的呢?

在客户端每次发起协议升级请求的时候都会产生一个唯一码:Sec-WebSocket-Key。服务端拿到这个码后,通过一个算法进行校验,然后通过Sec-WebSocket-Accept响应给客户端,客户端再对Sec-WebSocket-Accept进行校验来完成验证。

这个算法很简单:

1)将Sec-WebSocket-Key跟全局唯一的(GUID)标识:258EAFA5-E914-47DA-95CA-C5AB0DC85B11拼接;2)通过SHA1计算出摘要,并转成base64字符串。

258EAFA5-E914-47DA-95CA-C5AB0DC85B11 这个字符串又叫“魔串",至于为什么要使用它作为Websocket握手计算中使用的字符串,这点我们无需关心,只需要知道它是RFC标准规定就可以了,官方的解析也只是简单的说此值不大可能被不明白WebSocket协议的网络终端使用。

服务端响应客户端的头部信息和HTTP协议的格式是相同的,HTTP1.1协议是以换行符(\\r\\n)分割的,我们可以通过正则匹配解析出Sec-WebSocket-Accept的值,这和我们使用curl工具模拟get请求是一个道理。这样展示结果似乎不太直观,我们使用命令行CLI来根据上图中的Sec-WebSocket-Key和握手算法来计算一下服务端返回的Sec-WebSocket-Accept是否正确。

以上是关于Golang搭建即时通信系统IM-System的主要内容,如果未能解决你的问题,请参考以下文章

Socket搭建即时通讯服务器

毕设作品基于TCP协议的简单即时通信软件的设计与实现(源代码+论文)免费下载

php即时通讯是怎么搭建的?有没有知道的?

论文参考ASP.NET基于TCP协议的简单即时通信软件的设计与实现(源代码+论文)免费下载

新知实验室 - TRTC 实践音视频互动 Demo即时通信 IM 服务搭建

新知实验室 - TRTC 实践音视频互动 Demo即时通信 IM 服务搭建