利用RabbitMQMySQL实现超大用户级别的消息在/离线收发
Posted 穷酸秀才大艹包
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了利用RabbitMQMySQL实现超大用户级别的消息在/离线收发相关的知识,希望对你有一定的参考价值。
由于RabbitMQ中只有队列(queue)才能存储信息,所以用RabbitMQ实现超大用户级别(百万计)的消息在/离线收发需要对每一个用户创建一个永久队列。
但是RabbitMQ节点内存有限,经测试后发现节点集群也无法满足数百万用户队列收发数据的要求,所以最终决定采用数据库辅助实现该功能。
一、数据库结构
user_list数据库下有4张表:user_info、group_info、groupmember_info、message_info。
user_info表中含有username(主键,非空,VARCHAR(50))、password(非空,VARCHAR(50))、routingkey(非空,VARCHAR(50))、has_queue(INT UNSIGNED)四个字段。
group_info表中含有groupname(主键,非空,VARCHAR(50))、password(非空,VARCHAR(50))、creator(非空,VARCHAR(50))三个字段。
groupmember_info表中含有username(主键,非空,VARCHAR(50))、groupname(主键,非空,VARCHAR(50))两个字段。
message_info表中含有sendtime(非空,VARCHAR(50))、body(非空,VARCHAR(300)),receiver(非空,VARCHAR(50))、sender(非空,VARCHAR(100))四个字段。
二、客户端结构
1、文件夹创建以及包依赖安装:
dotnet new console --name Client mv Client/Program.cs Client/Client.cs
cd Client dotnet add package RabbitMQ.Client
dotnet add package mysql.Data dotnet restore
2、项目结构
Client.cs(主程序):
using RabbitMQ.Client; using MySql.Data.MySqlClient; namespace Client { class Client { static void Main(string[] args) { // 连接数据库 string connStr = "Database=user_list;datasource=127.0.0.1;port=3306;user=root;pwd=123456;"; MySqlConnection sqlConn = new MySqlConnection(connStr); sqlConn.Open(); // 连接RabbitMQ var factory = new ConnectionFactory() { HostName = "dev.corp.wingoht.com", VirtualHost = "cd", UserName = "ishowfun", Password = "123456" }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { // 用户登录,并获取离线消息 UserInfo user = UserLogin.Login(sqlConn); // 监听在线消息 Consumer.StartListening(channel, sqlConn, user); // 监听按钮事件,调用不同的客户端功能 KeyListening.StartListening(channel, sqlConn, user); // 退出登录 UserLogout.Logout(channel, sqlConn, user); } // 释放数据库连接 sqlConn.Close(); } } }
UserLogin.cs:
using System; using MySql.Data.MySqlClient; namespace Client { class UserLogin { public static UserInfo Login(MySqlConnection conn) { UserInfo user = new UserInfo(); // 用户名输入 Console.Write("Please enter your username: "); user.Username = Console.ReadLine(); while (user.Username.Contains(",") || user.Username.Contains(" ")) { Console.WriteLine("Error: Username can not contain ","! and " ""); Console.Write("Please enter your username again: "); user.Username = Console.ReadLine(); } MySqlCommand cmd = new MySqlCommand("select * from user_info where username=‘" + user.Username + "‘", conn); MySqlDataReader reader = cmd.ExecuteReader(); // 判断该用户是否已经注册 if (reader.Read()) { // 已注册用户登录 user.IsNewUser = true; // 验证密码 Console.Write("Please enter your password: "); user.Password = reader.GetString("password"); while (user.Password != Console.ReadLine()) { Console.WriteLine("Error: Username and password do not match!"); Console.Write("Please enter your password again: "); } Console.WriteLine("Welcome back, {0}!", user.Username); Console.WriteLine("-------------------------------------"); // 获取当前用户的路由键 user.RoutingKey = reader.GetString("routingkey"); reader.Close(); // 读取离线消息 cmd = new MySqlCommand("select * from message_info where receiver=‘" + user.Username + "‘", conn); reader = cmd.ExecuteReader(); Console.WriteLine("Unread Message: "); while (reader.Read()) { string sender = reader.GetString("sender"); string sendTime = reader.GetString("sendtime"); string content = reader.GetString("body"); string[] splitSender = sender.Split(" "); if (splitSender.Length == 1) { Console.WriteLine("Sender: {0}, SendTime: {1}, Content: {2}", sender, sendTime, content); } else { sender = splitSender[0]; string group = splitSender[1]; Console.WriteLine("Sender: {0} from {1}, SendTime: {2}, Content: {3}", sender, group, sendTime, content); } } reader.Close(); // 删除已处理的离线消息 cmd = new MySqlCommand("delete from message_info where [email protected]", conn); cmd.Parameters.AddWithValue("re", user.Username); cmd.ExecuteNonQuery(); } else { // 新用户注册并登录 user.IsNewUser = false; reader.Close(); Console.WriteLine("Welcome, new user!"); // 设置密码 Console.Write("Please set your password: "); user.Password = Console.ReadLine(); Console.Write("Please confirm your password: "); while (user.Password != Console.ReadLine()) { Console.WriteLine("Error: Confirmation failure!"); Console.Write("Please set your password again: "); user.Password = Console.ReadLine(); Console.Write("Please confirm your password: "); } // 生成该用户的加密路由键,并保证该路由键唯一 user.RoutingKey = GenerateKey.GenerateRandomString(32); cmd = new MySqlCommand("select * from user_info where routingkey=‘" + user.RoutingKey + "‘", conn); reader = cmd.ExecuteReader(); while (reader.Read()) { reader.Close(); user.RoutingKey = GenerateKey.GenerateRandomString(32); reader = cmd.ExecuteReader(); } reader.Close(); } Console.WriteLine("-------------------------------------"); return user; } } }
UserInfo.cs:
namespace Client { class UserInfo { // 唯一用户名 public string Username { get; set; } // 登录密码 public string Password { get; set; } // 加密路由键 public string RoutingKey { get; set; } // 是否为新用户 public bool IsNewUser { get; set; } } }
GenerateKey.cs:
using System; namespace Client { class GenerateKey { // 字符串中字符的取值范围 private static char[] constant = { ‘0‘,‘1‘,‘2‘,‘3‘,‘4‘,‘5‘,‘6‘,‘7‘,‘8‘,‘9‘, ‘a‘,‘b‘,‘c‘,‘d‘,‘e‘,‘f‘,‘g‘,‘h‘,‘i‘,‘j‘,‘k‘,‘l‘,‘m‘,‘n‘,‘o‘,‘p‘,‘q‘,‘r‘,‘s‘,‘t‘,‘u‘,‘v‘,‘w‘,‘x‘,‘y‘,‘z‘, ‘A‘,‘B‘,‘C‘,‘D‘,‘E‘,‘F‘,‘G‘,‘H‘,‘I‘,‘J‘,‘K‘,‘L‘,‘M‘,‘N‘,‘O‘,‘P‘,‘Q‘,‘R‘,‘S‘,‘T‘,‘U‘,‘V‘,‘W‘,‘X‘,‘Y‘,‘Z‘ }; // 生成指定长度的随机字符串 public static string GenerateRandomString(int len) { System.Text.StringBuilder newRandom = new System.Text.StringBuilder(62); Random rd = new Random(); for (int i = 0; i < len; i++) { newRandom.Append(constant[rd.Next(62)]); } return newRandom.ToString(); } } }
Consumer.cs:
using System; using System.Text; using RabbitMQ.Client; using RabbitMQ.Client.Events; using MySql.Data.MySqlClient; namespace Client { class Consumer { public static void StartListening(IModel channel, MySqlConnection conn, UserInfo user) { // 交换机声明 channel.ExchangeDeclare(exchange: "topic_message", type: "topic"); // 队列创建与绑定 channel.QueueDeclare(queue: user.RoutingKey, durable: true, exclusive: false, autoDelete: false, arguments: null); channel.QueueBind(queue: user.RoutingKey, exchange: "topic_message", routingKey: user.RoutingKey); var consumer = new EventingBasicConsumer(channel); // 处理收到的消息 consumer.Received += (model, ea) => { var body = ea.Body; string message = Encoding.UTF8.GetString(body); string[] splitRes = message.Trim().Split(","); string sendTime = splitRes[1]; string content = splitRes[2]; string[] splitSender = splitRes[0].Split(" "); if (splitRes.Length > 3) { for (int i = 3; i < splitRes.Length; i++) content = content + "," + splitRes[i]; } Console.WriteLine(); if (splitSender.Length == 1) { string sender = splitRes[0]; Console.WriteLine("Sender: {0}, SendTime: {1}, Content: {2}", sender, sendTime, content); } else { string sender = splitSender[0]; string group = splitSender[1]; Console.WriteLine("Sender: {0} from {1}, SendTime: {2}, Content: {3}", sender, group, sendTime, content); } }; channel.BasicConsume(queue: user.RoutingKey, autoAck: true, consumer: consumer); MySqlCommand cmd; // 更新数据库 if (user.IsNewUser) { // 向数据库声明该用户已经拥有了队列,可以直接在线发送 cmd = new MySqlCommand("update user_info set has_queue=1 where [email protected]", conn); cmd.Parameters.AddWithValue("uid", user.Username); cmd.ExecuteNonQuery(); } else { // 防止注入地插入新用户数据 cmd = new MySqlCommand("insert into user_info set [email protected],[email protected],[email protected],has_queue=1", conn); cmd.Parameters.AddWithValue("uid", user.Username); cmd.Parameters.AddWithValue("pwd", user.Password); cmd.Parameters.AddWithValue("rk", user.RoutingKey); cmd.ExecuteNonQuery(); } Console.WriteLine("{0} Start Listening!", user.Username); Console.WriteLine("-------------------------------------"); } } }
KeyListening.cs:
using System; using RabbitMQ.Client; using MySql.Data.MySqlClient; namespace Client { class KeyListening { public static void StartListening(IModel channel, MySqlConnection conn, UserInfo user) { MQHelper.HelpTip(); Console.WriteLine("-------------------------------------"); ConsoleKeyInfo keyPressed; bool escFlag = false; while (true) { keyPressed = Console.ReadKey(); Console.WriteLine(); Console.WriteLine("-------------------------------------"); switch (keyPressed.Key) { // 0: 显示帮助信息 case ConsoleKey.D0: MQHelper.HelpTip(); break; // 1: 创建群组 case ConsoleKey.D1: MQHelper.CreateGroup(conn, user); break; // 2: 申请加入群组 case ConsoleKey.D2: MQHelper.JoinGroup(conn, user); break; // 3: 退出群组 case ConsoleKey.D3: MQHelper.LeaveGroup(conn, user); break; // 4: 显示群组信息 case ConsoleKey.D4: MQHelper.ShowGroup(conn, user); break; // 5: 单播 case ConsoleKey.D5: MQHelper.BasicSend(channel, conn, user); break; // 6: 组播 case ConsoleKey.D6: MQHelper.GroupSend(channel, conn, user); break; // ESC: 退出 case ConsoleKey.Escape: escFlag = true; break; // 无意义按键 default: Console.WriteLine("Error: Invalid press!"); break; } Console.WriteLine("-------------------------------------"); if (escFlag) break; } } } }
MQHelper.cs:
using System; using System.Text; using System.Collections.Generic; using RabbitMQ.Client; using MySql.Data.MySqlClient; namespace Client { // 该类包含了按钮监听事件触发的各项功能的实现 class MQHelper { #region 显示帮助信息 public static void HelpTip() { // 显示各按键对应功能 Console.WriteLine("Function Press:"); Console.WriteLine("[0]: Help"); Console.WriteLine("[1]: Create group"); Console.WriteLine("[2]: Join group"); Console.WriteLine("[3]: Leave group"); Console.WriteLine("[4]: Show groups"); Console.WriteLine("[5]: BasicSend"); Console.WriteLine("[6]: GroupSend"); Console.WriteLine("[ESC]: Log out"); } #endregion #region 创建群组 public static void CreateGroup(MySqlConnection conn, UserInfo user) { // 设置群组名称以及加入密码 Console.Write("Please set the group name: "); string groupName = Console.ReadLine(); MySqlCommand groupCmd = new MySqlCommand("select * from group_info where groupname=‘" + groupName + "‘", conn); MySqlDataReader groupReader = groupCmd.ExecuteReader(); if (groupReader.Read()) { groupReader.Close(); Console.WriteLine("Error: This group name already exists!"); } else { groupReader.Close(); Console.Write("Please set the password: "); string pwd = Console.ReadLine(); Console.Write("Please confirm the password: "); while (pwd != Console.ReadLine()) { Console.WriteLine("Error: Confirmation failure!"); Console.Write("Please set the password again: "); pwd = Console.ReadLine(); Console.Write("Please confirm the password: "); } // 将群组信息插入数据库 groupCmd = new MySqlCommand("insert into group_info set [email protected],[email protected],[email protected]", conn); groupCmd.Parameters.AddWithValue("gid", groupName); groupCmd.Parameters.AddWithValue("pwd", pwd); groupCmd.Parameters.AddWithValue("cr", user.Username); groupCmd.ExecuteNonQuery(); groupCmd = new MySqlCommand("insert into groupmember_info set [email protected],[email protected]", conn); groupCmd.Parameters.AddWithValue("gid", groupName); groupCmd.Parameters.AddWithValue("uid", user.Username); groupCmd.ExecuteNonQuery(); Console.WriteLine("Successfully create the group!"); } } #endregion #region 申请加入群组 public static void JoinGroup(MySqlConnection conn, UserInfo user) { // 输入要加入的群名 Console.Write("Please enter the group name: "); string groupName = Console.ReadLine(); MySqlCommand groupCmd = new MySqlCommand("select * from group_info where groupname=‘" + groupName + "‘", conn); MySqlDataReader groupReader = groupCmd.ExecuteReader(); // 判断该群是否存在 if (groupReader.Read()) { // 验证加入密码 string pwd = groupReader.GetString("password"); groupReader.Close(); Console.Write("Please enter your password: "); if (pwd != Console.ReadLine()) { Console.WriteLine("Error: Username and password do not match!"); } else { groupCmd = new MySqlCommand("select * from groupmember_info where groupname=‘" + groupName + "‘ and username=‘" + user.Username + "‘", conn); groupReader = groupCmd.ExecuteReader(); // 判断该用户是否已经在这个群中 if (groupReader.Read()) { groupReader.Close(); Console.WriteLine("Error: You already join the group!"); } else { // 加入该群,并更新数据库信息 groupReader.Close(); groupCmd = new MySqlCommand("insert into groupmember_info set [email protected],[email protected]", conn); groupCmd.Parameters.AddWithValue("gid", groupName); groupCmd.Parameters.AddWithValue("uid", user.Username); groupCmd.ExecuteNonQuery(); Console.WriteLine("Successfully join the group!"); } } } else { groupReader.Close(); Console.WriteLine("Error: This group name does not exist!"); } } #endregion #region 退出群组 public static void LeaveGroup(MySqlConnection conn, UserInfo user) { // 输入要退出群的名称 Console.Write("Please enter the group name: "); string groupName = Console.ReadLine(); MySqlCommand groupCmd = new MySqlCommand("select * from groupmember_info where groupname=‘" + groupName + "‘ and username=‘" + user.Username + "‘", conn); MySqlDataReader groupReader = groupCmd.ExecuteReader(); // 判断你是否在这个群中 if (groupReader.Read()) { // 退出该群,并更新数据库信息 groupReader.Close(); groupCmd = new MySqlCommand("delete from groupmember_info where [email protected] and [email protected]", conn); groupCmd.Parameters.AddWithValue("gid", groupName); groupCmd.Parameters.AddWithValue("uid", user.Username); groupCmd.ExecuteNonQuery(); Console.WriteLine("Successfully leave the group!"); } else { groupReader.Close(); Console.WriteLine("Error: You didn‘t join the group!"); } } #endregion #region 显示群组信息 public static void ShowGroup(MySqlConnection conn, UserInfo user) { MySqlCommand groupCmd = new MySqlCommand("select * from groupmember_info where username=‘" + user.Username + "‘", conn); MySqlDataReader groupReader = groupCmd.ExecuteReader(); // 显示加入的所有群名称 while (groupReader.Read()) { Console.WriteLine(groupReader.GetString("groupname")); } groupReader.Close(); } #endregion #region 单播 public static void BasicSend(IModel channel, MySqlConnection conn, UserInfo user) { // 输入收信人用户名 Console.Write("Please enter the receiver: "); string receiver = Console.ReadLine(); Console.WriteLine("-------------------------------------"); // 输入待发送的消息 Console.WriteLine("Please enter the message: "); string content = Console.ReadLine(); Console.WriteLine("-------------------------------------"); MySqlCommand sendCmd = new MySqlCommand("select * from user_info where username=‘" + receiver + "‘", conn); MySqlDataReader sendReader = sendCmd.ExecuteReader(); bool user_flag = sendReader.Read(); // 判断是否存在该收信人 if (user_flag) { string receiverKey = sendReader.GetString("routingkey"); int hasQueue = sendReader.GetInt32("has_queue"); string sendTime = DateTime.Now.ToString(); string message = user.Username + "," + sendTime + "," + content; var body = Encoding.UTF8.GetBytes(message); sendReader.Close(); // 目标队列存在,则直接发布信息;否则将信息数据存入数据库 if (hasQueue == 1) { // 在线发布消息 var properties = channel.CreateBasicProperties(); properties.Persistent = true; channel.BasicPublish(exchange: "topic_message", routingKey: receiverKey, basicProperties: properties, body: body); } else { // 离线发布消息 sendCmd = new MySqlCommand("insert into message_info set [email protected],[email protected],[email protected],[email protected]", conn); sendCmd.Parameters.AddWithValue("st", sendTime); sendCmd.Parameters.AddWithValue("body", content); sendCmd.Parameters.AddWithValue("re", receiver); sendCmd.Parameters.AddWithValue("se", user.Username); sendCmd.ExecuteNonQuery(); } Console.WriteLine("You sent [ {0} ] to {1} at {2}.", content, receiver, sendTime); } else { sendReader.Close(); Console.WriteLine("Error: The receiver does not exist!"); } } #endregion #region 组播 public static void GroupSend(IModel channel, MySqlConnection conn, UserInfo user) { // 输入在那个群组中发布消息 Console.Write("Please enter the receiver: "); string group = Console.ReadLine(); Console.WriteLine("-------------------------------------"); // 输入待发布消息 Console.WriteLine("Please enter the message: "); string content = Console.ReadLine(); Console.WriteLine("-------------------------------------"); // 读取群组中的所有成员 MySqlCommand sendCmd = new MySqlCommand("select * from groupmember_info where groupname=‘" + group + "‘", conn); MySqlDataReader sendReader = sendCmd.ExecuteReader(); List<string> memberList = new List<string>(); while (sendReader.Read()) { memberList.Add(sendReader.GetString("username")); } sendReader.Close(); string sender = user.Username + " " + group; string sendTime = DateTime.Now.ToString(); // 交换机声明 channel.ExchangeDeclare(exchange: "topic_message", type: "topic"); var properties = channel.CreateBasicProperties(); properties.Persistent = true; // 逐个处理成员,在线用户在线发送消息,离线用户离线发送消息 foreach (string member in memberList) { sendCmd = new MySqlCommand("select * from user_info where username=‘" + member + "‘", conn); sendReader = sendCmd.ExecuteReader(); sendReader.Read(); string receiverKey = sendReader.GetString("routingkey"); int hasQueue = sendReader.GetInt32("has_queue"); string message = sender + "," + sendTime + "," + content; var body = Encoding.UTF8.GetBytes(message); sendReader.Close(); // 目标队列存在,则直接发布信息;否则将信息数据存入数据库 if (hasQueue == 1) { // 在线发送消息 channel.BasicPublish(exchange: "topic_message", routingKey: receiverKey, basicProperties: properties, body: body); } else { // 离线发送消息 sendCmd = new MySqlCommand("insert into message_info set [email protected],[email protected],[email protected],[email protected]", conn); sendCmd.Parameters.AddWithValue("st", sendTime); sendCmd.Parameters.AddWithValue("body", content); sendCmd.Parameters.AddWithValue("re", member); sendCmd.Parameters.AddWithValue("se", sender); sendCmd.ExecuteNonQuery(); } } Console.WriteLine("You sent [ {0} ] to {1} at {2}.", content, group, sendTime); } #endregion } }
UserLogout.cs:
using System; using RabbitMQ.Client; using MySql.Data.MySqlClient; namespace Client { class UserLogout { public static void Logout(IModel channel, MySqlConnection conn, UserInfo user) { Console.WriteLine("Goodbye, {0}!", user.Username); // 删除队列,并更新数据库,表明后面发送的消息应该转为离线发送 channel.QueueDelete(user.RoutingKey); MySqlCommand cmd = new MySqlCommand("update user_info set has_queue=0 where [email protected]", conn); cmd.Parameters.AddWithValue("uid", user.Username); cmd.ExecuteNonQuery(); } } }
百度云链接:https://pan.baidu.com/s/1Y93rcqnsv1cA9ZIxH2xrBw 密码:zfc5
以上是关于利用RabbitMQMySQL实现超大用户级别的消息在/离线收发的主要内容,如果未能解决你的问题,请参考以下文章