Redis应用----消息传递
Posted 那一叶随风
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Redis应用----消息传递相关的知识,希望对你有一定的参考价值。
1、摘要
消息传递这一应用广泛存在于各个网站中,这个功能也是一个网站必不可少的。常见的消息传递应用有,新浪微博中的@我呀、给你评论然后的提示呀、赞赞赞提示、私信呀、甚至是发微博分享的新鲜事;知乎中的私信呀、live发送过来的消息、知乎团队消息呀等等。
2、实现方法
消息传递即两个或者多个客户端在相互发送和接收消息。
通常有两种方法实现:
第一种为消息推送。Redis内置有这种机制,publish往频道推送消息、subscribe订阅频道。这种方法有一个缺点就是必须保证接收者时刻在线(即是此时程序不能停下来,一直保持监控状态,假若断线后就会出现客户端丢失信息)
第二种为消息拉取。所谓消息拉取,就是客户端自主去获取存储在服务器中的数据。Redis内部没有实现消息拉取这种机制。因此我们需要自己手动编写代码去实现这个功能。
在这里我们,我们进一步将消息传递再细分为一对一的消息传递,多对多的消息传递(群组消息传递)。
【注:两个类的代码相对较多,因此将其折叠起来了】
3、一对一消息传递
例子1:一对一消息发送与获取
模块要求:
1、提示有多少个联系人发来新消息
2、信息包含发送人、时间、信息内容
3、能够获取之前的旧消息
4、并且消息能够保持7天,过期将会被动触发删除
Redis实现思路:
1、新消息与旧消息分别采用两个链表来存储
2、原始消息的结构采用数组的形式存放,并且含有发送人、时间戳、信息内容
3、在推入redis的链表前,需要将数据转换为json类型然后再进行存储
4、在取出新信息时应该使用rpoplpush来实现,将已读的新消息推入旧消息链表中
5、取出旧消息时,应该用旧消息的时间与现在的时间进行对比,若超时,则直接删除后面的全部数据(因为数据是按时间一个一个压进链表中的,所以对于时间是有序排列的)
数据存储结构图:
php的实现代码:
#SinglePullMessage.class.php
1 <?php 2 #单接接收者接收消息 3 class SinglePullMessage 4 { 5 private $redis=\'\'; #存储redis对象 6 /** 7 * @desc 构造函数 8 * 9 * @param $host string | redis主机 10 * @param $port int | 端口 11 */ 12 public function __construct($host,$port=6379) 13 { 14 $this->redis=new Redis(); 15 $this->redis->connect($host,$port); 16 } 17 18 /** 19 * @desc 发送消息(一个人) 20 * 21 * @param $toUser string | 接收人 22 * @param $messageArr array | 发送的消息数组,包含sender、message、time 23 * 24 * @return bool 25 */ 26 public function sendSingle($toUser,$messageArr) 27 { 28 $json_message=json_encode($messageArr); #编码成json数据 29 return $this->redis->lpush($toUser,$json_message); #将数据推入链表 30 } 31 32 /** 33 * @desc 用户获取新消息 34 * 35 * @param $user string | 用户名 36 * 37 * @return array 返回数组,包含多少个用户发来新消息,以及具体消息 38 */ 39 public function getNewMessage($user) 40 { 41 #接收新信息数据,并且将数据推入旧信息数据链表中,并且在原链表中删除 42 $messageArr=array(); 43 while($json_message=$this->redis->rpoplpush($user, \'preMessage_\'.$user)) 44 { 45 $temp=json_decode($json_message); #将json数据变成对象 46 $messageArr[$temp->sender][]=$temp; #转换成数组信息 47 } 48 if($messageArr) 49 { 50 $arr[\'count\']=count($messageArr); #统计有多少个用户发来信息 51 $arr[\'messageArr\']=$messageArr; 52 return $arr; 53 } 54 return false; 55 } 56 57 public function getPreMessage($user) 58 { 59 ##取出旧消息 60 $messageArr=array(); 61 $json_pre=$this->redis->lrange(\'preMessage_\'.$user, 0, -1); #一次性将全部旧消息取出来 62 foreach ($json_pre as $k => $v) 63 { 64 $temp=json_decode($v); #json反编码 65 $timeout=$temp->time+60*60*24*7; #数据过期时间 七天过期 66 if($timeout<time()) #判断数据是否过期 67 { 68 if($k==0) #若是最迟插入的数据都过期了,则将所有数据删除 69 { 70 $this->redis->del(\'preMessage_\'.$user); 71 break; 72 } 73 $this->redis->ltrim(\'preMessage_\'.$user, 0, $k); #若检测出有过期的,则将比它之前插入的所有数据删除 74 break; 75 } 76 $messageArr[$temp->sender][]=$temp; 77 } 78 return $messageArr; 79 } 80 81 /** 82 * @desc 消息处理,没什么特别的作用。在这里这是用来处理数组信息,然后将其输出。 83 * 84 * @param $arr array | 需要处理的信息数组 85 * 86 * @return 返回打印输出 87 */ 88 public function dealArr($arr) 89 { 90 foreach ($arr as $k => $v) 91 { 92 foreach ($v as $k1 => $v2) 93 { 94 echo \'发送人:\'.$v2->sender.\' 发送时间:\'.date(\'Y-m-d h:i:s\',$v2->time).\'<br/>\'; 95 echo \'消息内容:\'.$v2->message.\'<br/>\'; 96 } 97 echo "<hr/>"; 98 } 99 } 100 101 102 }
测试:
1、发送消息
#建立test1.php
1 include \'./SinglePullMessage.class.php\'; 2 $object=new SinglePullMessage(\'192.168.95.11\'); 3 #发送消息 4 $sender=\'boss\'; #发送者 5 $to=\'jane\'; #接收者 6 $message=\'How are you\'; #信息 7 $time=time(); 8 $arr=array(\'sender\'=>$sender,\'message\'=>$message,\'time\'=>$time); 9 echo $object->sendSingle($to,$arr);
2、获取新消息
#建立test2.php
1 include \'./SinglePullMessage.class.php\'; 2 $object=new SinglePullMessage(\'192.168.95.11\'); 3 #获取新消息 4 $arr=$object->getNewMessage(\'jane\'); 5 if($arr) 6 { 7 echo $arr[\'count\']."个联系人发来新消息<br/><hr/>"; 8 $object->dealArr($arr[\'messageArr\']); 9 } 10 else 11 echo "无新消息";
访问结果:
3、获取旧消息
#建立test3.php
1 include \'./SinglePullMessage.class.php\'; 2 $object=new SinglePullMessage(\'192.168.95.11\'); 3 #获取旧消息 4 $arr=$object->getPreMessage(\'jane\'); 5 if($arr) 6 { 7 $object->dealArr($arr); 8 } 9 else 10 echo "无旧数据";
4、多对多消息传递
例子2:多对多消息发送与获取(即是群组)
模块要求:
1、用户能够自行创建群组,并成为群主
2、群主可以拉人进来作为群组成员、并且可以踢人
3、用户可以直接退出群组
4、可以发送消息,每一位成员都可以拉取消息
5、群组的消息最大容纳量为5000条
6、成员可以拉取新消息,并提示有多少新消息
7、成员可以分页获取之前已读的旧消息
。。。。。功能就写这几个吧,有需要或者想练习的同学们可以增加其他功能,例如禁言、匿名消息发送、文件发送等等。
Redis实现思路:
1、群组的消息以及群组的成员组成采用有序集合进行存储。群组消息有序集合的member存储用户发送的json数据消息,score存储唯一值,将采用原子操作incr获取string中的自增长值进行存储;群组成员有序集合的member存储user,score存储非零数字(在这里这个score意义不大,我的例子代码中使用数字1为群主的score,其他的存储为2。当然这使用这个数据还可以扩展别的功能,例如群组中成员等级)可参考下面数据存储结构简图。
2、用户所加入的群组也是采用有序集合进行存储。其中,member存储群组ID,score存储用户已经获取该群组的最大消息分值(对应群组消息的score值)
3、用户创建群组的时候,通过原子操作incr从而获取一个唯一ID
4、用户在群中发送消息时,也是通过原子操作incr获取一个唯一自增长有序ID
5、在执行incr时,为防止并发导致竞争关系,因此需要进行加锁操作【redis详细锁的讲解可以参考:Redis构建分布式锁http://www.cnblogs.com/phpstudy2015-6/p/6575775.html】
6、创建群组方法简要思路,任何一个用户都可以创建群组聊天,在创建的同时,可以选择时是否添加群组成员(参数通过数组的形式)。创建过程将会为这个群组建立一个群组成员有序集合(群组信息有序集合暂时不创建),接着将群主添加进去,再将群ID添加用户所参加的群组有序集合中。
数据存储结构图:
PHP的代码实现:
#ManyPullMessage.class.php
1 <?php 2 class ManyPullMessage 3 { 4 private $redis=\'\'; #存储redis对象 5 /** 6 * @desc 构造函数 7 * 8 * @param $host string | redis主机 9 * @param $port int | 端口 10 */ 11 public function __construct($host,$port=6379) 12 { 13 $this->redis=new Redis(); 14 $this->redis->connect($host,$port); 15 } 16 17 /** 18 * @desc 用于创建群组的方法,在创建的同时还可以拉人进群组 19 * 20 * @param $user string | 用户名,创建群组的主人 21 * @param $addUser array | 其他用户构成的数组 22 * 23 * @param $lockName string | 锁的名字,用于获取群组ID的时候用 24 * @return int 返回群组ID 25 */ 26 public function createGroupChat($user, $addUser=array(), $lockName=\'chatIdLock\') 27 { 28 $identifier=$this->getLock($lockName); #获取锁 29 if($identifier) 30 { 31 $id=$this->redis->incr(\'groupChatID\'); #获取群组ID 32 $this->releaseLock($lockName,$identifier); #释放锁 33 } 34 else 35 return false; 36 $messageCount=$this->redis->set(\'countMessage_\'.$id, 0); #初始化这个群组消息计数器 37 #开启非事务型流水线,一次性将所有redis命令传给redis,减少与redis的连接 38 $pipe=$this->redis->pipeline(); 39 $this->redis->zadd(\'groupChat_\'.$id, 1, $user); #创建群组成员有序集合,并添加群主 40 #将这个群组添加到user所参加的群组有序集合中 41 $this->redis->zadd(\'hasGroupChat_\'.$user, 0, $id); 42 foreach ($addUser as $v) #创建群组的同时需要添加的用户成员 43 { 44 $this->redis->zadd(\'groupChat_\'.$id, 2, $v); 45 $this->redis->zadd(\'hasGroupChat_\'.$v, 0, $id); 46 } 47 $pipe->exec(); 48 return $id; #返回群组ID 49 } 50 51 /** 52 * @desc 群主主动拉人进群 53 * 54 * @param $user string | 群主名 55 * @param $groupChatID int | 群组ID 56 * @param $addMembers array | 需要拉进群的用户 57 * 58 * @return bool 59 */ 60 public function addMembers($user, $groupChatID, $addMembers=array()) 61 { 62 $groupMasterScore=$this->redis->zscore(\'groupChat_\'.$groupChatID, $user); #将groupChatName的群主取出来 63 if($groupMasterScore==1) #判断user是否是群主 64 { 65 $pipe=$this->redis->pipeline(); #开启非事务流水线 66 foreach ($addMembers as $v) 67 { 68 $this->redis->zadd(\'groupChat_\'.$groupChatID, 2, $v); #添加进群 69 $this->redis->zadd(\'hasGroupChat_\'.$v, 0, $groupChatID); #添加群名到用户的有序集合中 70 } 71 $pipe->exec(); 72 return true; 73 } 74 return false; 75 } 76 77 /** 78 * @desc 群主删除成员 79 * 80 * @param $user string | 群主名 81 * @param $groupChatID int | 群组ID 82 * @param $delMembers array | 需要删除的成员名字 83 * 84 * @return bool 85 */ 86 public function delMembers($user, $groupChatID, $delMembers=array()) 87 { 88 $groupMasterScore=$this->redis->zscore(\'groupChat_\'.$groupChatID, $user); 89 if($groupMasterScore==1) #判断user是否是群主 90 { 91 $pipe=$this->redis->pipeline(); #开启非事务流水线 92 foreach ($delMembers as $v) 93 { 94 $this->redis->zrem(\'groupChat_\'.$groupChatID, $v); 95 $this->redis->zrem(\'hasGroupChat_\'.$v, $groupChatID); 96 } 97 $pipe->exec(); 98 return true; 99 } 100 return false; 101 } 102 103 /** 104 * @desc 退出群组 105 * 106 * @param $user string | 用户名 107 * @param $groupChatID int | 群组名 108 */ 109 public function quitGroupChat($user, $groupChatID) 110 { 111 $this->redis->zrem(\'groupChat_\'.$groupChatID, $user); 112 $this->redis->zrem(\'hasGroupChat_\'.$user, $groupChatID); 113 return true; 114 } 115 116 /** 117 * @desc 发送消息 118 * 119 * @param $user string | 用户名 120 * @param $groupChatID int | 群组ID 121 * @param $messageArr array | 包含发送消息的数组 122 * @param $preLockName string | 群消息锁前缀,群消息锁全名为countLock_群ID 123 * 124 * @return bool 125 */ 126 public function sendMessage($user, $groupChatID, $messageArr, $preLockName=\'countLock_\') 127 { 128 $memberScore=$this->redis->zscore(\'groupChat_\'.$groupChatID, $user); #成员score 129 if($memberScore) 130 { 131 $identifier=$this->getLock($preLockName.$groupChatID); #获取锁 132 if($identifier) #判断获取锁是否成功 133 { 134 $messageCount=$this->redis->incr(\'countMessage_\'.$groupChatID); 135 $this->releaseLock($preLockName.$groupChatID,$identifier); #释放锁 136 } 137 else 138 return false; 139 $json_message=json_encode($messageArr); 140 $this->redis->zadd(\'groupChatMessage_\'.$groupChatID, $messageCount, $json_message); 141 $count=$this->redis->zcard(\'groupChatMessage_\'.$groupChatID); #查看信息量大小 142 if($count>5000) #判断数据量有没有达到5000条 143 { #数据量超5000,则需要清除旧数据 144 $start=5000-$count; 145 $this->redis->zremrangebyrank(\'groupChatMessage_\'.$groupChatID, $start, $count); 146 } 147 return true; 148 } 149 return false; 150 } 151 152 /** 153 * @desc 获取新信息 154 * 155 * @param $user string | 用户名 156 * 157 * @return 成功则放回json数据数组,无新信息返回false 158 */ 159 public function getNewMessage($user) 160 { 161 $arrID=$this->redis->zrange(\'hasGroupChat_\'.$user, 0, -1, \'withscores\'); #获取用户拥有的群组ID 162 $json_message=array(); #初始化 163 foreach ($arrID as $k => $v) #遍历循环所有群组,查看是否有新消息 164 { 165 $messageCount=$this->redis->get(\'countMessage_\'.$k); #群组最大信息分值数 9.使用Redis进行消息传递