异步redis队列实现 数据入库

Posted woshihaiyong168

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了异步redis队列实现 数据入库相关的知识,希望对你有一定的参考价值。


业务需求


app客户端向服务端接口发送来json 数据  每天 发一次   清空缓存后会再次发送


出问题之前业务逻辑:

php 接口 首先将 json 转为数组  去重  在一张大表中插入不存在的数据

该用户已经存在 和新增的id

入另一种详情表


问题所在:

当用户因特殊情况清除缓存  导致app 发送json串  入库并发高 导致CPU 暴增到88% 并且居高不下



优化思路:

1、异步队列处理

2、redis 过滤(就是只处理当天第一次请求)

3、redis 辅助存储app名称(验证过后批量插入数据app名称表中)

4、拼接插入的以及新增的如详细表中


解决办法:

1、接口修改  redis 过滤 + 如list队列   并将结果存入redis中

  首先 redis将之前的历史数据放在redis 哈希里面  中文为键名  id 为键值

<?php
/**
 * Created by haiyong.
 * User: jia
 * Date: 2017/9/18
 * Time: 20:06
 */
namespace App\\Http\\Controllers\\App;


use App\\Http\\Controllers\\Controller;
use Illuminate\\Http\\Request;
use Illuminate\\Support\\Facades\\DB;
use Illuminate\\Support\\Facades\\Redis;

class OtherAppController extends Controller

    /**
     * app应用统计接口
     * @param Request $request
     * @return string
     */
    public function appTotal(Request $request)
    
        //  //历史数据入库
        //$redis = Redis::connection('web_active');
        // $app_name = DB::connection('phpLog')->table('app_set_name')->where("appName", '<>', ' ')->lists('id', 'appName');
        // $str  = '';
        // foreach ($app_name as $key => $val) 
        //     $str.= "$val $key ";
        // 
        // $redis->hmset('app_name', $app_name);
        // echo $str;exit;
        $result = $request->input('res');
        $list = json_decode($result, true);
        if (empty ($list) || !is_array($list)) 
            return json_encode(['result' => 'ERROR', 'msg' => 'parameter error']);
        
        $data['uid'] = isset($list['uid']) ? $list['uid'] : '20001' ;
        $data['time'] = date('Y-m-d');
        $redis_key = 'log_app:'.$data['time'];
        //redis 过滤
        $redis = Redis::connection('web_active');
        //redis 键值过期设置
        if (empty($redis->exists($redis_key))) 
            $redis->hset($redis_key, 1, 'start');
            $redis->EXPIREAT($redis_key, strtotime($data['time'].'+2 day'));
        
        //值确定
        if ($redis->hexists($redis_key, $data['uid'])) 
            return json_encode(['result' => 'SUCCESS']);
         else 
            //推入队列
            $redis->hset($redis_key, $data['uid'], $result);
            $redis->rpush('log_app_list', $data['time'] . ':' . $data['uid']);
            return json_encode(['result' => 'SUCCESS']);
        
    





2、php 脚本循环 监控redis 队列 执行逻辑    防止内存溢出

mget 获取该用户的app id  不存在就会返回null

通过判断null  运用redis 新值作为自增id指针  将null 补齐  之后批量入mysql   并跟新redis 哈希 和指针值  并入库 详情表

<?php

namespace App\\Console\\Commands;

use Illuminate\\Console\\Command;
use Illuminate\\Support\\Facades\\Redis;
use Illuminate\\Support\\Facades\\DB;
use Illuminate\\Support\\Facades\\Storage;

class AppTotal extends Command

    /**
     * The name and signature of the console command.
     *
     * @var string
     */
    protected $signature = 'AppTotal:run';

    /**
     * The console command description.
     *
     * @var string
     */
    protected $description = 'Command description';

    /**
     * Create a new command instance.
     *
     * @return void
     */
    public function __construct()
    
        parent::__construct();
    

    /**
     * Execute the console command.
     *
     * @return mixed
     */
    public function handle()
    
    	    //历史数据入库  
        // $redis = Redis::connection('web_active');  
        // $app_name = DB::connection('phpLog')->table('app_set_name')->where("appName", '<>', ' ')->lists('id', 'appName');  
        // $redis->hmset('app_name', $app_name);  
        // exit;  
          while(1) 
            $redis = Redis::connection('web_active');
            //队列名称
            $res = $redis->lpop('log_app_list');   
            //开关按钮
            $lock = $redis->get('log_app_lock');
            if (!empty($res)) 
                list($date,$uid) = explode(':',$res);
                $result = $redis->hget('log_app:'.$date, $uid);
                if (!empty($result)) 
                        $table_name = 'app_total'.date('Ym');
                        $list = json_decode($result, true);
                        $data['uid'] = isset($list['uid']) ? $list['uid'] : '20001' ;
                        $data['sex'] = isset($list['sex']) ? $list['sex'] : '' ;
                        $data['device'] = isset($list['device']) ? $list['device'] : '' ;
                        $data['appList'] = isset($list['list']) ? $list['list'] : '' ;
                        //数据去重    flip比unique更节约性能
                        $data['appList'] = array_flip($data['appList']);
                        $data['appList'] = array_flip($data['appList']);
                        $data['time'] = date('Y-m-d');
                        //app应用过滤
                        $app_res = $redis->hmget('app_name', $data['appList']);
                        //新增加app数组
                        $new_app = [];
                        //mysql 入库数组
                        $mysql_new_app = [];
				        //获取当前redis 自增指针
				        $total = $redis->get('app_name_total');
				        foreach ($app_res as $key =>& $val) 
				            if (is_null($val)) 
				                $total += 1;
				                $new_app[$data['appList'][$key]] = $total; 
				                $val = $total;
				                array_push($mysql_new_app,['id' => $total, 'appName'=> $data['appList'][$key]]);
				            
				        
				        if (count($new_app))
				        	$str = "INSERT IGNORE INTO app_set_name (id,appName) values";
				        	foreach ($new_app as $key => $val) 
				        		$str.= "(".$val.",'".$key."'),";
				        	
				        	$str = trim($str, ',');
				        	//$mysql_res = DB::connection('phpLog')->table('app_set_name')->insert($mysql_new_app);
				        	$mysql_res = DB::connection('phpLog')->statement($str);
						        if ($mysql_res) 
						            // 设置redis 指针
						             $redis->set('app_name_total', $total);
						            //  redis 数据入库
						             $redis->hmset('app_name', $new_app);
						        
						 
			            //  详情数据入库
			             $data['appList'] = implode(',', $app_res);
                        //app统计入库
                        DB::connection('phpLog')->statement("INSERT IGNORE INTO ".$table_name." (uid,sex,device,`time`,appList) 
        values('".$data['uid']."',".$data['sex'].",'".$data['device']."','".$data['time']."','".$data['appList']."')");
                        //log 记录   当文件达到123MB的时候产生内存保错  所有这个地方可是利用日志切割 或者 不写入 日志
                        Storage::disk('local')->append(DIRECTORY_SEPARATOR.'total'.DIRECTORY_SEPARATOR.'loaAppTotal.txt', date('Y-m-d H:i:s').'  success '.$result."\\n"); 
				      else 
				       Storage::disk('local')->append(DIRECTORY_SEPARATOR.'total'.DIRECTORY_SEPARATOR.'loaAppTotal.txt', date('Y-m-d H:i:s').'  error '.$result."\\n");
				    
            
            //执行间隔
            sleep(1);
            //结束按钮
            if ($lock == 2) 
                exit;
            
            //内存检测
            if(memory_get_usage()>1000*1024*1024)
                exit('内存溢出');//大于100M内存退出程序,防止内存泄漏被系统杀死导致任务终端
            
        
    




3、执定 定时任务监控脚本执行情况

crontab -e

/2 * * * * /bin/bash /usr/local/nginx/html/test.sh 1>>/usr/local/nginx/html/log.log 2>&1


test.sh 内容  (查看执行命令返回的进程id  如果没有就执行命令开启)

#!/bin/bash
alive=`ps -ef | grep AppTotal | grep -v grep | awk 'print $2'`
if [ ! $alive ]
then
  /usr/local/php/bin/php   /var/ms/artisan AppTotal:run > /dev/null &
fi

记得授权哦   chmod +x test.sh


笔者用的laravel 框架   将命令激活丢入后台

执行命令

  /usr/local/php/bin/php   /var/ms/artisan AppTotal:run > /dev/null &

完事直接 ctrl -c 结束就行 命令以在后台运行  可以用shell 中的命令查看进程id


这样就实现队列异步入库

还有很多问题需要优化!!大致功能已经实现!!!!!!

优化完成后cpu



以上是关于异步redis队列实现 数据入库的主要内容,如果未能解决你的问题,请参考以下文章

Swoole来实现实时异步任务队列

redis 队列缓存 + mysql 批量入库 + php 离线整合

高并发简单解决方案————redis队列缓存+mysql 批量入库

通过Redis消息队列实现大文件处理

使用PHP+Redis实现微博的用户管理

redis缓存队列+MySQL +php任务脚本定时批量入库