利用RabbitMQMySQL实现超大用户级别的消息在/离线收发

Posted 穷酸秀才大艹包

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了利用RabbitMQMySQL实现超大用户级别的消息在/离线收发相关的知识,希望对你有一定的参考价值。

由于RabbitMQ中只有队列(queue)才能存储信息,所以用RabbitMQ实现超大用户级别(百万计)的消息在/离线收发需要对每一个用户创建一个永久队列。

但是RabbitMQ节点内存有限,经测试后发现节点集群也无法满足数百万用户队列收发数据的要求,所以最终决定采用数据库辅助实现该功能。

 

一、数据库结构

user_list数据库下有4张表:user_info、group_info、groupmember_info、message_info。

user_info表中含有username(主键,非空,VARCHAR(50))、password(非空,VARCHAR(50))、routingkey(非空,VARCHAR(50))、has_queue(INT UNSIGNED)四个字段。

group_info表中含有groupname(主键,非空,VARCHAR(50))、password(非空,VARCHAR(50))、creator(非空,VARCHAR(50))三个字段。

groupmember_info表中含有username(主键,非空,VARCHAR(50))、groupname(主键,非空,VARCHAR(50))两个字段。

message_info表中含有sendtime(非空,VARCHAR(50))、body(非空,VARCHAR(300)),receiver(非空,VARCHAR(50))、sender(非空,VARCHAR(100))四个字段。

 

二、客户端结构

1、文件夹创建以及包依赖安装:

dotnet new console --name Client
mv Client/Program.cs Client/Client.cs
cd Client
dotnet add package RabbitMQ.Client
dotnet add package mysql.Data dotnet restore

2、项目结构

Client.cs(主程序):
技术分享图片
using RabbitMQ.Client;
using MySql.Data.MySqlClient;

namespace Client
{
    class Client
    {
        static void Main(string[] args)
        {
            // 连接数据库
            string connStr = "Database=user_list;datasource=127.0.0.1;port=3306;user=root;pwd=123456;";
            MySqlConnection sqlConn = new MySqlConnection(connStr);

            sqlConn.Open();

            // 连接RabbitMQ
            var factory = new ConnectionFactory() { HostName = "dev.corp.wingoht.com", VirtualHost = "cd", UserName = "ishowfun", Password = "123456" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                // 用户登录,并获取离线消息
                UserInfo user = UserLogin.Login(sqlConn);

                // 监听在线消息
                Consumer.StartListening(channel, sqlConn, user);

                // 监听按钮事件,调用不同的客户端功能
                KeyListening.StartListening(channel, sqlConn, user);

                // 退出登录
                UserLogout.Logout(channel, sqlConn, user);
            }

            // 释放数据库连接
            sqlConn.Close();
        }
    }
}
View Code

 UserLogin.cs:

技术分享图片
using System;
using MySql.Data.MySqlClient;

namespace Client
{
    class UserLogin
    {
        public static UserInfo Login(MySqlConnection conn)
        {
            UserInfo user = new UserInfo();
            
            // 用户名输入
            Console.Write("Please enter your username: ");
            user.Username = Console.ReadLine();
            while (user.Username.Contains(",") || user.Username.Contains(" "))
            {
                Console.WriteLine("Error: Username can not contain ","! and " "");
                Console.Write("Please enter your username again: ");
                user.Username = Console.ReadLine();
            }

            MySqlCommand cmd = new MySqlCommand("select * from user_info where username=‘" + user.Username + "", conn);
            MySqlDataReader reader = cmd.ExecuteReader();

            // 判断该用户是否已经注册
            if (reader.Read())
            {
                // 已注册用户登录
                user.IsNewUser = true;

                // 验证密码
                Console.Write("Please enter your password: ");
                user.Password = reader.GetString("password");
                while (user.Password != Console.ReadLine())
                {
                    Console.WriteLine("Error: Username and password do not match!");
                    Console.Write("Please enter your password again: ");
                }
                Console.WriteLine("Welcome back, {0}!", user.Username);
                Console.WriteLine("-------------------------------------");

                // 获取当前用户的路由键
                user.RoutingKey = reader.GetString("routingkey");
                reader.Close();

                // 读取离线消息
                cmd = new MySqlCommand("select * from message_info where receiver=‘" + user.Username + "", conn);
                reader = cmd.ExecuteReader();
                Console.WriteLine("Unread Message: ");
                while (reader.Read())
                {
                    string sender = reader.GetString("sender");
                    string sendTime = reader.GetString("sendtime");
                    string content = reader.GetString("body");
                    string[] splitSender = sender.Split(" ");
                    if (splitSender.Length == 1)
                    {
                        Console.WriteLine("Sender: {0}, SendTime: {1}, Content: {2}", sender, sendTime, content);
                    }
                    else
                    {
                        sender = splitSender[0];
                        string group = splitSender[1];
                        Console.WriteLine("Sender: {0} from {1}, SendTime: {2}, Content: {3}", sender, group, sendTime, content);
                    }
                }
                reader.Close();

                // 删除已处理的离线消息
                cmd = new MySqlCommand("delete from message_info where [email protected]", conn);
                cmd.Parameters.AddWithValue("re", user.Username);

                cmd.ExecuteNonQuery();
            }
            else
            {
                // 新用户注册并登录
                user.IsNewUser = false;
                reader.Close();
                Console.WriteLine("Welcome, new user!");

                // 设置密码
                Console.Write("Please set your password: ");
                user.Password = Console.ReadLine();
                Console.Write("Please confirm your password: ");
                while (user.Password != Console.ReadLine())
                {
                    Console.WriteLine("Error: Confirmation failure!");
                    Console.Write("Please set your password again: ");
                    user.Password = Console.ReadLine();
                    Console.Write("Please confirm your password: ");
                }

                // 生成该用户的加密路由键,并保证该路由键唯一
                user.RoutingKey = GenerateKey.GenerateRandomString(32);
                cmd = new MySqlCommand("select * from user_info where routingkey=‘" + user.RoutingKey + "", conn);
                reader = cmd.ExecuteReader();
                while (reader.Read())
                {
                    reader.Close();
                    user.RoutingKey = GenerateKey.GenerateRandomString(32);
                    reader = cmd.ExecuteReader();
                }

                reader.Close();
            }

            Console.WriteLine("-------------------------------------");
            return user;
        }
    }
}
View Code

 UserInfo.cs:

技术分享图片
namespace Client
{
    class UserInfo
    {
        // 唯一用户名
        public string Username { get; set; }

        // 登录密码
        public string Password { get; set; }

        // 加密路由键
        public string RoutingKey { get; set; }

        // 是否为新用户
        public bool IsNewUser { get; set; }
    }
}
View Code

 GenerateKey.cs: 

技术分享图片
using System;

namespace Client
{
    class GenerateKey
    {
        // 字符串中字符的取值范围
        private static char[] constant =
        {
            0,1,2,3,4,5,6,7,8,9,
            a,b,c,d,e,f,g,h,i,j,k,l,m,n,o,p,q,r,s,t,u,v,w,x,y,z,
            A,B,C,D,E,F,G,H,I,J,K,L,M,N,O,P,Q,R,S,T,U,V,W,X,Y,Z
        };

        // 生成指定长度的随机字符串
        public static string GenerateRandomString(int len)
        {
            System.Text.StringBuilder newRandom = new System.Text.StringBuilder(62);
            Random rd = new Random();
            for (int i = 0; i < len; i++)
            {
                newRandom.Append(constant[rd.Next(62)]);
            }
            return newRandom.ToString();
        }
    }
}
View Code

 Consumer.cs:

技术分享图片
using System;
using System.Text;

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using MySql.Data.MySqlClient;

namespace Client
{
    class Consumer
    {
        public static void StartListening(IModel channel, MySqlConnection conn, UserInfo user)
        {
            // 交换机声明
            channel.ExchangeDeclare(exchange: "topic_message", type: "topic");

            // 队列创建与绑定
            channel.QueueDeclare(queue: user.RoutingKey,
                                 durable: true,
                                 exclusive: false,
                                 autoDelete: false,
                                 arguments: null);

            channel.QueueBind(queue: user.RoutingKey,
                              exchange: "topic_message",
                              routingKey: user.RoutingKey);

            var consumer = new EventingBasicConsumer(channel);

            // 处理收到的消息
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body;
                string message = Encoding.UTF8.GetString(body);
                string[] splitRes = message.Trim().Split(",");
                string sendTime = splitRes[1];
                string content = splitRes[2];
                string[] splitSender = splitRes[0].Split(" ");
                if (splitRes.Length > 3)
                {
                    for (int i = 3; i < splitRes.Length; i++)
                        content = content + "," + splitRes[i];
                }

                Console.WriteLine();
                if (splitSender.Length == 1)
                {
                    string sender = splitRes[0];
                    Console.WriteLine("Sender: {0}, SendTime: {1}, Content: {2}", sender, sendTime, content);
                }
                else
                {
                    string sender = splitSender[0];
                    string group = splitSender[1];
                    Console.WriteLine("Sender: {0} from {1}, SendTime: {2}, Content: {3}", sender, group, sendTime, content);
                }
            };

            channel.BasicConsume(queue: user.RoutingKey,
                                 autoAck: true,
                                 consumer: consumer);

            MySqlCommand cmd;

            // 更新数据库
            if (user.IsNewUser)
            {
                // 向数据库声明该用户已经拥有了队列,可以直接在线发送
                cmd = new MySqlCommand("update user_info set has_queue=1 where [email protected]", conn);
                cmd.Parameters.AddWithValue("uid", user.Username);

                cmd.ExecuteNonQuery();
            }
            else
            {
                // 防止注入地插入新用户数据
                cmd = new MySqlCommand("insert into user_info set [email protected],[email protected],[email protected],has_queue=1", conn);
                cmd.Parameters.AddWithValue("uid", user.Username);
                cmd.Parameters.AddWithValue("pwd", user.Password);
                cmd.Parameters.AddWithValue("rk", user.RoutingKey);

                cmd.ExecuteNonQuery();
            }

            Console.WriteLine("{0} Start Listening!", user.Username);
            Console.WriteLine("-------------------------------------");
        }
    }
}
View Code

 KeyListening.cs:

技术分享图片
using System;

using RabbitMQ.Client;
using MySql.Data.MySqlClient;

namespace Client
{
    class KeyListening
    {
        public static void StartListening(IModel channel, MySqlConnection conn, UserInfo user)
        {
            MQHelper.HelpTip();
            Console.WriteLine("-------------------------------------");
            ConsoleKeyInfo keyPressed;
            bool escFlag = false;

            while (true)
            {

                keyPressed = Console.ReadKey();
                Console.WriteLine();
                Console.WriteLine("-------------------------------------");
                switch (keyPressed.Key)
                {
                    // 0: 显示帮助信息
                    case ConsoleKey.D0:
                        MQHelper.HelpTip();
                        break;
                    // 1: 创建群组
                    case ConsoleKey.D1:
                        MQHelper.CreateGroup(conn, user);
                        break;
                    // 2: 申请加入群组
                    case ConsoleKey.D2:
                        MQHelper.JoinGroup(conn, user);
                        break;
                    // 3: 退出群组
                    case ConsoleKey.D3:
                        MQHelper.LeaveGroup(conn, user);
                        break;
                    // 4: 显示群组信息    
                    case ConsoleKey.D4:
                        MQHelper.ShowGroup(conn, user);
                        break;
                    // 5: 单播
                    case ConsoleKey.D5:
                        MQHelper.BasicSend(channel, conn, user);
                        break;
                    // 6: 组播
                    case ConsoleKey.D6:
                        MQHelper.GroupSend(channel, conn, user);
                        break;
                    // ESC: 退出
                    case ConsoleKey.Escape:
                        escFlag = true;
                        break;
                    // 无意义按键
                    default:
                        Console.WriteLine("Error: Invalid press!");
                        break;
                }

                Console.WriteLine("-------------------------------------");

                if (escFlag)
                    break;
            }
        }
    }
}
View Code

 MQHelper.cs:

技术分享图片
using System;
using System.Text;
using System.Collections.Generic;

using RabbitMQ.Client;
using MySql.Data.MySqlClient;

namespace Client
{
    // 该类包含了按钮监听事件触发的各项功能的实现
    class MQHelper
    {
        #region 显示帮助信息
        public static void HelpTip()
        {
            // 显示各按键对应功能
            Console.WriteLine("Function Press:");
            Console.WriteLine("[0]: Help");
            Console.WriteLine("[1]: Create group");
            Console.WriteLine("[2]: Join group");
            Console.WriteLine("[3]: Leave group");
            Console.WriteLine("[4]: Show groups");
            Console.WriteLine("[5]: BasicSend");
            Console.WriteLine("[6]: GroupSend");
            Console.WriteLine("[ESC]: Log out");
        }
        #endregion

        #region 创建群组
        public static void CreateGroup(MySqlConnection conn, UserInfo user)
        {
            // 设置群组名称以及加入密码
            Console.Write("Please set the group name: ");
            string groupName = Console.ReadLine();
            MySqlCommand groupCmd = new MySqlCommand("select * from group_info where groupname=‘" + groupName + "", conn);
            MySqlDataReader groupReader = groupCmd.ExecuteReader();

            if (groupReader.Read())
            {
                groupReader.Close();
                Console.WriteLine("Error: This group name already exists!");
            }
            else
            {
                groupReader.Close();

                Console.Write("Please set the password: ");
                string pwd = Console.ReadLine();
                Console.Write("Please confirm the password: ");
                while (pwd != Console.ReadLine())
                {
                    Console.WriteLine("Error: Confirmation failure!");
                    Console.Write("Please set the password again: ");
                    pwd = Console.ReadLine();
                    Console.Write("Please confirm the password: ");
                }

                // 将群组信息插入数据库
                groupCmd = new MySqlCommand("insert into group_info set [email protected],[email protected],[email protected]", conn);
                groupCmd.Parameters.AddWithValue("gid", groupName);
                groupCmd.Parameters.AddWithValue("pwd", pwd);
                groupCmd.Parameters.AddWithValue("cr", user.Username);

                groupCmd.ExecuteNonQuery();

                groupCmd = new MySqlCommand("insert into groupmember_info set [email protected],[email protected]", conn);
                groupCmd.Parameters.AddWithValue("gid", groupName);
                groupCmd.Parameters.AddWithValue("uid", user.Username);

                groupCmd.ExecuteNonQuery();

                Console.WriteLine("Successfully create the group!");
            }
        }
        #endregion

        #region 申请加入群组
        public static void JoinGroup(MySqlConnection conn, UserInfo user)
        {
            // 输入要加入的群名
            Console.Write("Please enter the group name: ");
            string groupName = Console.ReadLine();
            MySqlCommand groupCmd = new MySqlCommand("select * from group_info where groupname=‘" + groupName + "", conn);
            MySqlDataReader groupReader = groupCmd.ExecuteReader();

            // 判断该群是否存在
            if (groupReader.Read())
            {
                // 验证加入密码
                string pwd = groupReader.GetString("password");
                groupReader.Close();

                Console.Write("Please enter your password: ");
                if (pwd != Console.ReadLine())
                {
                    Console.WriteLine("Error: Username and password do not match!");
                }
                else
                {
                    groupCmd = new MySqlCommand("select * from groupmember_info where groupname=‘" + groupName + "‘ and username=‘" + user.Username + "", conn);
                    groupReader = groupCmd.ExecuteReader();
                    
                    // 判断该用户是否已经在这个群中 
                    if (groupReader.Read())
                    {
                        groupReader.Close();
                        Console.WriteLine("Error: You already join the group!");
                    }
                    else
                    {
                        // 加入该群,并更新数据库信息
                        groupReader.Close();
                        groupCmd = new MySqlCommand("insert into groupmember_info set [email protected],[email protected]", conn);
                        groupCmd.Parameters.AddWithValue("gid", groupName);
                        groupCmd.Parameters.AddWithValue("uid", user.Username);

                        groupCmd.ExecuteNonQuery();
                        Console.WriteLine("Successfully join the group!");
                    }
                }
            }
            else
            {
                groupReader.Close();
                Console.WriteLine("Error: This group name does not exist!");
            }
        }
        #endregion

        #region 退出群组
        public static void LeaveGroup(MySqlConnection conn, UserInfo user)
        {
            // 输入要退出群的名称
            Console.Write("Please enter the group name: ");
            string groupName = Console.ReadLine();
            MySqlCommand groupCmd = new MySqlCommand("select * from groupmember_info where groupname=‘" + groupName + "‘ and username=‘" + user.Username + "", conn);
            MySqlDataReader groupReader = groupCmd.ExecuteReader();

            // 判断你是否在这个群中
            if (groupReader.Read())
            {
                // 退出该群,并更新数据库信息
                groupReader.Close();
                groupCmd = new MySqlCommand("delete from groupmember_info where [email protected] and [email protected]", conn);
                groupCmd.Parameters.AddWithValue("gid", groupName);
                groupCmd.Parameters.AddWithValue("uid", user.Username);
                groupCmd.ExecuteNonQuery();
                Console.WriteLine("Successfully leave the group!");
            }
            else
            {
                groupReader.Close();
                Console.WriteLine("Error: You didn‘t join the group!");
            }
        }
        #endregion

        #region 显示群组信息
        public static void ShowGroup(MySqlConnection conn, UserInfo user)
        {
            MySqlCommand groupCmd = new MySqlCommand("select * from groupmember_info where username=‘" + user.Username + "", conn);
            MySqlDataReader groupReader = groupCmd.ExecuteReader();

            // 显示加入的所有群名称
            while (groupReader.Read())
            {
                Console.WriteLine(groupReader.GetString("groupname"));
            }
            groupReader.Close();
        }
        #endregion

        #region 单播
        public static void BasicSend(IModel channel, MySqlConnection conn, UserInfo user)
        {
            // 输入收信人用户名
            Console.Write("Please enter the receiver: ");
            string receiver = Console.ReadLine();
            Console.WriteLine("-------------------------------------");

            // 输入待发送的消息
            Console.WriteLine("Please enter the message: ");
            string content = Console.ReadLine();
            Console.WriteLine("-------------------------------------");
            MySqlCommand sendCmd = new MySqlCommand("select * from user_info where username=‘" + receiver + "", conn);
            MySqlDataReader sendReader = sendCmd.ExecuteReader();

            bool user_flag = sendReader.Read();

            // 判断是否存在该收信人
            if (user_flag)
            {
                string receiverKey = sendReader.GetString("routingkey");
                int hasQueue = sendReader.GetInt32("has_queue");
                string sendTime = DateTime.Now.ToString();
                string message = user.Username + "," + sendTime + "," + content;
                var body = Encoding.UTF8.GetBytes(message);

                sendReader.Close();

                // 目标队列存在,则直接发布信息;否则将信息数据存入数据库
                if (hasQueue == 1)
                {
                    // 在线发布消息
                    var properties = channel.CreateBasicProperties();
                    properties.Persistent = true;

                    channel.BasicPublish(exchange: "topic_message",
                                         routingKey: receiverKey,
                                         basicProperties: properties,
                                         body: body);
                }
                else
                {
                    // 离线发布消息
                    sendCmd = new MySqlCommand("insert into message_info set [email protected],[email protected],[email protected],[email protected]", conn);
                    sendCmd.Parameters.AddWithValue("st", sendTime);
                    sendCmd.Parameters.AddWithValue("body", content);
                    sendCmd.Parameters.AddWithValue("re", receiver);
                    sendCmd.Parameters.AddWithValue("se", user.Username);

                    sendCmd.ExecuteNonQuery();
                }
                Console.WriteLine("You sent [ {0} ] to {1} at {2}.", content, receiver, sendTime);
            }
            else
            {
                sendReader.Close();
                Console.WriteLine("Error: The receiver does not exist!");
            }
        }
        #endregion

        #region 组播
        public static void GroupSend(IModel channel, MySqlConnection conn, UserInfo user)
        {
            // 输入在那个群组中发布消息
            Console.Write("Please enter the receiver: ");
            string group = Console.ReadLine();
            Console.WriteLine("-------------------------------------");

            // 输入待发布消息
            Console.WriteLine("Please enter the message: ");
            string content = Console.ReadLine();
            Console.WriteLine("-------------------------------------");

            // 读取群组中的所有成员
            MySqlCommand sendCmd = new MySqlCommand("select * from groupmember_info where groupname=‘" + group + "", conn);
            MySqlDataReader sendReader = sendCmd.ExecuteReader();
            List<string> memberList = new List<string>();
            while (sendReader.Read())
            {
                memberList.Add(sendReader.GetString("username"));
            }
            sendReader.Close();

            string sender = user.Username + " " + group;
            string sendTime = DateTime.Now.ToString();

            // 交换机声明           
            channel.ExchangeDeclare(exchange: "topic_message", type: "topic");

            var properties = channel.CreateBasicProperties();
            properties.Persistent = true;

            // 逐个处理成员,在线用户在线发送消息,离线用户离线发送消息
            foreach (string member in memberList)
            {
                sendCmd = new MySqlCommand("select * from user_info where username=‘" + member + "", conn);
                sendReader = sendCmd.ExecuteReader();

                sendReader.Read();
                string receiverKey = sendReader.GetString("routingkey");
                int hasQueue = sendReader.GetInt32("has_queue");
                string message = sender + "," + sendTime + "," + content;
                var body = Encoding.UTF8.GetBytes(message);
                sendReader.Close();

                // 目标队列存在,则直接发布信息;否则将信息数据存入数据库
                if (hasQueue == 1)
                {
                    // 在线发送消息
                    channel.BasicPublish(exchange: "topic_message",
                                         routingKey: receiverKey,
                                         basicProperties: properties,
                                         body: body);
                }
                else
                {
                    // 离线发送消息
                    sendCmd = new MySqlCommand("insert into message_info set [email protected],[email protected],[email protected],[email protected]", conn);
                    sendCmd.Parameters.AddWithValue("st", sendTime);
                    sendCmd.Parameters.AddWithValue("body", content);
                    sendCmd.Parameters.AddWithValue("re", member);
                    sendCmd.Parameters.AddWithValue("se", sender);

                    sendCmd.ExecuteNonQuery();
                }
            }

            Console.WriteLine("You sent [ {0} ] to {1} at {2}.", content, group, sendTime);
        }
        #endregion
    }
}
View Code

 UserLogout.cs:

技术分享图片
using System;

using RabbitMQ.Client;
using MySql.Data.MySqlClient;

namespace Client
{
    class UserLogout
    {
        public static void Logout(IModel channel, MySqlConnection conn, UserInfo user)
        {
            Console.WriteLine("Goodbye, {0}!", user.Username);

            // 删除队列,并更新数据库,表明后面发送的消息应该转为离线发送
            channel.QueueDelete(user.RoutingKey);
            MySqlCommand cmd = new MySqlCommand("update user_info set has_queue=0 where [email protected]", conn);
            cmd.Parameters.AddWithValue("uid", user.Username);

            cmd.ExecuteNonQuery();
        }
    }
}
View Code

 

百度云链接:https://pan.baidu.com/s/1Y93rcqnsv1cA9ZIxH2xrBw 密码:zfc5








以上是关于利用RabbitMQMySQL实现超大用户级别的消息在/离线收发的主要内容,如果未能解决你的问题,请参考以下文章

如何利用Flink实现超大规模用户行为分析

利用webuploader实现超大文件分片上传断点续传

教你用Flink实现超大规模用户行为分析(附代码视频教程)

jsp利用webuploader实现超大文件分片上传断点续传

求超大文件上传方案( jsp )

JavaScript 超大文件上传解决方案:分片断点上传