PHP消息队列实现及应用

Posted qiusanqi

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了PHP消息队列实现及应用相关的知识,希望对你有一定的参考价值。

概要
  1. 消息队列的概念,原理和场景
  2. 解耦案例: 队列处理订单系统和配送系统
  3. 流量削峰案例: Redis的List类型实现秒杀
  4. RabbitMQ: 更专业的消息系统实现方案
 
一.消息队列
  1. 消息队列概念
    1. 队列结构中间件
    2. 消息放入后,不需要立即处理
    3. 由订阅者/消费者按顺序处理
  2. 核心结构
    1. 业务系统--入队-->消息队列--出队-->队列处理系统
  3. 应用场景
    1. 冗余: 数据需要冗余时,如订单系统后续需要严格的转换和记录,消息队列将这些数据持久化的存储在队列中,消息处理程序获取消息,并将该条记录删除
    2. 解耦: 解决了两套系统深度耦合问题,入队系统和出队系统无关
    3. 流量削峰: 秒杀和抢购,出现非常明显的流量剧增,大量的需求集中在几秒钟之内,对服务器的瞬间压力非常大
    4. 异步通信: 消息本身就可以使入队的请求直接返回,所以事先了程序的异步操作
    5. 扩展性: 如订单,订单入队后会有财务系统进行处理,后期如果添加配货系统,配货系统直接订阅该消息队列即可
    6. 排序保证: 有些场景下数据的处理顺序非常重要,队列可做成单线程单进单出系统,保证数据按照顺序处理
  4. 常见队列实现优缺点
    1. mysql: 可靠性高,易实现,速度慢
    2. Redis: 速度快,单条大消息包时效率低
    3. 消息系统: 专业性强,可靠,学习成本高
  5. 消息处理的触发机制
    1. 死循环方式读取: 易实现,故障时无法及时恢复(时效性强,适合秒杀)
    2. 定时任务: 压力均分,有处理量上限(定时任务的间隔和数量需要把控,适合订单系统,物流配货系统)
    3. 守护进程: 类似于php-FPM和PHP-CG,需要shell基础(监听进程检测消息队列中是否有内容,有内容启用出队系统进行处理)
 
二.解耦案例:队列处理订单系统和配送系统
  1. 架构设计
    1. 订单系统(接收用户订单)--->
    2. 订单队列表--->
    3. 配送系统中定时执行的程序读取队列表,将已处理的记录进行标记
  2. 程序流程
    1. 接收用户订单(Order.php)--->
    2. 订单系统处理--->
    3. 队列表(包含字符:order_id,status,mobile,address,created_at,updated_at)--->
    4. (定时脚本goods.sh每分钟启动)配送处理系统(Goods.php)--->
    5. 配送系统处理
  3. 代码实现
 1 -- 表结构
 2 create table `order_queue`(
 3 `id` int(11) unsigned NOT NULL AUTO_INCREMENT COMMENT id号,
 4 `order_id` int(11) NOT NULL,
 5 `mobile` varchar(20) NOT NULL COMMENT 用户手机号,
 6 `address` varchar(100) NOT NULL COMMENT 用户地址,
 7 `created_at` datetime NOT NULL DEFAULT 0000-00-00 00:00:00 COMMENT 订单创建的时间,
 8 `updated_at` datetime NOT NULL DEFAULT 0000-00-00 00:00:00 COMMENT 处理完成的时间,
 9 `status` tinyint(2) NOT NULL COMMENT 当前状态,0未处理,1已处理,2处理中,
10 PRIMARY key (`id`)
11 )ENGINE=InnoDB DEFAULT CHARSET=utf8;
  1 //db.php
  2 <?php
  3 // 数据库连接类
  4 class DB{
  5   //私有的属性
  6   private static $dbcon=false;
  7   private $host;
  8   private $port;
  9   private $user;
 10   private $pass;
 11   private $db;
 12   private $charset;
 13   private $link;
 14   //私有的构造方法
 15   private function __construct(){
 16     $this->host =  ‘localhost‘;
 17     $this->port =  ‘3306‘;
 18     $this->user =  ‘root‘;
 19     $this->pass =  ‘root‘;
 20     $this->db =  ‘imooc‘;
 21     $this->charset= ‘utf8‘;
 22     //连接数据库
 23     $this->db_connect();
 24     //选择数据库
 25     $this->db_usedb();
 26     //设置字符集
 27     $this->db_charset();
 28    }
 29    //连接数据库
 30    private function db_connect(){
 31     $this->link=mysqli_connect($this->host.‘:‘.$this->port,$this->user,$this->pass);
 32     if(!$this->link){
 33       echo "数据库连接失败<br>";
 34       echo "错误编码".mysqli_errno($this->link)."<br>";
 35       echo "错误信息".mysqli_error($this->link)."<br>";
 36       exit;
 37     }
 38    }
 39    //设置字符集
 40     private function db_charset(){
 41      mysqli_query($this->link,"set names {$this->charset}");
 42     }
 43     //选择数据库
 44    private function db_usedb(){
 45      mysqli_query($this->link,"use {$this->db}");
 46    }
 47    //私有的克隆(实现单例)
 48    private function __clone(){
 49      die(‘clone is not allowed‘);
 50    }
 51    //公用的静态方法(将该方法作为入口)
 52    public static function getIntance(){
 53      if(self::$dbcon==false){
 54       self::$dbcon=new self;
 55      }
 56      return self::$dbcon;
 57    }
 58    //执行sql语句的方法
 59     public function query($sql){
 60      $res=mysqli_query($this->link,$sql);
 61      if(!$res){
 62       echo "sql语句执行失败<br>";
 63       echo "错误编码是".mysqli_errno($this->link)."<br>";
 64       echo "错误信息是".mysqli_error($this->link)."<br>";
 65      }
 66      return $res;
 67    }
 68     //获得最后一条记录id
 69     public function getInsertid(){
 70      return mysqli_insert_id($this->link);
 71     }
 72    /**
 73     * 查询某个字段
 74     * @param
 75     * @return string or int
 76     */
 77     public function getOne($sql){
 78      $query=$this->query($sql);
 79       return mysqli_free_result($query);
 80     }
 81     //获取一行记录,return array 一维数组
 82     public function getRow($sql,$type="assoc"){
 83      $query=$this->query($sql);
 84      if(!in_array($type,array("assoc",‘array‘,"row"))){
 85        die("mysqli_query error");
 86      }
 87      $funcname="mysqli_fetch_".$type;
 88      return $funcname($query);
 89     }
 90     //获取一条记录,前置条件通过资源获取一条记录
 91     public function getFormSource($query,$type="assoc"){
 92     if(!in_array($type,array("assoc","array","row")))
 93     {
 94       die("mysqli_query error");
 95     }
 96     $funcname="mysqli_fetch_".$type;
 97     return $funcname($query);
 98     }
 99     //获取多条数据,二维数组
100     public function getAll($sql){
101      $query=$this->query($sql);
102      $list=array();
103      while ($r=$this->getFormSource($query)) {
104       $list[]=$r;
105      }
106      return $list;
107     }
108 
109     public function selectAll($table,$where,$fields=‘*‘,$order=‘‘,$skip=0,$limit=1000)
110     {
111               if(is_array($where)){
112                     foreach ($where as $key => $val) {
113                         if (is_numeric($val)) {
114                             $condition = $key.‘=‘.$val;
115                         }else{
116                             $condition = $key.‘="‘.$val.‘"‘;
117                         }
118                     }
119               } else {
120                 $condition = $where;
121               }
122               if (!empty($order)) {
123                   $order = " order by ".$order;
124               }
125               $sql = "select $fields from $table where $condition $order limit $skip,$limit";
126               $query = $this->query($sql);
127               $list = array();
128               while ($r= $this->getFormSource($query)) {
129                   $list[] = $r;
130               }
131               return $list;
132     }
133      /**
134      * 定义添加数据的方法
135      * @param string $table 表名
136      * @param string orarray $data [数据]
137      * @return int 最新添加的id
138      */
139      public function insert($table,$data){
140      //遍历数组,得到每一个字段和字段的值
141      $key_str=‘‘;
142      $v_str=‘‘;
143      foreach($data as $key=>$v){
144      //  if(empty($v)){
145      //   die("error");
146      // }
147         //$key的值是每一个字段s一个字段所对应的值
148         $key_str.=$key.‘,‘;
149         $v_str.="‘$v‘,";
150      }
151      $key_str=trim($key_str,‘,‘);
152      $v_str=trim($v_str,‘,‘);
153      //判断数据是否为空
154      $sql="insert into $table ($key_str) values ($v_str)";
155      $this->query($sql);
156     //返回上一次增加操做产生ID值
157      return $this->getInsertid();
158    }
159    /*
160     * 删除一条数据方法
161     * @param1 $table, $where=array(‘id‘=>‘1‘) 表名 条件
162     * @return 受影响的行数
163     */
164     public function deleteOne($table, $where){
165       if(is_array($where)){
166         foreach ($where as $key => $val) {
167           $condition = $key.‘=‘.$val;
168         }
169       } else {
170         $condition = $where;
171       }
172       $sql = "delete from $table where $condition";
173       $this->query($sql);
174       //返回受影响的行数
175       return mysqli_affected_rows($this->link);
176     }
177     /*
178     * 删除多条数据方法
179     * @param1 $table, $where 表名 条件
180     * @return 受影响的行数
181     */
182     public function deleteAll($table, $where){
183       if(is_array($where)){
184         foreach ($where as $key => $val) {
185           if(is_array($val)){
186             $condition = $key.‘ in (‘.implode(‘,‘, $val) .‘)‘;
187           } else {
188             $condition = $key. ‘=‘ .$val;
189           }
190         }
191       } else {
192         $condition = $where;
193       }
194       $sql = "delete from $table where $condition";
195       $this->query($sql);
196       //返回受影响的行数
197       return mysqli_affected_rows($this->link);
198     }
199    /**
200     * [修改操作description]
201     * @param [type] $table [表名]
202     * @param [type] $data [数据]
203     * @param [type] $where [条件]
204     * @return [type]
205     */
206    public function update($table,$data,$where,$limit=0){
207      //遍历数组,得到每一个字段和字段的值
208      $str=‘‘;
209     foreach($data as $key=>$v){
210      $str.="$key=‘$v‘,";
211     }
212     $str=rtrim($str,‘,‘);
213       if(is_array($where)){
214         foreach ($where as $key => $val) {
215           if(is_array($val)){
216             $condition = $key.‘ in (‘.implode(‘,‘, $val) .‘)‘;
217           } else {
218             $condition = $key. ‘=‘ .$val;
219           }
220         }
221       } else {
222         $condition = $where;
223       }
224 
225         if (!empty($limit)) {
226             $limit = " limit ".$limit;
227         }else{
228             $limit=‘‘;
229         }
230     //修改SQL语句
231     $sql="update $table set $str where $condition $limit";
232     $this->query($sql);
233     //返回受影响的行数
234     return mysqli_affected_rows($this->link);
235    }
236 }
 1 //Order.php
 2 <?php
 3 //这个文件是用来接受用户的订单信息并写入队列的一个文件
 4 include ‘../db.php‘;
 5 
 6 if(!empty($_GET[‘mobile‘])){
 7     //首先是订单中心的处理流程
 8     //...且生成订单号
 9     $order_id = rand(10000,99999);
10     //把用户传过来的数据进行过滤(可以防止sql注入)
11     //订单信息
12     $insert_data = array(
13         ‘order_id‘=>$order_id,
14         ‘mobile‘=>$_GET[‘mobile‘],
15         ‘created_at‘=>date(‘Y-m-d H:i:s‘, time()),
16         ‘status‘=>0
17     )
18     //把数据存入队列表中
19     $db = DB::getIntance();
20     $res = $db->insert(‘order_queue‘,$insert_data);
21     if($res){
22         echo $insert_data[‘order_id‘].‘保存成功‘;
23     }else{
24         echo $insert_data[‘order_id‘].‘保存失败‘;
25     }
26 }
 1 //Goods.php
 2 <?php
 3 //配送系统处理队列中的订单并进行标记的文件
 4 include ‘../include/db.php‘;
 5 $db = DB::getIntance();
 6 //1.先把要处理的记录更新为等待处理
 7 //配送系统不是及时完成的,存在处理时间,若在处理中时其他程序操作该记录会存在冲突,所以需要先将订单进行锁定
 8 $waiting = array(‘status‘=>0);
 9 $lock = array(‘status‘=>2);
10 $res_lock = update(‘order_queue‘,$lock,$waiting,2);
11 //2.选择出刚刚更新的数据,然后进行配送系统的处理
12 if($res_lock){
13     //2.1选择出要处理的订单内容
14     $res = $db->selectAll(‘order_queue‘,$lock);
15     //2.2由配货系统进行退货处理
16     //...
17     
18     //3.将处理过的程序更新为已完成
19     $success = array(
20         ‘status‘=>1,
21         ‘updated_at‘=>date(‘Y-m-d H:i:s‘,time())
22     );
23     $res_last = $db->update(‘order_queue‘,$success,$lock);
24     if($res_last){
25         echo ‘Success‘.$res_last;
26     }else{
27         echo ‘Fail‘.$res_last;
28     }
29 }else{
30     echo ‘All Finished‘;
31 }
1 #good.sh
2 date "+%G-%m-%d %H:%M:S"
3 cd /home/项目位置
4 php goods.php
服务器部署定时任务使用 crontab -e
#m分 h时 dom日 mon月 dow周 command命令
*/1 * * * * shell脚本目录 >> 日志文件目录 2>&1(错误输出转化为标准输出)
tail -f log.log //监控日志文件
 
三.流量削峰案例:Redis的List类型实现秒杀
  1. 常用命令
    1. LPUSH/LPUSHX :将值插入到(/存在的)列表头部
    2. RPUSH/RPUSHX :将值插入到(/存在的)列表尾部
    3. LPOP:移出并获取列表的第一个元素
    4. RPOP :移出并获取列表的最后一个元素``
    5. LTRIM :保留指定区间内的元素
    6. LLEN :获取列表长度
    7. LSET:通过索引设置列表元素的值
    8. LINDEX :通过索引获取列表中的元素
    9. LRANGE:获取列表指定范围内的元素
  2. 架构设计: 秒杀业务程序--->redis--->通过入库程序将redis中数据入库永久保存
  3. 代码级设计
    1. 秒杀程序把请求写入Redis, (Uid, time_stamp微秒级时间戳)
    2. 检查Redis已存放数据的长度,超出上限直接丢弃。
    3. 死循环处理存入Redis的数据并入库
  4. 代码实现
1 -- redis_queue数据库表的SQL语句:
2 create table `redis_queue`(
3 `id` int(10) unsigned NOT NULL AUTO_INCREMENT,
4 `uid` int(11) NOT NULL DEFAULT 0,
5 `time_stamp` varchar(24) NOT NULL,
6 PRIMARY KEY(`id`)
7 ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
 1 //user.php
 2 <?php
 3 //接收用户请求存入redis中
 4 //首先加载redis组件
 5 $redis = new Redis();
 6 $redis->connect(‘127.0.0.1‘,6379);
 7 $redis_name = ‘miaosha‘;
 8 /////////////////////////////////////////////////////////
 9 //接收用户id
10 $uid = $_GET[‘uid‘];
11 //获取一下redis里已有的数量
12 $num = 10;
13 //如果当天人数少于十的时候,则加入这个队列
14 if ($redis->lLen($redis_name) < 10){
15     $redis->rPush($redis_name,$uid.‘%‘.microtime());
16     echo $uid.‘秒杀成功‘;
17 }else{
18     //如果当天人数已达到十人
19     echo ‘秒杀已结束‘;    
20 }
21 $redis->close();
22 ///仅模拟高压下///////////////////////////////////////////////
23 for($i=0;$i<100;$i++){
24     $uid = rand(100000,999999);
25     //获取一下redis里已有的数量
26     $num = 10;
27     //如果当天人数少于十的时候,则加入这个队列
28     if ($redis->lLen($redis_name) < 10){
29         $redis->rPush($redis_name,$uid.‘%‘.microtime());
30         echo $uid.‘秒杀成功‘;
31     }else{
32         //如果当天人数已达到十人
33         echo ‘秒杀已结束‘;    
34     }
35 }
36 $redis->close();
37 /////////////////////////////////////////////////////////
 1 //savetodb.php
 2 <?php
 3 include ‘../include/db.php‘;
 4 //首先加载redis组件
 5 $redis = new Redis();
 6 $redis->connect(‘127.0.0.1‘,6379);
 7 $redis_name = ‘miaosha‘;
 8 
 9 $db = DB::getIntance();
10 //死循环
11 while(1){
12     //从队列最左侧取出一个值
13     $user = $redis->lPop($redis_name);
14     //然后判断这个值是否存在
15     if(!$user||$user == ‘nil‘){
16         sleep(2);
17         continue;
18     }
19     //切割出时间,uid
20     $user_arr = explode(‘%‘,$user);
21     $insert_data = array(
22         ‘uid‘=>$user_arr[0],
23         ‘time_stamp‘=>$user_arr[1]
24     )
25     //保存到数据库中
26     $res = $db->insert("redis_queue",$insert_data);
27     //数据库插入失败时回滚机制
28     if(!$res){
29         $redis->rPush($redis_name,$user);
30     }
31     sleep(2);
32 }
33 //释放redis
34 $redis->close();
 
第4章 RabbitMQ:更专业的消息系统实现方案
  1. RabbitMo的架构和原理
技术图片
    1. 特点:完整的实现了AMOP、集群简化、持久化、跨平台
  • RabbitMQ使用
    1. RabbitMQ安装(rabbitmq-server、php-amaplib)
    2. 生产者向消息通道发送消息
    3. 消费者处理消息
 
第5章 总结
  1. 为什么使用队列
  2. 如何使用队列
  3. 队列实现机制
  4. 存取的使用情况
  5. 如何监控
 
 
 
 
 
 
 
 
 
 
 
 


以上是关于PHP消息队列实现及应用的主要内容,如果未能解决你的问题,请参考以下文章

如何使用php、html及消息队列实现订单超时自动关闭订单

ActiveMQ消息队列的使用及应用

ActiveMQ消息队列的使用及应用

ActiveMQ消息队列的使用及应用

PHP + Redis 实现消息队列

消息队列mq的原理及实现方法