PHP消息队列实现及应用
Posted qiusanqi
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了PHP消息队列实现及应用相关的知识,希望对你有一定的参考价值。
概要
- 消息队列的概念,原理和场景
- 解耦案例: 队列处理订单系统和配送系统
- 流量削峰案例: Redis的List类型实现秒杀
- RabbitMQ: 更专业的消息系统实现方案
一.消息队列
- 消息队列概念
- 队列结构中间件
- 消息放入后,不需要立即处理
- 由订阅者/消费者按顺序处理
- 核心结构
- 业务系统--入队-->消息队列--出队-->队列处理系统
- 应用场景
- 冗余: 数据需要冗余时,如订单系统后续需要严格的转换和记录,消息队列将这些数据持久化的存储在队列中,消息处理程序获取消息,并将该条记录删除
- 解耦: 解决了两套系统深度耦合问题,入队系统和出队系统无关
- 流量削峰: 秒杀和抢购,出现非常明显的流量剧增,大量的需求集中在几秒钟之内,对服务器的瞬间压力非常大
- 异步通信: 消息本身就可以使入队的请求直接返回,所以事先了程序的异步操作
- 扩展性: 如订单,订单入队后会有财务系统进行处理,后期如果添加配货系统,配货系统直接订阅该消息队列即可
- 排序保证: 有些场景下数据的处理顺序非常重要,队列可做成单线程单进单出系统,保证数据按照顺序处理
- 常见队列实现优缺点
- mysql: 可靠性高,易实现,速度慢
- Redis: 速度快,单条大消息包时效率低
- 消息系统: 专业性强,可靠,学习成本高
- 消息处理的触发机制
- 死循环方式读取: 易实现,故障时无法及时恢复(时效性强,适合秒杀)
- 定时任务: 压力均分,有处理量上限(定时任务的间隔和数量需要把控,适合订单系统,物流配货系统)
- 守护进程: 类似于php-FPM和PHP-CG,需要shell基础(监听进程检测消息队列中是否有内容,有内容启用出队系统进行处理)
二.解耦案例:队列处理订单系统和配送系统
- 架构设计
- 订单系统(接收用户订单)--->
- 订单队列表--->
- 配送系统中定时执行的程序读取队列表,将已处理的记录进行标记
- 程序流程
- 接收用户订单(Order.php)--->
- 订单系统处理--->
- 队列表(包含字符:order_id,status,mobile,address,created_at,updated_at)--->
- (定时脚本goods.sh每分钟启动)配送处理系统(Goods.php)--->
- 配送系统处理
- 代码实现
1 -- 表结构 2 create table `order_queue`( 3 `id` int(11) unsigned NOT NULL AUTO_INCREMENT COMMENT ‘id号‘, 4 `order_id` int(11) NOT NULL, 5 `mobile` varchar(20) NOT NULL COMMENT ‘用户手机号‘, 6 `address` varchar(100) NOT NULL COMMENT ‘用户地址‘, 7 `created_at` datetime NOT NULL DEFAULT ‘0000-00-00 00:00:00‘ COMMENT ‘订单创建的时间‘, 8 `updated_at` datetime NOT NULL DEFAULT ‘0000-00-00 00:00:00‘ COMMENT ‘处理完成的时间‘, 9 `status` tinyint(2) NOT NULL COMMENT ‘当前状态,0未处理,1已处理,2处理中‘, 10 PRIMARY key (`id`) 11 )ENGINE=InnoDB DEFAULT CHARSET=utf8;
1 //db.php 2 <?php 3 // 数据库连接类 4 class DB{ 5 //私有的属性 6 private static $dbcon=false; 7 private $host; 8 private $port; 9 private $user; 10 private $pass; 11 private $db; 12 private $charset; 13 private $link; 14 //私有的构造方法 15 private function __construct(){ 16 $this->host = ‘localhost‘; 17 $this->port = ‘3306‘; 18 $this->user = ‘root‘; 19 $this->pass = ‘root‘; 20 $this->db = ‘imooc‘; 21 $this->charset= ‘utf8‘; 22 //连接数据库 23 $this->db_connect(); 24 //选择数据库 25 $this->db_usedb(); 26 //设置字符集 27 $this->db_charset(); 28 } 29 //连接数据库 30 private function db_connect(){ 31 $this->link=mysqli_connect($this->host.‘:‘.$this->port,$this->user,$this->pass); 32 if(!$this->link){ 33 echo "数据库连接失败<br>"; 34 echo "错误编码".mysqli_errno($this->link)."<br>"; 35 echo "错误信息".mysqli_error($this->link)."<br>"; 36 exit; 37 } 38 } 39 //设置字符集 40 private function db_charset(){ 41 mysqli_query($this->link,"set names {$this->charset}"); 42 } 43 //选择数据库 44 private function db_usedb(){ 45 mysqli_query($this->link,"use {$this->db}"); 46 } 47 //私有的克隆(实现单例) 48 private function __clone(){ 49 die(‘clone is not allowed‘); 50 } 51 //公用的静态方法(将该方法作为入口) 52 public static function getIntance(){ 53 if(self::$dbcon==false){ 54 self::$dbcon=new self; 55 } 56 return self::$dbcon; 57 } 58 //执行sql语句的方法 59 public function query($sql){ 60 $res=mysqli_query($this->link,$sql); 61 if(!$res){ 62 echo "sql语句执行失败<br>"; 63 echo "错误编码是".mysqli_errno($this->link)."<br>"; 64 echo "错误信息是".mysqli_error($this->link)."<br>"; 65 } 66 return $res; 67 } 68 //获得最后一条记录id 69 public function getInsertid(){ 70 return mysqli_insert_id($this->link); 71 } 72 /** 73 * 查询某个字段 74 * @param 75 * @return string or int 76 */ 77 public function getOne($sql){ 78 $query=$this->query($sql); 79 return mysqli_free_result($query); 80 } 81 //获取一行记录,return array 一维数组 82 public function getRow($sql,$type="assoc"){ 83 $query=$this->query($sql); 84 if(!in_array($type,array("assoc",‘array‘,"row"))){ 85 die("mysqli_query error"); 86 } 87 $funcname="mysqli_fetch_".$type; 88 return $funcname($query); 89 } 90 //获取一条记录,前置条件通过资源获取一条记录 91 public function getFormSource($query,$type="assoc"){ 92 if(!in_array($type,array("assoc","array","row"))) 93 { 94 die("mysqli_query error"); 95 } 96 $funcname="mysqli_fetch_".$type; 97 return $funcname($query); 98 } 99 //获取多条数据,二维数组 100 public function getAll($sql){ 101 $query=$this->query($sql); 102 $list=array(); 103 while ($r=$this->getFormSource($query)) { 104 $list[]=$r; 105 } 106 return $list; 107 } 108 109 public function selectAll($table,$where,$fields=‘*‘,$order=‘‘,$skip=0,$limit=1000) 110 { 111 if(is_array($where)){ 112 foreach ($where as $key => $val) { 113 if (is_numeric($val)) { 114 $condition = $key.‘=‘.$val; 115 }else{ 116 $condition = $key.‘="‘.$val.‘"‘; 117 } 118 } 119 } else { 120 $condition = $where; 121 } 122 if (!empty($order)) { 123 $order = " order by ".$order; 124 } 125 $sql = "select $fields from $table where $condition $order limit $skip,$limit"; 126 $query = $this->query($sql); 127 $list = array(); 128 while ($r= $this->getFormSource($query)) { 129 $list[] = $r; 130 } 131 return $list; 132 } 133 /** 134 * 定义添加数据的方法 135 * @param string $table 表名 136 * @param string orarray $data [数据] 137 * @return int 最新添加的id 138 */ 139 public function insert($table,$data){ 140 //遍历数组,得到每一个字段和字段的值 141 $key_str=‘‘; 142 $v_str=‘‘; 143 foreach($data as $key=>$v){ 144 // if(empty($v)){ 145 // die("error"); 146 // } 147 //$key的值是每一个字段s一个字段所对应的值 148 $key_str.=$key.‘,‘; 149 $v_str.="‘$v‘,"; 150 } 151 $key_str=trim($key_str,‘,‘); 152 $v_str=trim($v_str,‘,‘); 153 //判断数据是否为空 154 $sql="insert into $table ($key_str) values ($v_str)"; 155 $this->query($sql); 156 //返回上一次增加操做产生ID值 157 return $this->getInsertid(); 158 } 159 /* 160 * 删除一条数据方法 161 * @param1 $table, $where=array(‘id‘=>‘1‘) 表名 条件 162 * @return 受影响的行数 163 */ 164 public function deleteOne($table, $where){ 165 if(is_array($where)){ 166 foreach ($where as $key => $val) { 167 $condition = $key.‘=‘.$val; 168 } 169 } else { 170 $condition = $where; 171 } 172 $sql = "delete from $table where $condition"; 173 $this->query($sql); 174 //返回受影响的行数 175 return mysqli_affected_rows($this->link); 176 } 177 /* 178 * 删除多条数据方法 179 * @param1 $table, $where 表名 条件 180 * @return 受影响的行数 181 */ 182 public function deleteAll($table, $where){ 183 if(is_array($where)){ 184 foreach ($where as $key => $val) { 185 if(is_array($val)){ 186 $condition = $key.‘ in (‘.implode(‘,‘, $val) .‘)‘; 187 } else { 188 $condition = $key. ‘=‘ .$val; 189 } 190 } 191 } else { 192 $condition = $where; 193 } 194 $sql = "delete from $table where $condition"; 195 $this->query($sql); 196 //返回受影响的行数 197 return mysqli_affected_rows($this->link); 198 } 199 /** 200 * [修改操作description] 201 * @param [type] $table [表名] 202 * @param [type] $data [数据] 203 * @param [type] $where [条件] 204 * @return [type] 205 */ 206 public function update($table,$data,$where,$limit=0){ 207 //遍历数组,得到每一个字段和字段的值 208 $str=‘‘; 209 foreach($data as $key=>$v){ 210 $str.="$key=‘$v‘,"; 211 } 212 $str=rtrim($str,‘,‘); 213 if(is_array($where)){ 214 foreach ($where as $key => $val) { 215 if(is_array($val)){ 216 $condition = $key.‘ in (‘.implode(‘,‘, $val) .‘)‘; 217 } else { 218 $condition = $key. ‘=‘ .$val; 219 } 220 } 221 } else { 222 $condition = $where; 223 } 224 225 if (!empty($limit)) { 226 $limit = " limit ".$limit; 227 }else{ 228 $limit=‘‘; 229 } 230 //修改SQL语句 231 $sql="update $table set $str where $condition $limit"; 232 $this->query($sql); 233 //返回受影响的行数 234 return mysqli_affected_rows($this->link); 235 } 236 }
1 //Order.php 2 <?php 3 //这个文件是用来接受用户的订单信息并写入队列的一个文件 4 include ‘../db.php‘; 5 6 if(!empty($_GET[‘mobile‘])){ 7 //首先是订单中心的处理流程 8 //...且生成订单号 9 $order_id = rand(10000,99999); 10 //把用户传过来的数据进行过滤(可以防止sql注入) 11 //订单信息 12 $insert_data = array( 13 ‘order_id‘=>$order_id, 14 ‘mobile‘=>$_GET[‘mobile‘], 15 ‘created_at‘=>date(‘Y-m-d H:i:s‘, time()), 16 ‘status‘=>0 17 ) 18 //把数据存入队列表中 19 $db = DB::getIntance(); 20 $res = $db->insert(‘order_queue‘,$insert_data); 21 if($res){ 22 echo $insert_data[‘order_id‘].‘保存成功‘; 23 }else{ 24 echo $insert_data[‘order_id‘].‘保存失败‘; 25 } 26 }
1 //Goods.php 2 <?php 3 //配送系统处理队列中的订单并进行标记的文件 4 include ‘../include/db.php‘; 5 $db = DB::getIntance(); 6 //1.先把要处理的记录更新为等待处理 7 //配送系统不是及时完成的,存在处理时间,若在处理中时其他程序操作该记录会存在冲突,所以需要先将订单进行锁定 8 $waiting = array(‘status‘=>0); 9 $lock = array(‘status‘=>2); 10 $res_lock = update(‘order_queue‘,$lock,$waiting,2); 11 //2.选择出刚刚更新的数据,然后进行配送系统的处理 12 if($res_lock){ 13 //2.1选择出要处理的订单内容 14 $res = $db->selectAll(‘order_queue‘,$lock); 15 //2.2由配货系统进行退货处理 16 //... 17 18 //3.将处理过的程序更新为已完成 19 $success = array( 20 ‘status‘=>1, 21 ‘updated_at‘=>date(‘Y-m-d H:i:s‘,time()) 22 ); 23 $res_last = $db->update(‘order_queue‘,$success,$lock); 24 if($res_last){ 25 echo ‘Success‘.$res_last; 26 }else{ 27 echo ‘Fail‘.$res_last; 28 } 29 }else{ 30 echo ‘All Finished‘; 31 }
1 #good.sh 2 date "+%G-%m-%d %H:%M:S" 3 cd /home/项目位置 4 php goods.php
服务器部署定时任务使用 crontab -e
#m分 h时 dom日 mon月 dow周 command命令
*/1 * * * * shell脚本目录 >> 日志文件目录 2>&1(错误输出转化为标准输出)
tail -f log.log //监控日志文件
三.流量削峰案例:Redis的List类型实现秒杀
- 常用命令
- LPUSH/LPUSHX :将值插入到(/存在的)列表头部
- RPUSH/RPUSHX :将值插入到(/存在的)列表尾部
- LPOP:移出并获取列表的第一个元素
- RPOP :移出并获取列表的最后一个元素``
- LTRIM :保留指定区间内的元素
- LLEN :获取列表长度
- LSET:通过索引设置列表元素的值
- LINDEX :通过索引获取列表中的元素
- LRANGE:获取列表指定范围内的元素
- 架构设计: 秒杀业务程序--->redis--->通过入库程序将redis中数据入库永久保存
- 代码级设计
- 秒杀程序把请求写入Redis, (Uid, time_stamp微秒级时间戳)
- 检查Redis已存放数据的长度,超出上限直接丢弃。
- 死循环处理存入Redis的数据并入库
- 代码实现
1 -- redis_queue数据库表的SQL语句: 2 create table `redis_queue`( 3 `id` int(10) unsigned NOT NULL AUTO_INCREMENT, 4 `uid` int(11) NOT NULL DEFAULT ‘0‘, 5 `time_stamp` varchar(24) NOT NULL, 6 PRIMARY KEY(`id`) 7 ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
1 //user.php 2 <?php 3 //接收用户请求存入redis中 4 //首先加载redis组件 5 $redis = new Redis(); 6 $redis->connect(‘127.0.0.1‘,6379); 7 $redis_name = ‘miaosha‘; 8 ///////////////////////////////////////////////////////// 9 //接收用户id 10 $uid = $_GET[‘uid‘]; 11 //获取一下redis里已有的数量 12 $num = 10; 13 //如果当天人数少于十的时候,则加入这个队列 14 if ($redis->lLen($redis_name) < 10){ 15 $redis->rPush($redis_name,$uid.‘%‘.microtime()); 16 echo $uid.‘秒杀成功‘; 17 }else{ 18 //如果当天人数已达到十人 19 echo ‘秒杀已结束‘; 20 } 21 $redis->close(); 22 ///仅模拟高压下/////////////////////////////////////////////// 23 for($i=0;$i<100;$i++){ 24 $uid = rand(100000,999999); 25 //获取一下redis里已有的数量 26 $num = 10; 27 //如果当天人数少于十的时候,则加入这个队列 28 if ($redis->lLen($redis_name) < 10){ 29 $redis->rPush($redis_name,$uid.‘%‘.microtime()); 30 echo $uid.‘秒杀成功‘; 31 }else{ 32 //如果当天人数已达到十人 33 echo ‘秒杀已结束‘; 34 } 35 } 36 $redis->close(); 37 /////////////////////////////////////////////////////////
1 //savetodb.php 2 <?php 3 include ‘../include/db.php‘; 4 //首先加载redis组件 5 $redis = new Redis(); 6 $redis->connect(‘127.0.0.1‘,6379); 7 $redis_name = ‘miaosha‘; 8 9 $db = DB::getIntance(); 10 //死循环 11 while(1){ 12 //从队列最左侧取出一个值 13 $user = $redis->lPop($redis_name); 14 //然后判断这个值是否存在 15 if(!$user||$user == ‘nil‘){ 16 sleep(2); 17 continue; 18 } 19 //切割出时间,uid 20 $user_arr = explode(‘%‘,$user); 21 $insert_data = array( 22 ‘uid‘=>$user_arr[0], 23 ‘time_stamp‘=>$user_arr[1] 24 ) 25 //保存到数据库中 26 $res = $db->insert("redis_queue",$insert_data); 27 //数据库插入失败时回滚机制 28 if(!$res){ 29 $redis->rPush($redis_name,$user); 30 } 31 sleep(2); 32 } 33 //释放redis 34 $redis->close();
第4章 RabbitMQ:更专业的消息系统实现方案
- RabbitMo的架构和原理
- 特点:完整的实现了AMOP、集群简化、持久化、跨平台
- RabbitMQ使用
- RabbitMQ安装(rabbitmq-server、php-amaplib)
- 生产者向消息通道发送消息
- 消费者处理消息
第5章 总结
- 为什么使用队列
- 如何使用队列
- 队列实现机制
- 存取的使用情况
- 如何监控
以上是关于PHP消息队列实现及应用的主要内容,如果未能解决你的问题,请参考以下文章